Source code for analytic_continuation.laurent

"""
Laurent map fitting and evaluation for analytic continuation.

Implements Stage 3 of the pipeline: fitting z(ζ) so that the unit circle
maps to approximate a Jordan curve.
"""

import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict, Any
import json

from .types import Point, SplineExport, LaurentMap, Complex


[docs] @dataclass class LaurentFitConfig: """Configuration for Laurent map fitting.""" N_min: int = 6 N_max: int = 64 m_samples: int = 2048 lambda_init: float = 1e-6 lambda_increase_factor: float = 10.0 max_lambda_tries: int = 6 reparam_iters: int = 2 theta_search_grid: int = 2048 theta_refine_iters: int = 3 fit_tol_max_factor: float = 0.002 prefer_smallest_N: bool = True rho_in: float = 0.97 rho_out: float = 1.03 check_theta_grid: int = 4096 min_abs_deriv_factor: float = 1e-8 min_sep_factor: float = 1e-5 poly_eps_factor: float = 0.0005
[docs] @classmethod def from_pipeline_config(cls, cfg: dict) -> "LaurentFitConfig": """Create from pipeline_config.json structure.""" fit_cfg = cfg.get("fitLaurentMap", {}) deg = fit_cfg.get("degree", {}) samp = fit_cfg.get("sampling", {}) reg = fit_cfg.get("regularization", {}) lam_auto = reg.get("lambdaAuto", {}) reparam = fit_cfg.get("reparameterization", {}) theta_search = reparam.get("thetaSearch", {}) stopping = fit_cfg.get("stopping", {}) checks = fit_cfg.get("checks", {}) return cls( N_min=deg.get("N_min", 6), N_max=deg.get("N_max", 64), m_samples=samp.get("m_samples", 2048), lambda_init=lam_auto.get("lambda0Factor", 1e-6), lambda_increase_factor=lam_auto.get("increaseFactorOnFailure", 10.0), max_lambda_tries=lam_auto.get("maxTries", 6), reparam_iters=reparam.get("iters", 2), theta_search_grid=theta_search.get("grid", 2048), theta_refine_iters=theta_search.get("localRefineIters", 3), fit_tol_max_factor=stopping.get("fitTolMaxFactor", 0.002), prefer_smallest_N=stopping.get("preferSmallestN", True), rho_in=checks.get("rho_in", 0.97), rho_out=checks.get("rho_out", 1.03), check_theta_grid=checks.get("theta_grid", 4096), min_abs_deriv_factor=checks.get("minAbsDerivFactor", 1e-8), min_sep_factor=checks.get("minSepFactor", 1e-5), poly_eps_factor=checks.get("simplicity", {}).get("polyEpsFactor", 0.0005), )
[docs] @dataclass class LaurentMapResult: """Result of Laurent map evaluation.""" N: int a0: complex a: np.ndarray # [a_1, ..., a_N] b: np.ndarray # [b_1, ..., b_N]
[docs] def eval(self, zeta: complex) -> complex: """Evaluate z(ζ) = a0 + Σ a_k ζ^k + Σ b_k ζ^{-k}.""" z = self.a0 p = zeta for k in range(self.N): z += self.a[k] * p p *= zeta q = 1.0 / zeta p = q for k in range(self.N): z += self.b[k] * p p *= q return z
[docs] def eval_array(self, zeta: np.ndarray) -> np.ndarray: """Vectorized evaluation.""" z = np.full_like(zeta, self.a0) p = zeta.copy() for k in range(self.N): z += self.a[k] * p p *= zeta q = 1.0 / zeta p = q.copy() for k in range(self.N): z += self.b[k] * p p *= q return z
[docs] def deriv(self, zeta: complex) -> complex: """Evaluate z'(ζ) = Σ k a_k ζ^{k-1} - Σ k b_k ζ^{-k-1}.""" dz = 0.0 p = 1.0 for k in range(self.N): dz += (k + 1) * self.a[k] * p p *= zeta q = 1.0 / zeta p = q * q for k in range(self.N): dz -= (k + 1) * self.b[k] * p p *= q return dz
[docs] def deriv_array(self, zeta: np.ndarray) -> np.ndarray: """Vectorized derivative.""" dz = np.zeros_like(zeta) p = np.ones_like(zeta) for k in range(self.N): dz += (k + 1) * self.a[k] * p p *= zeta q = 1.0 / zeta p = q * q for k in range(self.N): dz -= (k + 1) * self.b[k] * p p *= q return dz
[docs] def to_laurent_map(self) -> LaurentMap: """Convert to serializable LaurentMap type.""" return LaurentMap( N=self.N, a0=Complex.from_complex(self.a0), a=[Complex.from_complex(c) for c in self.a], b=[Complex.from_complex(c) for c in self.b], )
[docs] @classmethod def from_laurent_map(cls, lm: LaurentMap) -> "LaurentMapResult": """Create from serializable LaurentMap type.""" return cls( N=lm.N, a0=lm.a0.to_complex(), a=np.array([c.to_complex() for c in lm.a]), b=np.array([c.to_complex() for c in lm.b]), )
[docs] @dataclass class FitResult: """Result of fitting a Laurent map.""" ok: bool failure_reason: Optional[str] = None curve_scale: float = 0.0 polyline_used: List[complex] = field(default_factory=list) laurent_map: Optional[LaurentMapResult] = None fit_max_err: float = float('inf') fit_rms_err: float = float('inf') simple_on_unit_circle: bool = False min_abs_deriv_unit: float = 0.0 min_sep_unit: float = 0.0 min_sep_in: float = 0.0 min_sep_out: float = 0.0
[docs] def load_polyline_from_export( export: SplineExport, field_name: str = "adaptivePolyline", drop_duplicate_terminal: bool = True, ) -> List[complex]: """ Load polyline from SplineExport, converting to complex. Parameters ---------- export : SplineExport The spline export data field_name : str Which field to use: 'adaptivePolyline', 'spline', or 'controlPoints' drop_duplicate_terminal : bool If True, remove trailing points that duplicate the first point Returns ------- List[complex] The polyline as complex numbers """ if field_name == "adaptivePolyline": pts = export.adaptivePolyline elif field_name == "spline": pts = export.spline else: pts = export.controlPoints # Fall back if requested field is empty if not pts: pts = export.adaptivePolyline or export.spline or export.controlPoints poly = [complex(p.x, p.y) for p in pts] if drop_duplicate_terminal: while len(poly) >= 2 and abs(poly[-1] - poly[0]) < 1e-12: poly.pop() return poly
[docs] def estimate_diameter(poly: List[complex]) -> float: """Estimate curve diameter as max pairwise distance.""" n = len(poly) dmax = 0.0 for i in range(n): for j in range(i + 1, n): d = abs(poly[i] - poly[j]) if d > dmax: dmax = d return dmax
[docs] def resample_closed_polyline(poly: List[complex], m: int) -> np.ndarray: """ Resample a closed polyline by arc length to m points. Returns array of m complex points, uniformly spaced by arc length. """ n = len(poly) if n == 0: return np.array([], dtype=complex) # Compute cumulative arc length cum = [0.0] total = 0.0 for i in range(n): a = poly[i] b = poly[(i + 1) % n] seg = abs(b - a) total += seg cum.append(total) if total < 1e-14: return np.full(m, poly[0], dtype=complex) # Resample result = np.zeros(m, dtype=complex) for j in range(m): s = total * j / m # Binary search for segment lo, hi = 0, n while lo < hi: mid = (lo + hi) // 2 if cum[mid] < s: lo = mid + 1 else: hi = mid i = max(0, lo - 1) # Interpolate within segment seg_len = cum[i + 1] - cum[i] if seg_len > 1e-14: t = (s - cum[i]) / seg_len else: t = 0.0 a = poly[i] b = poly[(i + 1) % n] result[j] = a + t * (b - a) return result
[docs] def build_laurent_matrix(thetas: np.ndarray, N: int) -> np.ndarray: """ Build the design matrix for Laurent fitting. For m sample points and degree N, produces m x (1 + 2N) complex matrix. Column 0: constant term (1) Columns 1..N: positive powers ζ^1, ..., ζ^N Columns N+1..2N: negative powers ζ^{-1}, ..., ζ^{-N} """ m = len(thetas) A = np.zeros((m, 1 + 2 * N), dtype=complex) zeta = np.exp(1j * thetas) A[:, 0] = 1.0 # Positive powers p = zeta.copy() for k in range(N): A[:, 1 + k] = p p = p * zeta # Negative powers q = 1.0 / zeta p = q.copy() for k in range(N): A[:, 1 + N + k] = p p = p * q return A
[docs] def solve_tikhonov( A: np.ndarray, y: np.ndarray, N: int, lam: float, ) -> np.ndarray: """ Solve regularized least squares with Tikhonov penalty. Penalty weights: k^2 on coefficients a_k, b_k (k=1..N), no penalty on a0. Uses real-valued formulation: stack real and imaginary parts. """ m, n = A.shape # n = 1 + 2*N # Build penalty weights: w[k] = sqrt(lambda) * k for k >= 1 w = np.zeros(n) for k in range(1, N + 1): w[k] = np.sqrt(lam) * k # a_k w[N + k] = np.sqrt(lam) * k # b_k # Convert to real system: [Re(A); Im(A)] @ x = [Re(y); Im(y)] A_real = np.vstack([A.real, A.imag]) y_real = np.hstack([y.real, y.imag]) # Add regularization rows reg_rows = np.diag(w) A_aug = np.vstack([A_real, reg_rows]) y_aug = np.hstack([y_real, np.zeros(n)]) # Solve least squares c, _, _, _ = np.linalg.lstsq(A_aug, y_aug, rcond=None) return c
[docs] def check_polyline_simple(poly: np.ndarray, eps: float) -> bool: """ Check if a closed polyline is simple (non-self-intersecting). Uses segment-segment intersection test with tolerance. """ n = len(poly) if n < 4: return True def segments_intersect(p1, p2, p3, p4, eps): """Check if segment p1-p2 intersects p3-p4.""" d1 = p2 - p1 d2 = p4 - p3 d3 = p3 - p1 cross = d1.real * d2.imag - d1.imag * d2.real if abs(cross) < eps: return False # Parallel t = (d3.real * d2.imag - d3.imag * d2.real) / cross s = (d3.real * d1.imag - d3.imag * d1.real) / cross return eps < t < 1 - eps and eps < s < 1 - eps # Check all non-adjacent segment pairs for i in range(n): p1, p2 = poly[i], poly[(i + 1) % n] for j in range(i + 2, n): if i == 0 and j == n - 1: continue # Adjacent (wrapping) p3, p4 = poly[j], poly[(j + 1) % n] if segments_intersect(p1, p2, p3, p4, eps): return False return True
[docs] def check_laurent_map( lmap: LaurentMapResult, rho_in: float, rho_out: float, theta_grid: int, min_abs_deriv: float, min_sep: float, poly_eps: float, ) -> Dict[str, Any]: """ Check Laurent map quality on the annulus. Returns dict with: - simple_on_unit_circle: bool - min_abs_deriv_unit: float - min_sep_unit, min_sep_in, min_sep_out: float - ok: bool """ thetas = np.linspace(0, 2 * np.pi, theta_grid, endpoint=False) # Sample on unit circle zeta_unit = np.exp(1j * thetas) z_unit = lmap.eval_array(zeta_unit) dz_unit = lmap.deriv_array(zeta_unit) # Check simplicity simple = check_polyline_simple(z_unit, poly_eps) # Min derivative magnitude on unit circle min_deriv = np.min(np.abs(dz_unit)) # Min separation on each circle def min_separation(pts): n = len(pts) min_sep = float('inf') for i in range(n): d = abs(pts[(i + 1) % n] - pts[i]) if d < min_sep: min_sep = d return min_sep sep_unit = min_separation(z_unit) # Inner circle zeta_in = rho_in * np.exp(1j * thetas) z_in = lmap.eval_array(zeta_in) sep_in = min_separation(z_in) # Outer circle zeta_out = rho_out * np.exp(1j * thetas) z_out = lmap.eval_array(zeta_out) sep_out = min_separation(z_out) ok = ( simple and min_deriv >= min_abs_deriv and sep_unit >= min_sep and sep_in >= min_sep and sep_out >= min_sep ) return { "ok": ok, "simple_on_unit_circle": simple, "min_abs_deriv_unit": min_deriv, "min_sep_unit": sep_unit, "min_sep_in": sep_in, "min_sep_out": sep_out, }
[docs] def reparam_closest_theta( lmap: LaurentMapResult, targets: np.ndarray, theta_grid: int, refine_iters: int, ) -> np.ndarray: """ Reparameterize by finding closest theta for each target point. For each target z, find θ minimizing |z(e^{iθ}) - z|. """ m = len(targets) thetas = np.zeros(m) # Coarse grid search grid = np.linspace(0, 2 * np.pi, theta_grid, endpoint=False) zeta_grid = np.exp(1j * grid) z_grid = lmap.eval_array(zeta_grid) for j in range(m): target = targets[j] # Find closest on grid dists = np.abs(z_grid - target) best_idx = np.argmin(dists) best_theta = grid[best_idx] # Local refinement (ternary-like search) delta = 2 * np.pi / theta_grid for _ in range(refine_iters): delta /= 3 candidates = [best_theta - delta, best_theta, best_theta + delta] best_dist = float('inf') for th in candidates: z = lmap.eval(np.exp(1j * th)) d = abs(z - target) if d < best_dist: best_dist = d best_theta = th thetas[j] = best_theta % (2 * np.pi) return thetas
[docs] def fit_laurent_map( export: SplineExport, cfg: Optional[LaurentFitConfig] = None, ) -> FitResult: """ Fit a Laurent map to a SplineExport curve. This is the main entry point for Stage 3. Parameters ---------- export : SplineExport The input curve data cfg : LaurentFitConfig, optional Configuration parameters Returns ------- FitResult The fitting result including the Laurent map if successful """ if cfg is None: cfg = LaurentFitConfig() # Load and prepare polyline poly = load_polyline_from_export(export) if len(poly) < 3: return FitResult(ok=False, failure_reason="Polyline too short") diameter = estimate_diameter(poly) if diameter < 1e-14: return FitResult(ok=False, failure_reason="Degenerate curve (zero diameter)") # Derive tolerances from diameter fit_tol_max = cfg.fit_tol_max_factor * diameter min_abs_deriv = cfg.min_abs_deriv_factor * diameter min_sep = cfg.min_sep_factor * diameter poly_eps = cfg.poly_eps_factor * diameter # Resample by arc length p = resample_closed_polyline(poly, cfg.m_samples) # Lambda scaling lam = cfg.lambda_init * diameter * diameter best_result: Optional[FitResult] = None for reg_try in range(cfg.max_lambda_tries): for N in range(cfg.N_min, cfg.N_max + 1): # Initialize uniform thetas thetas = np.linspace(0, 2 * np.pi, cfg.m_samples, endpoint=False) # Reparameterization iterations lmap = None for _ in range(cfg.reparam_iters): # Build matrix and solve A = build_laurent_matrix(thetas, N) coeffs = solve_tikhonov(A, p, N, lam) # Extract coefficients a0 = coeffs[0] a = coeffs[1:N + 1] b = coeffs[N + 1:2 * N + 1] lmap = LaurentMapResult(N=N, a0=a0, a=a, b=b) # Update thetas by closest point thetas = reparam_closest_theta( lmap, p, cfg.theta_search_grid, cfg.theta_refine_iters ) # Final solve with updated thetas A = build_laurent_matrix(thetas, N) coeffs = solve_tikhonov(A, p, N, lam) a0 = coeffs[0] a = coeffs[1:N + 1] b = coeffs[N + 1:2 * N + 1] lmap = LaurentMapResult(N=N, a0=a0, a=a, b=b) # Compute fit errors zeta_samples = np.exp(1j * thetas) z_fit = lmap.eval_array(zeta_samples) errors = np.abs(z_fit - p) max_err = np.max(errors) rms_err = np.sqrt(np.mean(errors ** 2)) # Check map quality checks = check_laurent_map( lmap, cfg.rho_in, cfg.rho_out, cfg.check_theta_grid, min_abs_deriv, min_sep, poly_eps ) if checks["ok"]: result = FitResult( ok=True, curve_scale=diameter, polyline_used=list(poly), laurent_map=lmap, fit_max_err=max_err, fit_rms_err=rms_err, simple_on_unit_circle=checks["simple_on_unit_circle"], min_abs_deriv_unit=checks["min_abs_deriv_unit"], min_sep_unit=checks["min_sep_unit"], min_sep_in=checks["min_sep_in"], min_sep_out=checks["min_sep_out"], ) # If fit is good enough and we prefer smallest N, return immediately if max_err <= fit_tol_max and cfg.prefer_smallest_N: return result # Otherwise, record as best if better than previous if best_result is None or max_err < best_result.fit_max_err: best_result = result # If we found any valid result, return the best if best_result is not None: return best_result # Increase regularization and retry lam *= cfg.lambda_increase_factor # All attempts failed return FitResult(ok=False, failure_reason="Failed to fit after max regularization tries")