Logging and Progress Tracking

The logging and progress modules provide infrastructure for tracking pipeline execution, reporting progress, and writing structured logs.

Pipeline Logger

class analytic_continuation.PipelineLogger[source]

Bases: object

Logger for the analytic continuation pipeline.

Provides:

  • Structured logging to console/file

  • SQLite persistence for session recovery

  • Progress tracking for UI updates

static __new__(cls, *args, **kwargs)[source]
__init__(db_path=None, log_level=20, log_file=None)[source]
The default log_level of 20 matches the numeric value of logging.INFO in Python's standard logging module.
register_progress_callback(callback)[source]

Register a callback for progress updates.

Parameters:

callback (callable)

unregister_progress_callback(callback)[source]

Unregister a progress callback.

Parameters:

callback (callable)
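
The register/unregister pair above follows the usual observer pattern. The sketch below illustrates that pattern with a hypothetical stand-in class, not PipelineLogger itself. The callback signature (a single dict payload) and the names CallbackRegistry and notify are assumptions; the source does not state what arguments callbacks receive.

```python
from typing import Any, Callable, Dict, List

ProgressCallback = Callable[[Dict[str, Any]], None]


class CallbackRegistry:
    """Hypothetical stand-in illustrating the register/unregister pattern."""

    def __init__(self) -> None:
        self._callbacks: List[ProgressCallback] = []

    def register(self, callback: ProgressCallback) -> None:
        # Avoid duplicate registrations of the same callable
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    def unregister(self, callback: ProgressCallback) -> None:
        # Removing an unknown callback is a no-op
        if callback in self._callbacks:
            self._callbacks.remove(callback)

    def notify(self, payload: Dict[str, Any]) -> None:
        # Iterate over a copy so callbacks may unregister themselves
        for callback in list(self._callbacks):
            callback(payload)


received: List[Dict[str, Any]] = []


def on_progress(payload: Dict[str, Any]) -> None:
    received.append(payload)


registry = CallbackRegistry()
registry.register(on_progress)
registry.notify({"task_id": "fit_laurent", "progress": 0.5})
registry.unregister(on_progress)
registry.notify({"task_id": "fit_laurent", "progress": 1.0})  # no longer delivered
```

After unregistering, the second update is not delivered, so only the first payload ends up in received.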

static compute_input_hash(expression=None, curve_data=None, zeros=None, poles=None)[source]

Compute a hash of the input parameters for session matching.

This allows finding previous sessions with identical inputs for potential resumption.

Return type:

str
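
One plausible way to build such a hash is to serialize the inputs canonically and digest the result. The sketch below is an assumption for illustration, not the library's actual algorithm: the function name sketch_input_hash, the use of SHA-256, and the "x"/"y" keys in the zero entries are all hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional


def sketch_input_hash(
    expression: Optional[str] = None,
    curve_data: Optional[Dict[str, Any]] = None,
    zeros: Optional[List[Dict[str, float]]] = None,
    poles: Optional[List[Dict[str, float]]] = None,
) -> str:
    """Hypothetical illustration; not the library's actual algorithm."""
    payload = {
        "expression": expression,
        "curve_data": curve_data,
        "zeros": zeros or [],
        "poles": poles or [],
    }
    # sort_keys makes the serialization independent of dict key order
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same inputs with different key order hash identically
h1 = sketch_input_hash(expression="1/z", zeros=[{"x": 0.0, "y": 1.0}])
h2 = sketch_input_hash(expression="1/z", zeros=[{"y": 1.0, "x": 0.0}])
# Different inputs produce a different digest
h3 = sketch_input_hash(expression="1/(z-1)")
```

Canonical serialization matters here: without sorted keys, two logically identical inputs could hash differently and a resumable session would be missed.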

find_resumable_session(expression=None, curve_data=None, zeros=None, poles=None)[source]

Find a previous session with matching inputs that can be resumed.

Returns session info including what stages were completed, or None if no matching session exists.

Return type:

Optional[Dict[str, Any]]

start_session(expression=None, curve_data=None, zeros=None, poles=None)[source]

Start a new pipeline session.

Return type:

str

update_session_stage(stage, session_id=None)[source]

Update the last completed stage for a session.

get_session(session_id)[source]

Retrieve a session by ID.

Parameters:

session_id (str)

Return type:

Optional[PipelineSession]

list_sessions(limit=20)[source]

List recent sessions.

Parameters:

limit (int)

Return type:

List[Dict[str, Any]]

start_task(task_id, name, session_id=None)[source]

Start tracking a new task.

Return type:

TaskProgress

update_task(task_id, progress=None, message=None, metadata=None, session_id=None)[source]

Update task progress.

complete_task(task_id, success=True, error=None, metadata=None, session_id=None)[source]

Mark a task as completed or failed.

cache_computation(cache_key, stage, data, session_id=None)[source]

Cache intermediate computation results for recovery.

get_cached_computation(cache_key, session_id=None)[source]

Retrieve cached computation.

Return type:

Optional[Dict[str, Any]]
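
The cache/retrieve pair above can be pictured as a small key-value table in SQLite. The following sketch uses only the standard sqlite3 module; the table schema and the helper names cache_put and cache_get are hypothetical, since the library's actual storage layout is not documented here.

```python
import json
import sqlite3
from typing import Any, Dict, Optional

# Hypothetical schema; the library's actual table layout is an unknown
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cache ("
    " session_id TEXT, cache_key TEXT, stage TEXT, data TEXT,"
    " PRIMARY KEY (session_id, cache_key))"
)


def cache_put(session_id: str, cache_key: str, stage: str, data: Dict[str, Any]) -> None:
    # INSERT OR REPLACE overwrites an earlier entry with the same key
    conn.execute(
        "INSERT OR REPLACE INTO cache VALUES (?, ?, ?, ?)",
        (session_id, cache_key, stage, json.dumps(data)),
    )
    conn.commit()


def cache_get(session_id: str, cache_key: str) -> Optional[Dict[str, Any]]:
    row = conn.execute(
        "SELECT data FROM cache WHERE session_id = ? AND cache_key = ?",
        (session_id, cache_key),
    ).fetchone()
    # Return None on a cache miss, mirroring the Optional return type
    return json.loads(row[0]) if row else None


cache_put("s1", "laurent_coeffs", "fit_laurent", {"N": 16})
hit = cache_get("s1", "laurent_coeffs")
miss = cache_get("s1", "unknown_key")
```

Storing the payload as JSON text keeps the sketch simple; it restricts cached data to JSON-serializable values.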

end_session(success=True, result=None, error=None, session_id=None)[source]

End the current session.

get_current_progress()[source]

Get current session progress for UI display.

Return type:

Optional[Dict[str, Any]]

debug(msg, **kwargs)[source]
Parameters:

msg (str)

info(msg, **kwargs)[source]
Parameters:

msg (str)

warning(msg, **kwargs)[source]
Parameters:

msg (str)

error(msg, **kwargs)[source]
Parameters:

msg (str)

The pipeline logger provides structured logging for each stage:

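from analytic_continuation import get_logger

# Get the global logger instance (get_logger() takes no arguments)
logger = get_logger()

# Start a session, then track the Laurent fitting stage as a task
session_id = logger.start_session()
logger.start_task("fit_laurent", "Fit Laurent Map")

# Log progress
logger.update_task("fit_laurent", progress=0.5, message="Fitting with N=16")

# Log stage completion and end the session
logger.complete_task("fit_laurent", success=True)
logger.end_session(success=True)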
analytic_continuation.get_logger()[source]

Get the global pipeline logger instance.

Return type:

PipelineLogger

Task Status

class analytic_continuation.TaskStatus[source]

Bases: str, Enum

Status of a pipeline task.

PENDING = 'pending'
IN_PROGRESS = 'in_progress'
COMPLETED = 'completed'
FAILED = 'failed'
SKIPPED = 'skipped'
__new__(value)
class analytic_continuation.TaskProgress[source]

Bases: object

Progress information for a single task.

task_id: str
name: str
status: TaskStatus = 'pending'
progress: float = 0.0
message: str = ''
started_at: str | None = None
completed_at: str | None = None
error: str | None = None
metadata: Dict[str, Any]
to_dict()[source]
Return type:

Dict[str, Any]

__init__(task_id, name, status=TaskStatus.PENDING, progress=0.0, message='', started_at=None, completed_at=None, error=None, metadata=<factory>)
Return type:

None
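
The signatures above suggest a dataclass whose status field is a str-based Enum and whose metadata uses a default factory. The sketch below reproduces that shape with reduced, hypothetical stand-ins (Status and Progress); it is an illustration of the pattern, not the package's source.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class Status(str, Enum):  # stand-in for TaskStatus
    PENDING = "pending"
    COMPLETED = "completed"


@dataclass
class Progress:  # stand-in for TaskProgress, reduced to a few fields
    task_id: str
    name: str
    status: Status = Status.PENDING
    progress: float = 0.0
    error: Optional[str] = None
    # A mutable default needs a factory; signatures render it as <factory>
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        d = asdict(self)
        # Store the plain string value so the dict is JSON-friendly
        d["status"] = self.status.value
        return d


task = Progress(task_id="fit_laurent", name="Fit Laurent Map")
as_dict = task.to_dict()
```

A str-based Enum member compares equal to its string value, which is consistent with the default being displayed as 'pending' in the attribute list.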

Track individual task progress:

from analytic_continuation import TaskStatus, TaskProgress

# Create task progress (task_id and name are required)
task = TaskProgress(
    task_id="fit_laurent",
    name="Laurent Fitting",
    status=TaskStatus.IN_PROGRESS,
    progress=0.0,
    message="Starting...",
)

# Update progress
task.progress = 0.75
task.message = "Checking quality metrics"

# Mark complete
task.status = TaskStatus.COMPLETED
task.progress = 1.0

Pipeline Session

class analytic_continuation.PipelineSession[source]

Bases: object

Represents a complete pipeline execution session.

session_id: str
created_at: str
expression: str | None = None
curve_data: Dict[str, Any] | None = None
zeros: List[Dict[str, float]]
poles: List[Dict[str, float]]
tasks: List[TaskProgress]
result: Dict[str, Any] | None = None
status: TaskStatus = 'pending'
to_dict()[source]
Return type:

Dict[str, Any]

__init__(session_id, created_at, expression=None, curve_data=None, zeros=<factory>, poles=<factory>, tasks=<factory>, result=None, status=TaskStatus.PENDING)
Return type:

None

Manages a complete pipeline execution session:

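from datetime import datetime, timezone

from analytic_continuation import PipelineSession, TaskProgress, TaskStatus

# Create a new session; session_id and created_at are required.
# The ISO 8601 timestamp format is an assumption (created_at is typed as str).
session = PipelineSession(
    session_id="example-session",
    created_at=datetime.now(timezone.utc).isoformat(),
)

# Attach a task record
session.tasks.append(TaskProgress(task_id="load_curve", name="Load Curve"))

# Record results
session.result = {"curve_points": 500, "estimated_complexity": "medium"}
session.status = TaskStatus.COMPLETED

# Get session summary as a dictionary
summary = session.to_dict()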

Progress Tracker

class analytic_continuation.ProgressTracker[source]

Bases: object

Tracks progress through the analytic continuation pipeline.

Provides:

  • Stage-by-stage progress tracking

  • Real-time updates via callbacks or SSE

  • Checklist-style UI output

Parameters:

session_id (Optional[str])

__init__(session_id=None)[source]
Parameters:

session_id (Optional[str])

stages: Dict[str, StageInfo]
async subscribe()[source]

Subscribe to progress updates via SSE.

Return type:

AsyncGenerator[str, None]
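
An AsyncGenerator[str, None] for SSE typically yields text frames of the form "data: <payload>" followed by a blank line. The sketch below shows that shape with a hypothetical generator fed from an asyncio.Queue; the queue, the None sentinel, and the JSON payload are assumptions, not the tracker's documented internals.

```python
import asyncio
import json
from typing import Any, AsyncGenerator, Dict, List, Optional


async def sse_stream(
    queue: "asyncio.Queue[Optional[Dict[str, Any]]]",
) -> AsyncGenerator[str, None]:
    """Hypothetical generator: yield queued updates as SSE 'data:' frames."""
    while True:
        update = await queue.get()
        if update is None:  # sentinel (an assumption) that ends the stream
            break
        # An SSE event is 'data: <payload>' followed by a blank line
        yield f"data: {json.dumps(update)}\n\n"


async def demo() -> List[str]:
    queue: "asyncio.Queue[Optional[Dict[str, Any]]]" = asyncio.Queue()
    await queue.put({"stage_id": "fit_laurent", "progress": 0.5})
    await queue.put(None)
    # Consume the stream with an async comprehension
    return [frame async for frame in sse_stream(queue)]


frames = asyncio.run(demo())
```

In a web application, a generator like this would usually be handed to a streaming response with the text/event-stream content type.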

get_state()[source]

Get current progress state for UI.

Return type:

Dict[str, Any]

async start_stage(stage_id, substeps_total=0, message='')[source]

Start a pipeline stage.

Parameters:
  • stage_id (str)

  • substeps_total (int)

  • message (str)

async update_stage(stage_id, progress=None, message=None, substeps_done=None)[source]

Update stage progress.

async complete_stage(stage_id, success=True, error=None, message=None)[source]

Complete a pipeline stage.

async skip_stage(stage_id, reason='')[source]

Skip a stage (e.g., when using cached results).

sync_start_stage(stage_id, substeps_total=0, message='')[source]

Synchronous version of start_stage for non-async contexts.

Parameters:
  • stage_id (str)

  • substeps_total (int)

  • message (str)

sync_update_stage(stage_id, progress=None, message=None, substeps_done=None)[source]

Synchronous version of update_stage.

sync_complete_stage(stage_id, success=True, error=None, message=None)[source]

Synchronous version of complete_stage.


High-level progress tracking across pipeline stages:

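from analytic_continuation import ProgressTracker, PIPELINE_STAGES

# Create tracker
tracker = ProgressTracker()

# Track each stage; every entry is an (id, name, description) tuple.
# The sync_* methods are used because start_stage/complete_stage are coroutines.
for stage_id, name, description in PIPELINE_STAGES:
    tracker.sync_start_stage(stage_id, message=description)
    # ... do work ...
    tracker.sync_complete_stage(stage_id)

# Get the current progress state for UI display
state = tracker.get_state()
print(state)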

Stage Information

class analytic_continuation.StageInfo[source]

Bases: object

Information about a pipeline stage.

id: str
name: str
description: str
status: TaskStatus = 'pending'
progress: float = 0.0
message: str = ''
substeps_total: int = 0
substeps_done: int = 0
started_at: str | None = None
completed_at: str | None = None
error: str | None = None
to_dict()[source]
Return type:

Dict[str, Any]

__init__(id, name, description, status=TaskStatus.PENDING, progress=0.0, message='', substeps_total=0, substeps_done=0, started_at=None, completed_at=None, error=None)
Return type:

None

analytic_continuation.PIPELINE_STAGES = [
    ('precheck', 'Pre-Check Contour', 'Quick validation of curve topology'),
    ('validate_input', 'Validate Input', 'Checking curve and function validity'),
    ('load_curve', 'Load Curve', 'Loading and preprocessing Jordan curve'),
    ('fit_laurent', 'Fit Laurent Map', 'Fitting Laurent series to curve'),
    ('analyze_complexity', 'Analyze Complexity', 'Computing Cesàro/Whewell forms for cost estimation'),
    ('check_holomorphic', 'Check Holomorphic', 'Verifying function is holomorphic on annulus'),
    ('prepare_render', 'Prepare Render', 'Extracting coefficients for rendering'),
    ('render', 'Render', 'Generating domain coloring visualization'),
]

A list of (id, name, description) tuples, one per pipeline stage.

Predefined pipeline stages:

from analytic_continuation import PIPELINE_STAGES

# Each entry is an (id, name, description) tuple
for index, (stage_id, name, description) in enumerate(PIPELINE_STAGES, start=1):
    print(f"Stage {index}: {name} ({stage_id})")
    print(f"  Description: {description}")
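
Because each entry of PIPELINE_STAGES is a plain (id, name, description) tuple, a keyed lookup can be convenient. The sketch below builds one with a hypothetical NamedTuple helper (StageSpec is not part of the package) and copies two entries from the list so that it runs on its own.

```python
from typing import Dict, List, NamedTuple, Tuple


class StageSpec(NamedTuple):  # hypothetical helper type, not part of the package
    id: str
    name: str
    description: str


# Two entries copied from PIPELINE_STAGES so the sketch is self-contained
stages_raw: List[Tuple[str, str, str]] = [
    ("precheck", "Pre-Check Contour", "Quick validation of curve topology"),
    ("fit_laurent", "Fit Laurent Map", "Fitting Laurent series to curve"),
]

# Index the stages by ID for direct lookup
by_id: Dict[str, StageSpec] = {s[0]: StageSpec(*s) for s in stages_raw}

fit_stage = by_id["fit_laurent"]
```

Looking stages up by ID avoids relying on list positions, which shift whenever a stage is added or removed.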

CLI Progress Formatting

analytic_continuation.format_cli_progress(tracker)[source]

Format progress as CLI-style checklist output.

Example output:

┌─────────────────────────────────────────────────┐
│ Analytic Continuation Pipeline                  │
├─────────────────────────────────────────────────┤
│ ✓ Validate Input        [████████████] 100%     │
│ ✓ Load Curve            [████████████] 100%     │
│ ● Fit Laurent Map       [████████░░░░]  67%     │
│     Fitting N=24…                               │
│ ○ Check Holomorphic     [░░░░░░░░░░░░]   0%     │
│ ○ Prepare Render        [░░░░░░░░░░░░]   0%     │
│ ○ Render                [░░░░░░░░░░░░]   0%     │
└─────────────────────────────────────────────────┘

Parameters:

tracker (ProgressTracker)

Return type:

str
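
The bars in the example output can be reproduced with a few lines of string formatting. The sketch below is a hypothetical illustration of rendering one checklist row; render_bar and render_row are not part of the package, and the column widths are assumptions.

```python
def render_bar(progress: float, width: int = 12) -> str:
    """Hypothetical helper: render a fraction in [0, 1] as a block bar."""
    progress = min(max(progress, 0.0), 1.0)  # clamp out-of-range values
    filled = round(progress * width)
    return "[" + "█" * filled + "░" * (width - filled) + "]"


def render_row(marker: str, name: str, progress: float) -> str:
    # Left-align the name and right-align the percentage into fixed columns
    return f"{marker} {name:<20}{render_bar(progress)} {round(progress * 100):>3}%"


row = render_row("●", "Fit Laurent Map", 0.67)
print(row)
```

With a width of 12, a progress of 0.67 fills 8 blocks, which matches the "Fit Laurent Map" row in the example output.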

Format progress for command-line display:

from analytic_continuation import format_cli_progress, ProgressTracker

tracker = ProgressTracker()
tracker.sync_start_stage("fit_laurent")
tracker.sync_update_stage("fit_laurent", progress=0.5)

# Get formatted output
output = format_cli_progress(tracker)
print(output)

# The output is a boxed checklist like the example output shown above.

Integration Example

Complete example integrating logging and progress:

from analytic_continuation import (
    get_logger,
    ProgressTracker,
    PIPELINE_STAGES,
    format_cli_progress,
    fit_laurent_map,
    SplineExport,
)

# Set up infrastructure
logger = get_logger()
tracker = ProgressTracker()

def run_pipeline(export: SplineExport):
    # Look up the Laurent fitting stage by ID rather than by list position
    stage_id, stage_name, _ = next(s for s in PIPELINE_STAGES if s[0] == "fit_laurent")
    logger.start_session()
    tracker.sync_start_stage(stage_id)
    logger.start_task(stage_id, stage_name)

    try:
        result = fit_laurent_map(export)

        if result.ok:
            metrics = {
                "laurent_N": result.laurent_map.N,
                "fit_max_err": result.fit_max_err,
            }
            tracker.sync_complete_stage(stage_id)
            logger.complete_task(stage_id, success=True, metadata=metrics)
        else:
            tracker.sync_complete_stage(stage_id, success=False, error=result.failure_reason)
            logger.complete_task(stage_id, success=False, error=result.failure_reason)

    except Exception as e:
        logger.error(f"Stage failed with exception: {e}")
        tracker.sync_complete_stage(stage_id, success=False, error=str(e))
        logger.complete_task(stage_id, success=False, error=str(e))
        logger.end_session(success=False, error=str(e))
        raise

    logger.end_session(success=result.ok)

    # Print progress
    print(format_cli_progress(tracker))

    return result

Callback-based Progress

For long-running operations, use callbacks:

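def progress_callback(stage: str, progress: float, message: str):
    tracker.sync_update_stage(stage, progress=progress, message=message)
    # The checklist spans several lines, so a carriage return cannot overwrite it
    print(format_cli_progress(tracker))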

# Pass callback to fitting function (if supported)
result = fit_laurent_map(export, progress_callback=progress_callback)