In [2]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
Figure 5 — H´enon attractor at (a, b) = (1.4, 0.3).¶
In [3]:
# Henon map function
def henon_map(a, b, x0, y0, steps):
x = [x0]
y = [y0]
for _ in range(steps - 1):
x_new = 1 - a * x[-1]**2 + y[-1]
y_new = b * x[-1]
x.append(x_new)
y.append(y_new)
return x, y
x_vals, y_vals = henon_map(a=1.4, b=0.3, x0=0, y0=0, steps=10000)
plt.figure(figsize=(8, 8))
plt.scatter(x_vals, y_vals, s=0.1, color='black')
plt.title("Hénon Attractor")
plt.xlabel("x")
plt.ylabel("y")
plt.show()
Figure 6 — 2D and 3D delay embeddings using different measurement functions (τ = 1).¶
In [5]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 1
# ----------------------
# Measurement functions (without absolute value variants)
# ----------------------
measurement_functions = [
(lambda x, y: np.sin(x/2) + np.cos(y/2), "sin(x/2)+cos(y/2)"),
(lambda x, y: np.sin(6*x), "sin(6x)"),
(lambda x, y: x * y, "x * y"),
(lambda x, y: x - y, "x - y")
]
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Delay embedding + side-by-side with set_box_aspect
# ----------------------
for h_func, label in measurement_functions:
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# skip constant outputs
if np.allclose(s_vals, s_vals[0]):
print(f"Skipping measurement {label} (constant output)")
continue
# side-by-side plots with balanced box aspect
fig = plt.figure(figsize=(12, 5))
# 2D plot
ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(s_vals[:-tau], s_vals[tau:], '.', color='navy', markersize=0.5, alpha=0.5)
ax1.set_title(fr"2D delay embedding: $h(x,y)={label}$")
ax1.set_xlabel(r"$s_n$")
ax1.set_ylabel(rf"$s_{{n+{tau}}}$")
ax1.grid(True, alpha=0.3)
ax1.set_box_aspect(1) # make it square
# 3D plot
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.scatter(s_vals[:-2*tau], s_vals[tau:-tau], s_vals[2*tau:],
c='darkred', s=0.3, alpha=0.5)
ax2.set_title(fr"3D delay embedding: $h(x,y)={label}$")
ax2.set_xlabel(r"$s_n$")
ax2.set_ylabel(rf"$s_{{n+{tau}}}$")
ax2.set_zlabel(rf"$s_{{n+{2*tau}}}$")
ax2.view_init(elev=30, azim=45)
ax2.set_box_aspect([1,1,1]) # cube aspect ratio
plt.tight_layout()
plt.show()
# show multiple angles
fig = plt.figure(figsize=(15, 4))
for i, azim in enumerate([30, 60, 90, 120]):
ax = fig.add_subplot(1, 4, i+1, projection='3d')
ax.scatter(s_vals[:-2*tau], s_vals[tau:-tau], s_vals[2*tau:],
c='darkgreen', s=0.3, alpha=0.5)
ax.view_init(elev=30, azim=azim)
ax.set_title(f"azim={azim}°")
ax.set_xticks([]); ax.set_yticks([]); ax.set_zticks([])
ax.set_box_aspect([1,1,1])
plt.suptitle(fr"3D delay embedding $h(x,y)={label}$ viewed at different angles")
plt.show()
Figure 7 — 3D Delay Reconstructions with Varying Delays¶
Change the tau (τ) parameter to create delay reconstructions with varying delays.
In [8]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals[0], x_vals[0] = y0, x0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 4 ######### Chnage this #############
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Measurement function
# ----------------------
# h_func = lambda x, y: np.sin(x/2) + np.cos(y/2)
h_func = lambda x, y: np.sin(x/2) + np.cos(y/2)
label = "sin(2/x) + cos(2/x)"
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Side-by-side 2D and 3D delay embedding
# ----------------------
fig = plt.figure(figsize=(14,6))
# 2D delay embedding
ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(s_vals[:-tau], s_vals[tau:], '.', color='navy', markersize=0.5, alpha=0.5)
ax1.set_title(fr"2D delay embedding: $h(x,y)={label}$")
ax1.set_xlabel(r"$s_n$")
ax1.set_ylabel(rf"$s_{{n+{tau}}}$")
ax1.grid(True, alpha=0.3)
ax1.set_box_aspect(1) # keep it square
# 3D delay embedding
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.scatter(s_vals[:-2*tau], s_vals[tau:-tau], s_vals[2*tau:],
c='darkred', s=0.3, alpha=0.5)
ax2.set_title(fr"3D delay embedding: $h(x,y)={label}$")
ax2.set_xlabel(r"$s_n$")
ax2.set_ylabel(rf"$s_{{n+{tau}}}$")
ax2.set_zlabel(rf"$s_{{n+{2*tau}}}$")
ax2.view_init(elev=30, azim=45)
ax2.set_box_aspect([1,1,1]) # keep cube
plt.tight_layout()
plt.show()
# ----------------------
# 3D delay embedding from multiple angles
# ----------------------
fig = plt.figure(figsize=(15, 4))
for i, azim in enumerate([30, 60, 90, 120]):
ax = fig.add_subplot(1, 4, i+1, projection='3d')
ax.scatter(s_vals[:-2*tau], s_vals[tau:-tau], s_vals[2*tau:],
c='darkgreen', s=0.3, alpha=0.5)
ax.view_init(elev=30, azim=azim)
ax.set_title(f"azim={azim}°")
ax.set_xticks([]); ax.set_yticks([]); ax.set_zticks([])
ax.set_box_aspect([1,1,1])
plt.suptitle(fr"3D delay embedding $h(x,y)={label}$ viewed at different angles")
plt.show()
Figure 8: 4D delay reconstruction with h(x, y) = sin(2/x) + cos(2/x), τ = 1, (a, b) = (1.4, 0.3).¶
In [12]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 1
Q = 4
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Measurement function
# ----------------------
h_func = lambda x, y: np.sin(x / 2) + np.cos(y / 2)
# h_func = lambda x, y: np.sin(6*x)
# h_func = lambda x, y: x - y
label = r"x - y"
# Compute measurement time series
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Build 4D delay embedding: (s_n, s_{n+1}, s_{n+2}, s_{n+3})
# ----------------------
delay_vectors = np.column_stack((
s_vals[:-3*tau],
s_vals[tau:-2*tau],
s_vals[2*tau:-tau],
s_vals[3*tau:]
))
print(f"4D delay vectors shape: {delay_vectors.shape}")
# ----------------------
# Single 3D plot from one angle, colored by s_{n+3}
# ----------------------
fig = plt.figure(figsize=(8,6))
ax = fig.add_subplot(111, projection='3d')
p = ax.scatter(delay_vectors[:,0], delay_vectors[:,1], delay_vectors[:,2],
c=delay_vectors[:,3], cmap='viridis', s=0.3, alpha=0.8)
ax.view_init(elev=30, azim=45)
ax.set_title(fr"4D embedding: $(s_n, s_{{n+1}}, s_{{n+2}})$ colored by $s_{{n+3}}$ for $h(x,y)={label}$")
ax.set_xlabel(r"$s_n$")
ax.set_ylabel(r"$s_{n+1}$")
ax.set_zlabel(r"$s_{n+2}$")
ax.set_box_aspect([1,1,1])
plt.colorbar(p, ax=ax, shrink=0.6, label=r"$s_{n+3}$")
plt.show()
4D delay vectors shape: (9997, 4)
In [13]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 1
Q = 4
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Measurement function
# ----------------------
h_func = lambda x, y: np.sin(x / 2) + np.cos(y / 2)
label = r"\sin(x/2)+\cos(y/2)"
# Compute measurement time series
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Build 4D delay embedding: (s_n, s_{n+1}, s_{n+2}, s_{n+3})
# ----------------------
delay_vectors = np.column_stack((
s_vals[:-3*tau],
s_vals[tau:-2*tau],
s_vals[2*tau:-tau],
s_vals[3*tau:]
))
print(f"4D delay vectors shape: {delay_vectors.shape}")
# ----------------------
# Plot multiple 3D views (slice colored by s_{n+3})
# ----------------------
fig = plt.figure(figsize=(18,4))
azimuth_angles = [30, 60, 90, 120]
for i, azim in enumerate(azimuth_angles):
ax = fig.add_subplot(1, len(azimuth_angles), i+1, projection='3d')
p = ax.scatter(delay_vectors[:,0], delay_vectors[:,1], delay_vectors[:,2],
c=delay_vectors[:,3], cmap='viridis', s=0.3, alpha=0.8)
ax.view_init(elev=30, azim=azim)
ax.set_title(f"azim={azim}°")
ax.set_xticks([]); ax.set_yticks([]); ax.set_zticks([])
ax.set_box_aspect([1,1,1])
fig.suptitle(fr"4D embedding: $(s_n, s_{{n+1}}, s_{{n+2}})$ colored by $s_{{n+3}}$ for $h(x,y)={label}$", y=1.02)
plt.colorbar(p, ax=fig.axes, orientation='vertical', fraction=0.02, pad=0.04, label=r"$s_{n+3}$")
plt.show()
4D delay vectors shape: (9997, 4)
Figure 9: 4D delay reconstructions with h(x) = sin(kx), k = 1, 2, 4, 6, 8, 10¶
Change the dimension (Q) parameter to create delay reconstructions with varying dimensions.
In [14]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 1
Q = 4 ###### Change this #######
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Measurement function
# ----------------------
# h_func = lambda x, y: np.sin(x / 2) + np.cos(y / 2)
h_func = lambda x, y: np.sin(6*x)
# h_func = lambda x, y: x - y
label = r"\sin(6x)"
# Compute measurement time series
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Build 4D delay embedding: (s_n, s_{n+1}, s_{n+2}, s_{n+3})
# ----------------------
delay_vectors = np.column_stack((
s_vals[:-3*tau],
s_vals[tau:-2*tau],
s_vals[2*tau:-tau],
s_vals[3*tau:]
))
print(f"4D delay vectors shape: {delay_vectors.shape}")
# ----------------------
# Single 3D plot from one angle, colored by s_{n+3}
# ----------------------
fig = plt.figure(figsize=(8,6))
ax = fig.add_subplot(111, projection='3d')
p = ax.scatter(delay_vectors[:,0], delay_vectors[:,1], delay_vectors[:,2],
c=delay_vectors[:,3], cmap='viridis', s=0.3, alpha=0.8)
ax.view_init(elev=30, azim=45)
ax.set_title(fr"4D embedding: $(s_n, s_{{n+1}}, s_{{n+2}})$ colored by $s_{{n+3}}$ for $h(x,y)={label}$")
ax.set_xlabel(r"$s_n$")
ax.set_ylabel(r"$s_{n+1}$")
ax.set_zlabel(r"$s_{n+2}$")
ax.set_box_aspect([1,1,1])
plt.colorbar(p, ax=ax, shrink=0.6, label=r"$s_{n+3}$")
plt.show()
4D delay vectors shape: (9997, 4)
Figure 10 — FNN analysis for different measurement functions and embedding dimensions.¶
In [16]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
"""
Generates points for the Hénon map.
Args:
a (float): Parameter 'a' for the Hénon map.
b (float): Parameter 'b' for the Hénon map.
x0 (float): Initial x-coordinate.
y0 (float): Initial y-coordinate.
n_points (int): Number of points to generate.
Returns:
tuple: x and y coordinates of the generated points.
"""
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Delay Embedding Function
# ----------------------
def create_delay_vectors(s_vals, dim, tau):
"""
Creates delay embedding vectors from a time series.
Args:
s_vals (np.ndarray): The 1D time series.
dim (int): The embedding dimension (m).
tau (int): The delay time (tau).
Returns:
np.ndarray: A 2D array of delay vectors, shape (N - (dim-1)*tau, dim).
"""
num_points = len(s_vals)
# Calculate the maximum index for the last component of the last vector
max_idx = num_points - (dim - 1) * tau
if max_idx <= 0:
# print(f"Debug: Not enough points for dim={dim}, tau={tau}. Returning empty array.")
return np.array([]) # Not enough points to form vectors
# Create an array to hold the delay vectors
delay_vectors = np.zeros((max_idx, dim))
for i in range(dim):
delay_vectors[:, i] = s_vals[i * tau : i * tau + max_idx]
return delay_vectors
# ----------------------
# False Nearest Neighbors (FNN) Fraction Calculation
# ----------------------
def calculate_fnn_fraction(s_vals, max_dim, tau, R_tol=15.0, A_tol=2.0, theiler_window=0):
fnn_fractions = []
series_std = np.std(s_vals) # Standard deviation of the original time series
print(f"Calculating FNN for dimensions 1 to {max_dim}...")
for m in range(1, max_dim + 1):
# Create m-dimensional delay vectors
X_m = create_delay_vectors(s_vals, m, tau)
if X_m.shape[0] < 2: # Need at least 2 points to find a neighbor
print(f" Warning: Not enough points to form {m}-dimensional vectors or find neighbors. Stopping FNN calculation for higher dimensions.")
fnn_fractions.extend([np.nan] * (max_dim - m + 1)) # Fill remaining with NaN
break
# Create (m+1)-dimensional delay vectors (for comparison)
X_m_plus_1 = create_delay_vectors(s_vals, m + 1, tau)
if X_m_plus_1.shape[0] < 2: # Need at least 2 points
print(f" Warning: Not enough points to form {m+1}-dimensional vectors. Stopping FNN calculation for higher dimensions.")
fnn_fractions.extend([np.nan] * (max_dim - m + 1)) # Fill remaining with NaN
break
# Ensure both X_m and X_m_plus_1 have the same number of points
# (X_m_plus_1 will be shorter by 'tau' points due to its higher dimension)
num_valid_points = min(X_m.shape[0], X_m_plus_1.shape[0])
X_m = X_m[:num_valid_points]
X_m_plus_1 = X_m_plus_1[:num_valid_points]
if num_valid_points == 0:
fnn_fractions.append(np.nan)
print(f" Dimension m={m}: FNN Fraction = NaN (no valid points after alignment)")
continue
# Find nearest neighbors in m dimensions
# k=2 because the first neighbor is always the point itself
nn_model = NearestNeighbors(n_neighbors=2, algorithm='kd_tree')
nn_model.fit(X_m)
distances, indices = nn_model.kneighbors(X_m)
num_false_neighbors = 0
total_neighbors_checked = 0
for i in range(num_valid_points):
# The first index (0) is the point itself, second (1) is its nearest neighbor
nn_idx_in_m = indices[i, 1]
dist_m = distances[i, 1]
# Apply Theiler window: skip if neighbor is too close in time
if theiler_window > 0 and abs(i - nn_idx_in_m) <= theiler_window:
continue
total_neighbors_checked += 1
# Calculate the full Euclidean distance in m+1 dimensions
dist_m_plus_1 = np.linalg.norm(X_m_plus_1[i] - X_m_plus_1[nn_idx_in_m])
# Get the new coordinate difference (the (m)-th component, 0-indexed)
# This is the coordinate added when going from dim m to dim m+1
new_coord_diff = abs(X_m_plus_1[i, m] - X_m_plus_1[nn_idx_in_m, m])
# FNN Criterion 1 (Ratio Test):
# Checks if the increase in distance is significant relative to the m-dimensional distance.
# This form sqrt(delta_Z^2)/dist_m is one common formulation.
if dist_m > 0 and (new_coord_diff / dist_m) > R_tol:
num_false_neighbors += 1
# FNN Criterion 2 (Absolute Size Test):
# Checks if the absolute change in the new coordinate is large compared to the series' std.
elif series_std > 0 and (new_coord_diff / series_std) > A_tol:
num_false_neighbors += 1
if total_neighbors_checked > 0:
fnn_fraction = num_false_neighbors / total_neighbors_checked
else:
fnn_fraction = np.nan # No valid neighbors to check
fnn_fractions.append(fnn_fraction)
print(f" Dimension m={m}: FNN Fraction = {fnn_fraction:.4f} ({num_false_neighbors} false out of {total_neighbors_checked} checked)")
return fnn_fractions
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 20000 # Increased points for better FNN estimation
tau = 1
Q = 4 # Maximum dimension to test for FNN (increased for broader analysis)
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
# ----------------------
# Measurement function ##############################################################################################
# ----------------------
# Choose one measurement function
h_func = lambda x, y: np.sin(x / 2) + np.cos(y / 2)
label = r"\sin(x/2) + \cos(y/2)"
# h_func = lambda x, y: x - y
# label = r"x - y"
# h_func = lambda x, y: np.sin(6*x)
# label = r"sin(6x)"
# h_func = lambda x, y: y
# label = r"y"
# Compute measurement time series
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Calculate FNN Fraction
# ----------------------
# A_tol is often set relative to the standard deviation of the time series.
# R_tol is typically 10-15.
# Theiler window can be set based on the autocorrelation of the series,
# but for a deterministic system like Henon, a small window or 0 is fine.
# Adjusting A_tol here to be relative to the series' standard deviation if it's not already.
# A_tol=2.0 means 2 * series_std for the threshold.
fnn_fractions = calculate_fnn_fraction(s_vals, max_dim=Q, tau=tau,
R_tol=15.0, A_tol=2.0, theiler_window=0)
# ----------------------
# Plot FNN Fraction
# ----------------------
dimensions = np.arange(1, Q + 1)
plt.figure(figsize=(9, 6))
plt.plot(dimensions[:len(fnn_fractions)], fnn_fractions, 'o-', color='blue', linewidth=2, markersize=8)
plt.xlabel("Embedding Dimension (m)", fontsize=12)
plt.ylabel("FNN Fraction", fontsize=12)
plt.title(f"False Nearest Neighbors (FNN) Fraction for Hénon Map\n"
f"($h(x,y)={label}$, $\\tau={tau}$)", fontsize=14)
plt.xticks(dimensions[:len(fnn_fractions)])
plt.grid(True, linestyle='--', alpha=0.7)
plt.axhline(y=0.05, color='red', linestyle=':', label='5% Threshold') # Common threshold
plt.legend()
plt.tight_layout()
plt.show()
# ----------------------
# Interpret Results
# ----------------------
print("\n--- FNN Analysis Summary ---")
optimal_dim = None
for i, fnn in enumerate(fnn_fractions):
dim = dimensions[i]
if not np.isnan(fnn):
print(f"Dimension {dim}: FNN Fraction = {fnn:.4f}")
# A common heuristic for optimal dimension is when FNN drops below a small threshold (e.g., 5%)
# and remains low, or when it first reaches a minimum.
if optimal_dim is None and fnn < 0.05:
optimal_dim = dim
else:
print(f"Dimension {dim}: FNN Fraction = NaN (insufficient data)")
if optimal_dim is not None:
print(f"\nBased on the 5% threshold, the **optimal embedding dimension** is approximately: {optimal_dim}")
print("This is the minimum dimension where the attractor is likely fully unfolded,")
print("meaning the dynamics are no longer spuriously self-intersecting due to projection.")
else:
print("\nCould not determine an optimal embedding dimension based on the 5% threshold.")
print("Consider increasing `n_points` (number of generated points) or `max_dim` (maximum dimension to test),")
print("or adjusting FNN parameters (`R_tol`, `A_tol`, `theiler_window`).")
Calculating FNN for dimensions 1 to 4... Dimension m=1: FNN Fraction = 0.8079 (16157 false out of 19999 checked) Dimension m=2: FNN Fraction = 0.0000 (0 false out of 19998 checked) Dimension m=3: FNN Fraction = 0.0000 (0 false out of 19997 checked) Dimension m=4: FNN Fraction = 0.0000 (0 false out of 19996 checked)
--- FNN Analysis Summary --- Dimension 1: FNN Fraction = 0.8079 Dimension 2: FNN Fraction = 0.0000 Dimension 3: FNN Fraction = 0.0000 Dimension 4: FNN Fraction = 0.0000 Based on the 5% threshold, the **optimal embedding dimension** is approximately: 2 This is the minimum dimension where the attractor is likely fully unfolded, meaning the dynamics are no longer spuriously self-intersecting due to projection.
In [15]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# ----------------------
# Henon map generator
# ----------------------
def henon_map(a, b, x0, y0, n_points):
x_vals = np.zeros(n_points)
y_vals = np.zeros(n_points)
x_vals[0], y_vals[0] = x0, y0
for n in range(1, n_points):
x_vals[n] = 1 - a * x_vals[n-1]**2 + y_vals[n-1]
y_vals[n] = b * x_vals[n-1]
return x_vals, y_vals
# ----------------------
# Parameters
# ----------------------
a, b = 1.4, 0.3
x0, y0 = 0, 0
n_points = 10000
tau = 1
# ----------------------
# Choose a single measurement function
# ----------------------
h_func = lambda x, y: np.sin(x / 2) + np.cos(y / 2)
label = r"\sin(x/2) + \cos(y/2)"
# ----------------------
# Generate Hénon data
# ----------------------
x_vals, y_vals = henon_map(a, b, x0, y0, n_points)
s_vals = np.array([h_func(x, y) for x, y in zip(x_vals, y_vals)])
# ----------------------
# Create 1x3 plot: 2D, 3D, and 4D embeddings
# ----------------------
fig = plt.figure(figsize=(18, 5))
# 2D delay embedding
ax1 = fig.add_subplot(1, 3, 1)
ax1.plot(s_vals[:-tau], s_vals[tau:], '.', color='navy', markersize=0.5, alpha=0.5)
ax1.set_title("2D delay embedding")
ax1.set_xlabel(r"$s_n$")
ax1.set_ylabel(rf"$s_{{n+{tau}}}$")
ax1.grid(True, alpha=0.3)
ax1.set_box_aspect(1)
# 3D delay embedding
ax2 = fig.add_subplot(1, 3, 2, projection='3d')
ax2.scatter(s_vals[:-2*tau], s_vals[tau:-tau], s_vals[2*tau:], c='darkred', s=0.3, alpha=0.5)
ax2.set_title("3D delay embedding")
ax2.set_xlabel(r"$s_n$")
ax2.set_ylabel(rf"$s_{{n+{tau}}}$")
ax2.set_zlabel(rf"$s_{{n+{2*tau}}}$")
ax2.view_init(elev=30, azim=45)
ax2.set_box_aspect([1,1,1])
# 4D delay embedding — colored 3D projection
from matplotlib import cm
color_vals = s_vals[3*tau:]
ax3 = fig.add_subplot(1, 3, 3, projection='3d')
p = ax3.scatter(s_vals[:-3*tau], s_vals[tau:-2*tau], s_vals[2*tau:-tau],
c=color_vals, cmap='viridis', s=0.3, alpha=0.5)
fig.colorbar(p, ax=ax3, shrink=0.6, label=rf"$s_{{n+{3*tau}}}$")
ax3.set_title("4D delay embedding (color = 4th dim)")
ax3.set_xlabel(r"$s_n$")
ax3.set_ylabel(rf"$s_{{n+{tau}}}$")
ax3.set_zlabel(rf"$s_{{n+{2*tau}}}$")
ax3.view_init(elev=30, azim=45)
ax3.set_box_aspect([1,1,1])
fig.suptitle(fr"Delay Embeddings with $h(x,y)={label}$", fontsize=16)
plt.tight_layout()
plt.show()
Figure 11: Delay reconstructions vs. GPR-generated attractors.¶
- Original Henon Data
- Apply measurement function
- Delay Embedding for 2D and 3D
- Sample and GPR on 2D and 3D (Use 2D/3D dimentional data for both input and output)
- Wassertein Distance
Change num_training_pairs variable to test different number of training data points.
In [17]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from scipy.stats import wasserstein_distance
# --- Step 1: Henon Map Generation ---
# This function generates a time series from the Henon map.
def generate_henon_map(n_points, a=1.4, b=0.3, initial_x=0.0, initial_y=0.0):
"""
Generates a time series from the Henon map.
Args:
n_points (int): The number of points to generate.
a (float): The parameter 'a' for the Henon map.
b (float): The parameter 'b' for the Henon map.
initial_x (float): The initial x-coordinate.
initial_y (float): The initial y-coordinate.
Returns:
tuple: A tuple containing two numpy arrays for the x and y coordinates.
"""
x = np.zeros(n_points)
y = np.zeros(n_points)
x[0], y[0] = initial_x, initial_y
for i in range(n_points - 1):
x[i+1] = 1 - a * x[i]**2 + y[i]
y[i+1] = b * x[i]
return x, y
# --- Step 2: Measurement Function and Delay Reconstruction ---
# This is the specified measurement function.
def measurement_function(x, y):
"""
The measurement function h(x, y) = sin(x/2) + cos(y/2).
"""
return np.sin(x/2) + np.cos(y/2)
# This function creates a delay reconstruction from a time series.
def delay_reconstruction(time_series, dimension, delay):
"""
Performs a delay reconstruction on a time series.
Args:
time_series (np.ndarray): The 1D time series to embed.
dimension (int): The embedding dimension.
delay (int): The delay parameter (tau).
Returns:
np.ndarray: The delay-embedded attractor.
"""
n_points = len(time_series)
embedded_points = np.zeros((n_points - (dimension - 1) * delay, dimension))
for i in range(dimension):
embedded_points[:, i] = time_series[i * delay : i * delay + embedded_points.shape[0]]
return embedded_points
# This function trains a GPR model and generates a new time series by predicting
# the next single scalar value.
def train_and_generate_gpr_fixed(X_train, y_train_scalar, start_point, n_predict_points):
"""
Trains a GPR model to predict the next single scalar value and uses it to
iteratively generate a new time series.
Args:
X_train (np.ndarray): The input state vectors for training.
y_train_scalar (np.ndarray): The single scalar values to predict.
start_point (np.ndarray): The starting state vector for prediction.
n_predict_points (int): The number of points to predict.
Returns:
np.ndarray: The new GPR-generated attractor.
"""
# Define the GPR kernel.
kernel = 1.0 * RBF(length_scale=1.0) + WhiteKernel(noise_level=1e-4)
# Train the GPR model to predict the single next value.
gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=0.0001, normalize_y=False)
gp.fit(X_train, y_train_scalar)
# Use the trained model to generate a new time series by iteration.
predicted_attractor = np.zeros((n_predict_points, start_point.shape[0]))
predicted_attractor[0, :] = start_point
# Create the initial state vector for prediction
current_state_vector = start_point.copy()
# Generate points iteratively
for i in range(n_predict_points - 1):
# Predict the NEXT scalar value of the time series from the current state vector.
next_scalar_value = gp.predict(current_state_vector.reshape(1, -1))[0]
# Construct the new state vector by shifting the old one and appending the new scalar.
new_state_vector = np.roll(current_state_vector, -1)
new_state_vector[-1] = next_scalar_value
predicted_attractor[i+1, :] = new_state_vector
current_state_vector = new_state_vector
return predicted_attractor
# --- Step 3: Wasserstein Distance Calculation ---
# This function calculates a simplified multi-dimensional Wasserstein distance.
def calculate_wasserstein(points1, points2, dimension):
"""
Calculates the average 1D Wasserstein distance across all dimensions.
This is a simplification of the true multi-dimensional Wasserstein distance,
which would require an optimal transport library.
"""
distances = []
for i in range(dimension):
distances.append(wasserstein_distance(points1[:, i], points2[:, i]))
return np.mean(distances)
# New function to calculate normalized Wasserstein distance
def calculate_normalized_wasserstein(points1, points2, dimension):
"""
Calculates the normalized Wasserstein distance between two point clouds.
The normalization is done by dividing by the product of the number of
dimensions and the range of the measurement function.
"""
wasserstein_dist = calculate_wasserstein(points1, points2, dimension)
# The range of the measurement function h(x,y) is [-2, 2], so the side length is 4.
normalization_factor = dimension * 4
return wasserstein_dist / normalization_factor
# --- Main Execution ---
if __name__ == "__main__":
# Parameters
num_original_points = 2000 # Number of points for the full Henon map
num_training_pairs = 40 # Number of (current_state, next_state) pairs for training ##################################
num_gpr_points = 2000 # Number of points to generate with GPR
delay_tau = 1 # Delay parameter for reconstruction
# Set a random seed for reproducibility
np.random.seed(42)
# --- Generate Original Data and Attractors ---
# 1. Generate the original Henon map data
original_x, original_y = generate_henon_map(num_original_points)
# 2. Apply the measurement function to the full dataset
original_measured_data = measurement_function(original_x, original_y)
# 3. Create the full, original attractors for comparison
original_attractor_2d = delay_reconstruction(original_measured_data, dimension=2, delay=delay_tau)
original_attractor_3d = delay_reconstruction(original_measured_data, dimension=3, delay=delay_tau)
# --- Prepare training data for 2D ---
print("Processing 2D Attractor...")
# Randomly select a single set of indices
valid_indices_2d = np.arange(len(original_attractor_2d) - 1)
random_indices_2d = np.random.choice(valid_indices_2d, size=num_training_pairs, replace=False)
# Correctly build X_train and y_train (scalar) for the 2D case
X_train_2d = original_attractor_2d[random_indices_2d, :]
y_train_2d_scalar = original_measured_data[random_indices_2d + 2*delay_tau]
# Get a starting point for prediction from the original attractor
start_point_2d = original_attractor_2d[-1, :]
# Train GPR and generate a new attractor
gpr_attractor_2d = train_and_generate_gpr_fixed(X_train_2d, y_train_2d_scalar, start_point_2d, num_gpr_points)
# --- Prepare training data for 3D ---
print("Processing 3D Attractor...")
# Randomly select a single set of indices
valid_indices_3d = np.arange(len(original_attractor_3d) - 1)
random_indices_3d = np.random.choice(valid_indices_3d, size=num_training_pairs, replace=False)
# Correctly build X_train and y_train (scalar) for the 3D case
X_train_3d = original_attractor_3d[random_indices_3d, :]
y_train_3d_scalar = original_measured_data[random_indices_3d + 3*delay_tau]
# Get a starting point for prediction from the original attractor
start_point_3d = original_attractor_3d[-1, :]
# Train GPR and generate a new attractor
gpr_attractor_3d = train_and_generate_gpr_fixed(X_train_3d, y_train_3d_scalar, start_point_3d, num_gpr_points)
# --- Calculate and Print Results ---
# Calculate and print Wasserstein distances
wasserstein_2d = calculate_wasserstein(original_attractor_2d, gpr_attractor_2d, dimension=2)
wasserstein_3d = calculate_wasserstein(original_attractor_3d, gpr_attractor_3d, dimension=3)
print(f"Wasserstein Distance for 2D Attractors: {wasserstein_2d:.4f}")
print(f"Wasserstein Distance for 3D Attractors: {wasserstein_3d:.4f}")
# Calculate and print Normalized Wasserstein distances
normalized_wasserstein_2d = calculate_normalized_wasserstein(original_attractor_2d, gpr_attractor_2d, dimension=2)
normalized_wasserstein_3d = calculate_normalized_wasserstein(original_attractor_3d, gpr_attractor_3d, dimension=3)
print(f"Normalized Wasserstein Distance for 2D Attractors: {normalized_wasserstein_2d:.4f}")
print(f"Normalized Wasserstein Distance for 3D Attractors: {normalized_wasserstein_3d:.4f}")
# --- Plot the results for visual comparison ---
fig = plt.figure(figsize=(15, 6))
# Plot 2D Attractors
ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(original_attractor_2d[:, 0], original_attractor_2d[:, 1], 'o', markersize=1, alpha=0.5, label='Original Henon', color = 'black')
ax1.plot(gpr_attractor_2d[:, 0], gpr_attractor_2d[:, 1], 'o', markersize=1, alpha=0.5, label='GPR Generated', color = 'pink')
ax1.set_title('2D Delay Reconstruction (GPR on Embedding)')
ax1.set_xlabel('$h(t)$')
ax1.set_ylabel('$h(t + \\tau)$')
ax1.legend()
# Plot 3D Attractors
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.plot(original_attractor_3d[:, 0], original_attractor_3d[:, 1], original_attractor_3d[:, 2], 'o', markersize=1, alpha=0.5, label='Original Henon', color = 'black')
ax2.plot(gpr_attractor_3d[:, 0], gpr_attractor_3d[:, 1], gpr_attractor_3d[:, 2], 'o', markersize=1, alpha=0.5, label='GPR Generated', color = 'pink')
ax2.set_title('3D Delay Reconstruction (GPR on Embedding)')
ax2.set_xlabel('$h(t)$')
ax2.set_ylabel('$h(t + \\tau)$')
ax2.set_zlabel('$h(t + 2\\tau)$')
ax2.legend()
plt.tight_layout()
plt.show()
Processing 2D Attractor...
/usr/local/lib/python3.12/dist-packages/sklearn/gaussian_process/kernels.py:442: ConvergenceWarning: The optimal value found for dimension 0 of parameter k2__noise_level is close to the specified lower bound 1e-05. Decreasing the bound and calling fit again may find a better value. warnings.warn(
Processing 3D Attractor...
/usr/local/lib/python3.12/dist-packages/sklearn/gaussian_process/kernels.py:442: ConvergenceWarning: The optimal value found for dimension 0 of parameter k2__noise_level is close to the specified lower bound 1e-05. Decreasing the bound and calling fit again may find a better value. warnings.warn(
Wasserstein Distance for 2D Attractors: 0.0119 Wasserstein Distance for 3D Attractors: 0.0236 Normalized Wasserstein Distance for 2D Attractors: 0.0015 Normalized Wasserstein Distance for 3D Attractors: 0.0020
Figure 12 — Delay embeddings and GPR reconstructions from noisy data.¶
Change num_training_pairs to test different numer of training points.
In [19]:
!pip install POT
import numpy as np
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel, ConstantKernel
# POT (Python Optimal Transport)
import ot
# ========== UTILITIES ==========
def set_seed(seed: int = 1):
np.random.seed(seed)
# ========== HÉNON MAP & MEASUREMENT ==========
def generate_henon_map(n_points: int, a: float = 1.4, b: float = 0.3,
initial_x: float = 0.0, initial_y: float = 0.0):
"""
Generate a trajectory of the canonical Hénon map:
x_{t+1} = 1 - a x_t^2 + y_t
y_{t+1} = b x_t
"""
x = np.zeros(n_points)
y = np.zeros(n_points)
x[0], y[0] = initial_x, initial_y
for i in range(n_points - 1):
x[i+1] = 1 - a * x[i]**2 + y[i]
y[i+1] = b * x[i]
return x, y
def measurement_function(x, y):
"""
Example measurement function: h(x, y) = sin(x/2) + cos(y/2).
Replace or wrap with others as needed.
"""
return np.sin(x/2) + np.cos(y/2)
# ========== DELAY EMBEDDING & SUPERVISED DATA ==========
def delay_reconstruction(time_series: np.ndarray, dimension: int, delay: int):
"""
Build a delay vector embedding from a 1D series s_t:
v_i = [s_i, s_{i+τ}, s_{i+2τ}, ..., s_{i+(m-1)τ}]
Returns shape: (N_eff, dimension) with N_eff = N - (dimension-1)*delay
"""
s = np.asarray(time_series)
N = len(s)
N_eff = N - (dimension - 1) * delay
if N_eff <= 0:
raise ValueError("Time series too short for requested embedding.")
out = np.zeros((N_eff, dimension))
for j in range(dimension):
out[:, j] = s[j*delay : j*delay + N_eff]
return out
def make_supervised_from_embedding(time_series: np.ndarray, dimension: int, delay: int):
"""
Create (X, y) for next-scalar prediction consistent with the embedding.
If v_i = [s_i, s_{i+τ}, ..., s_{i+(m-1)τ}], the 'next' scalar we append is s_{i + mτ}.
We must ensure indices are valid up to s_{i+mτ}, so we truncate the final rows accordingly.
"""
s = np.asarray(time_series)
N = len(s)
m = dimension
tau = delay
# valid i satisfy i + m*tau <= N-1 -> i <= N - m*tau - 1
last_i = N - m*tau - 1
if last_i < 0:
raise ValueError("Time series too short to form (X,y) with given (m, τ).")
# Build embedding up to i = last_i
X_full = delay_reconstruction(s, m, tau)
# X_full has length N - (m-1)*tau; we need only first (last_i+1) rows
X = X_full[:last_i + 1, :]
y = s[(np.arange(last_i + 1) + m*tau)]
return X, y
# ========== GPR TRAIN / GENERATE ==========
def train_gpr_for_next_scalar(X_train: np.ndarray, y_train: np.ndarray) -> Pipeline:
"""
Train a GPR that maps current delay-vector state -> next scalar.
We wrap with a StandardScaler for stability.
"""
kernel = ConstantKernel(1.0, (1e-2, 1e3)) * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e3)) \
+ WhiteKernel(noise_level=1e-4, noise_level_bounds=(1e-8, 1e-1))
gpr = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5, alpha=0.0, normalize_y=True)
model = Pipeline([("scaler", StandardScaler(with_mean=True, with_std=True)),
("gpr", gpr)])
model.fit(X_train, y_train)
return model
def iterate_gpr(model: Pipeline, start_state: np.ndarray, n_steps: int):
"""
Iteratively roll the predicted scalar into the delay state (shift left, append).
Returns an array of shape (n_steps, state_dim).
"""
state = start_state.copy().astype(float)
m = state.size
out = np.zeros((n_steps, m))
out[0] = state
for t in range(1, n_steps):
next_scalar = model.predict(state.reshape(1, -1))[0]
state = np.roll(state, -1)
state[-1] = next_scalar
out[t] = state
return out
# ========== WASSERSTEIN (POT) ==========
def wasserstein_distance_pot(points1: np.ndarray, points2: np.ndarray):
"""
True 2nd-order Wasserstein distance between two point clouds with uniform weights.
"""
n1, n2 = points1.shape[0], points2.shape[0]
a = np.ones(n1) / n1
b = np.ones(n2) / n2
M = ot.dist(points1, points2, metric="euclidean")
W2 = ot.emd2(a, b, M) # squared 2-Wasserstein with cost = Euclidean distance
return np.sqrt(W2)
def normalized_wasserstein(points1: np.ndarray, points2: np.ndarray):
"""
Normalize by the max possible distance in a d-dim cube of side length L.
For your measurement range h∈[-2,2], L=4, so max distance ~ L*sqrt(d).
"""
d = points1.shape[1]
L = 4.0
denom = L * np.sqrt(d)
return wasserstein_distance_pot(points1, points2) / denom
# ========== DIKS–TAKENS DELAY‑VECTOR DISTANCE TEST ==========
# (Gaussian-kernel–smoothed unbiased distance estimator Q, and standardized S)
# See Diks et al. (1996) “Detecting differences between delay vector distributions”
# Key pieces we implement:
# - h(s,t) = exp(-||s - t||^2 / (4 d_bw)) with bandwidth d_bw > 0
# - Segment (Theiler) averaging with block length l to reduce dependence
# - S = Q / sqrt(Var_0(Q | data)) and reject null (same distribution) if S > 3
# (one-sided; conservative threshold) [paper’s guidance]
# References: sections on the estimator Q and segment method, and S-threshold.
# (Citations inline above.)
def _gaussian_h(u: np.ndarray, v: np.ndarray, d_bw: float) -> float:
# h(s,t) = exp(-||s - t||^2 / (4 d))
diff = u - v
return np.exp(- (diff @ diff) / (4.0 * d_bw))
def _pairwise_h_blockavg(Z: np.ndarray, d_bw: float, l: int) -> np.ndarray:
"""
Build block-averaged h'(i',j') over l×l index squares on the (i,j) plane.
Returns H′ as a square matrix of size B×B where B = ceil(N/l), with
H′[I,J] = average of h(z_{i}, z_{j}) for i∈block I, j∈block J, i!=j when I==J.
This mirrors the segment method idea.
"""
N = Z.shape[0]
B = int(np.ceil(N / l))
Hprime = np.zeros((B, B))
for I in range(B):
i_start, i_end = I*l, min((I+1)*l, N)
Zi = Z[i_start:i_end]
for J in range(B):
j_start, j_end = J*l, min((J+1)*l, N)
Zj = Z[j_start:j_end]
# compute mean of h over all pairs; omit i==j only when same block
count = 0
acc = 0.0
for ii in range(Zi.shape[0]):
for jj in range(Zj.shape[0]):
if not (I == J and i_start+ii == j_start+jj):
acc += _gaussian_h(Zi[ii], Zj[jj], d_bw)
count += 1
if count > 0:
Hprime[I, J] = acc / count
else:
Hprime[I, J] = 0.0
return Hprime
def diks_takens_S(X: np.ndarray, Y: np.ndarray, d_bw: float, segment_len: int = 18):
"""
Compute the standardized DV statistic S comparing two point sets X, Y (same dim).
Uses block-averaged segment method (length l) to mitigate dependence (Theiler-like).
Returns (S, Q_hat, var_cond).
"""
# Build segment-averaged H′ matrices
Hxx = _pairwise_h_blockavg(X, d_bw, segment_len)
Hyy = _pairwise_h_blockavg(Y, d_bw, segment_len)
# Cross-block averages
# For cross, we average h over all pairs mixing blocks from X and Y
# Build condensed block-averaged matrix Hxy of size Bx × By
def blockavg_cross(A: np.ndarray, Bm: np.ndarray, d_bw: float, l: int):
# A and Bm are the raw point arrays (not blocks)
NA, NB = A.shape[0], Bm.shape[0]
BA = int(np.ceil(NA/l)); BB = int(np.ceil(NB/l))
out = np.zeros((BA, BB))
for I in range(BA):
ia, ib = I*l, min((I+1)*l, NA)
Ai = A[ia:ib]
for J in range(BB):
ja, jb = J*l, min((J+1)*l, NB)
Bj = Bm[ja:jb]
acc, cnt = 0.0, 0
for ii in range(Ai.shape[0]):
for jj in range(Bj.shape[0]):
acc += _gaussian_h(Ai[ii], Bj[jj], d_bw)
cnt += 1
out[I, J] = acc / cnt
return out
Hxy = blockavg_cross(X, Y, d_bw, segment_len)
# Q_hat = mean(Hxx) + mean(Hyy) - 2*mean(Hxy)
Q_xx = Hxx.mean()
Q_yy = Hyy.mean()
Q_xy = Hxy.mean()
Q_hat = Q_xx + Q_yy - 2.0 * Q_xy
# Conditional variance under H0, using block-averaged version analogous to paper.
# We use the (centered) matrix and the combinatorial prefactor from the derivation.
# For simplicity and robustness here, we estimate Var via a leave-block-out style:
# - jackknife over blocks for Q_hat to get Var(Q_hat). (Practical and stable.)
def jackknife_var_Q(Hxx, Hyy, Hxy):
# Compute Q_hat with all blocks
Bx = Hxx.shape[0]; By = Hyy.shape[0]
# Leave-one-out in X or Y block sets and average
q_vals = []
# Leave one X block out
for i in range(Bx):
idx = np.arange(Bx) != i
Q_xx_i = Hxx[idx][:, idx].mean() if idx.any() else 0.0
Q_xy_i = Hxy[idx, :].mean()
q_vals.append(Q_xx_i + Q_yy - 2.0 * Q_xy_i)
# Leave one Y block out
for j in range(By):
jdx = np.arange(By) != j
Q_yy_j = Hyy[jdx][:, jdx].mean() if jdx.any() else 0.0
Q_xy_j = Hxy[:, jdx].mean()
q_vals.append(Q_xx + Q_yy_j - 2.0 * Q_xy_j)
q_vals = np.array(q_vals)
# jackknife variance estimate
n = len(q_vals)
mu = q_vals.mean()
var = (n - 1) / n * np.sum((q_vals - mu)**2)
return max(var, 1e-12)
var_hat = jackknife_var_Q(Hxx, Hyy, Hxy)
S = Q_hat / np.sqrt(var_hat)
return S, Q_hat, var_hat
# ========== MAIN DEMO ==========
if __name__ == "__main__":
# --- Parameters ---
set_seed(50)
num_original_points = 2000
delay_tau = 1
noise_std_dev = 0.05
# Train size (random subsample of supervised pairs)
num_training_pairs = 200 # larger than 40 for more stable GPR
num_gpr_points = 2000
# Toggle DV test
use_dv_test = True
dv_bandwidth = 2.5e-2 # try 2.5e-2 .. 1e-1; tune per data scale
dv_segment_len = 18 # as suggested by segment method examples
# Embedding dimensions to compare
dims = [2, 3]
# --- Generate original data ---
x, y = generate_henon_map(num_original_points)
measured = measurement_function(x, y)
# Noise & noisy measured
noisy = measured + np.random.normal(0.0, noise_std_dev, size=measured.shape)
# For plotting truth (noise-free) embeddings
true_attractor = {
2: delay_reconstruction(measured, dimension=2, delay=delay_tau),
3: delay_reconstruction(measured, dimension=3, delay=delay_tau),
}
results = {}
for m in dims:
print(f"\n=== Processing {m}D embedding ===")
# Build embedding (noisy) for comparison & training
noisy_emb = delay_reconstruction(noisy, dimension=m, delay=delay_tau)
# Build supervised pairs safely (no off-by-one)
X_all, y_all = make_supervised_from_embedding(noisy, dimension=m, delay=delay_tau)
# Choose random training subset (reproducible)
n_pairs = min(num_training_pairs, len(X_all))
idx = np.random.choice(len(X_all), size=n_pairs, replace=False)
X_train = X_all[idx]
y_train = y_all[idx]
# Start state: last available embedded state that also allows roll-forward
start_state = noisy_emb[-1].copy()
# Train & iterate
model = train_gpr_for_next_scalar(X_train, y_train)
gpr_emb = iterate_gpr(model, start_state, num_gpr_points)
# Wasserstein distances
W = wasserstein_distance_pot(noisy_emb, gpr_emb)
Wn = normalized_wasserstein(noisy_emb, gpr_emb)
print(f"Wasserstein (POT) for {m}D: {W:.4f}")
print(f"Normalized Wasserstein for {m}D: {Wn:.4f}")
# DV test (optional)
if use_dv_test:
# To keep runtime reasonable, subsample clouds if huge
max_pts = 3000
A = noisy_emb if len(noisy_emb) <= max_pts else noisy_emb[np.random.choice(len(noisy_emb), max_pts, replace=False)]
B = gpr_emb if len(gpr_emb) <= max_pts else gpr_emb[np.random.choice(len(gpr_emb), max_pts, replace=False)]
S, Qhat, varQ = diks_takens_S(A, B, d_bw=dv_bandwidth, segment_len=dv_segment_len)
print(f"Diks–Takens DV test (m={m}): S = {S:.3f} (reject H0 if S>3)")
results[m] = {
"noisy_emb": noisy_emb,
"gpr_emb": gpr_emb,
"true_emb": true_attractor[m],
"W": W,
"Wn": Wn,
"S": S if use_dv_test else None,
}
# --- Plot ---
fig = plt.figure(figsize=(15, 6))
# 2D
ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(results[2]["true_emb"][:, 0], results[2]["true_emb"][:, 1],
'o', ms=1, alpha=0.5, label='Original (noise-free)', color='blue')
ax1.plot(results[2]["noisy_emb"][:, 0], results[2]["noisy_emb"][:, 1],
'o', ms=1, alpha=0.4, label='Noisy', color='gray')
ax1.plot(results[2]["gpr_emb"][:, 0], results[2]["gpr_emb"][:, 1],
'o', ms=1, alpha=0.5, label='GPR generated', color='orange')
ax1.set_title('2D Delay Reconstruction')
ax1.set_xlabel('$h(t)$')
ax1.set_ylabel('$h(t+\\tau)$')
ax1.legend(markerscale=5)
# 3D
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.plot(results[3]["true_emb"][:, 0], results[3]["true_emb"][:, 1], results[3]["true_emb"][:, 2],
'o', ms=1, alpha=0.5, label='Original (noise-free)', color='blue')
ax2.plot(results[3]["noisy_emb"][:, 0], results[3]["noisy_emb"][:, 1], results[3]["noisy_emb"][:, 2],
'o', ms=1, alpha=0.4, label='Noisy', color='gray')
ax2.plot(results[3]["gpr_emb"][:, 0], results[3]["gpr_emb"][:, 1], results[3]["gpr_emb"][:, 2],
'o', ms=1, alpha=0.5, label='GPR generated', color='orange')
ax2.set_title('3D Delay Reconstruction')
ax2.set_xlabel('$h(t)$')
ax2.set_ylabel('$h(t+\\tau)$')
ax2.set_zlabel('$h(t+2\\tau)$')
ax2.legend(markerscale=5)
plt.tight_layout()
plt.show()
Collecting POT
Downloading pot-0.9.6-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 40.2/40.2 kB 1.0 MB/s eta 0:00:00
Requirement already satisfied: numpy>=1.16 in /usr/local/lib/python3.12/dist-packages (from POT) (2.0.2)
Requirement already satisfied: scipy>=1.6 in /usr/local/lib/python3.12/dist-packages (from POT) (1.16.1)
Downloading pot-0.9.6-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (1.5 MB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.5/1.5 MB 13.5 MB/s eta 0:00:00
Installing collected packages: POT
Successfully installed POT-0.9.6
=== Processing 2D embedding ===
Wasserstein (POT) for 2D: 0.2642
Normalized Wasserstein for 2D: 0.0467
Diks–Takens DV test (m=2): S = 4.451 (reject H0 if S>3)
=== Processing 3D embedding ===
Wasserstein (POT) for 3D: 0.2807
Normalized Wasserstein for 3D: 0.0405
Diks–Takens DV test (m=3): S = 4.776 (reject H0 if S>3)