Package Architecture
This document describes the overall architecture and design patterns used in the heterodyne package.
Core Architecture
The package follows a layered architecture:
User Interface Layer
├── CLI Scripts (run_heterodyne.py)
└── Python API (HeterodyneAnalysisCore)
│
Analysis Layer
├── Configuration Management
├── Data Loading & Validation
└── Result Generation
│
Optimization Layer
├── Classical Optimization (Nelder-Mead, Iterative Gurobi Trust Region)
├── Robust Optimization (Wasserstein DRO, Scenario-based, Ellipsoidal)
│
Model Layer
├── Physical Models
└── Mathematical Functions
│
Utility Layer
├── Data I/O
├── Plotting
└── Error Handling
Key Components
1. Configuration System
class ConfigManager:
"""Centralized configuration management"""
def __init__(self, config_path: str)
def validate(self) -> bool
def get_analysis_settings(self) -> dict
2. Analysis Core
class HeterodyneAnalysisCore:
"""Main analysis orchestrator"""
def __init__(self, config: ConfigManager)
def load_experimental_data(self)
def optimize_classical(self) -> OptimizeResult
def optimize_robust(self) -> dict
def optimize_all(self) -> dict
3. Model System
# Functional interface for models
def heterodyne_model(tau, params, q, phi)
4. Optimization Backends
class ClassicalOptimizer:
"""SciPy and Gurobi-based optimization"""
class RobustHeterodyneOptimizer:
"""CVXPY-based robust optimization"""
5. Gurobi Trust Region Implementation
The Gurobi optimization uses an iterative trust region approach for enhanced convergence:
def _run_gurobi_optimization(self, objective_func, initial_parameters):
"""
Iterative trust region SQP optimization:
1. Build quadratic approximation around current point
2. Solve QP subproblem with trust region constraints
3. Evaluate actual objective and update trust region
4. Iterate until convergence
"""
x_current = initial_parameters.copy()
trust_radius = 0.1 # Initial trust region
for iteration in range(max_iterations):
# Estimate gradient and diagonal Hessian
grad = self._compute_gradient(objective_func, x_current)
hessian_diag = self._compute_hessian_diagonal(objective_func, x_current)
# Solve trust region QP subproblem with Gurobi
step = self._solve_trust_region_qp(grad, hessian_diag, trust_radius)
# Evaluate and accept/reject step
x_new = x_current + step
if objective_func(x_new) < objective_func(x_current):
x_current = x_new # Accept step
trust_radius = min(1.0, 2 * trust_radius) # Expand region
else:
trust_radius = max(1e-8, 0.5 * trust_radius) # Shrink region
Design Patterns
1. Strategy Pattern - Optimization Methods
Different optimization strategies are encapsulated:
class OptimizationStrategy(ABC):
@abstractmethod
def optimize(self, objective_func, initial_params):
pass
class NelderMeadStrategy(OptimizationStrategy):
def optimize(self, objective_func, initial_params):
return minimize(objective_func, initial_params, method='Nelder-Mead')
def optimize(self, objective_func, initial_params):
2. Factory Pattern - Model Creation
Models are created based on configuration:
class ModelFactory:
@staticmethod
def create_model(analysis_mode: str):
if analysis_mode == "heterodyne":
return HeterodyneModel()
else:
raise ValueError(f"Unknown mode: {analysis_mode}")
3. Observer Pattern - Progress Tracking
class ProgressObserver:
def update(self, stage: str, progress: float):
pass
class ConsoleProgressObserver(ProgressObserver):
def update(self, stage: str, progress: float):
print(f"{stage}: {progress:.1%}")
4. Command Pattern - Analysis Pipeline
class AnalysisCommand(ABC):
@abstractmethod
def execute(self):
pass
class LoadDataCommand(AnalysisCommand):
def execute(self):
# Load experimental data
class OptimizeCommand(AnalysisCommand):
def execute(self):
# Run optimization
Data Flow
Configuration File
│
▼
ConfigManager ────────────► Validation
│
▼
HeterodyneAnalysisCore ─────► Data Loading
│
▼
Model Selection ──────────► Parameter Setup
│
▼
│
▼
Results Processing ───────► Output Generation
Error Handling Strategy
Hierarchical Error Classes:
class HeterodyneError(Exception):
"""Base exception for all heterodyne errors"""
class ConfigurationError(HeterodyneError):
"""Configuration-related errors"""
class DataFormatError(HeterodyneError):
"""Data format and loading errors"""
class OptimizationError(HeterodyneError):
"""Optimization convergence errors"""
Error Recovery:
def robust_optimization(self):
"""Optimization with fallback strategies"""
try:
return self.primary_optimization()
except OptimizationError:
logger.warning("Primary optimization failed, trying fallback")
return self.fallback_optimization()
Performance Architecture
1. Lazy Loading
Data and computations are loaded only when needed:
class LazyDataLoader:
def __init__(self, file_path):
self.file_path = file_path
self._data = None
@property
def data(self):
if self._data is None:
self._data = load_data_file(self.file_path)
return self._data
2. Caching Strategy
Expensive computations are cached:
from functools import lru_cache
@lru_cache(maxsize=128)
def compute_model_expensive(tau_tuple, params_tuple, q):
# Expensive model computation
pass
3. Parallel Processing
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
# Parallel data processing
num_workers = multiprocessing.cpu_count()
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = executor.map(process_angle_data, angle_chunks)
# Parallel optimization runs
with ProcessPoolExecutor(max_workers=4) as executor:
optimization_results = executor.map(
run_optimization,
parameter_sets
)
Plugin Architecture
The package supports extensions through plugins:
class ModelPlugin(ABC):
@abstractmethod
def get_model_name(self) -> str:
pass
@abstractmethod
def compute_correlation(self, tau, params, q, phi=None):
pass
class CustomFlowModel(ModelPlugin):
def get_model_name(self) -> str:
return "custom_flow"
def compute_correlation(self, tau, params, q, phi=None):
# Custom model implementation
pass
Testing Architecture
Test Organization:
tests/
├── unit/ # Unit tests for individual components
│ ├── test_config.py
│ ├── test_models.py
│ └── test_optimization.py
├── integration/ # Integration tests
│ ├── test_full_workflow.py
└── fixtures/ # Test data and fixtures
├── sample_config.json
└── test_data.h5
Test Fixtures:
@pytest.fixture
def sample_config():
return {
"analysis_settings": {
"static_mode": True,
"static_submode": "isotropic"
},
"initial_parameters": {
"values": [1000, -0.5, 100]
}
}
@pytest.fixture
def synthetic_data():
tau = np.logspace(-6, 1, 100)
g1 = np.exp(-tau**0.8)
return tau, g1
Memory Management
Large Dataset Handling:
class ChunkedDataProcessor:
def __init__(self, chunk_size: int = 1000):
self.chunk_size = chunk_size
def process_large_dataset(self, data):
for chunk in self.chunk_data(data):
yield self.process_chunk(chunk)
def chunk_data(self, data):
for i in range(0, len(data), self.chunk_size):
yield data[i:i + self.chunk_size]
Memory Monitoring:
import psutil
def monitor_memory_usage(func):
def wrapper(*args, **kwargs):
initial_memory = psutil.Process().memory_info().rss / 1024**2
result = func(*args, **kwargs)
final_memory = psutil.Process().memory_info().rss / 1024**2
print(f"Memory usage: {final_memory - initial_memory:.1f} MB")
return result
return wrapper
Future Architecture Considerations
Distributed Computing: Support for multi-node cluster computing
Advanced CPU Optimization: Further vectorization and JIT improvements
Streaming Data: Real-time analysis capabilities
Cloud Integration: Cloud storage and computing support
Web Interface: Browser-based analysis frontend