Source code for heterodyne.optimization.classical

"""
Classical Optimization Methods for Heterodyne Scattering Analysis
==================================================================

This module contains multiple classical optimization algorithms for
parameter estimation in heterodyne scattering analysis:

1. **Nelder-Mead Simplex**: Derivative-free optimization algorithm that
   works well for noisy objective functions and doesn't require gradient
   information, making it ideal for correlation function fitting.

2. **Gurobi Quadratic Programming**: Advanced optimization using quadratic
   approximation of the chi-squared objective function. Particularly effective
   for smooth problems with parameter bounds constraints. Requires Gurobi license.

Key Features:
- Consistent parameter bounds across all optimization methods
- Automatic Gurobi detection and graceful fallback
- Optimized configurations for different analysis modes
- Comprehensive error handling and status reporting

Authors: Wei Chen, Hongrui He
Institution: Argonne National Laboratory
"""

import logging
import time
from typing import Any

# Use lazy loading for heavy dependencies
from ..core.lazy_imports import scientific_deps

# Lazy-loaded numpy and scipy
np = scientific_deps.get("numpy")
scipy_optimize = scientific_deps.get("scipy_optimize")

# Fallback import for scipy.optimize if lazy loading fails
if scipy_optimize is None or not hasattr(scipy_optimize, "OptimizeResult"):
    try:
        from scipy import optimize as scipy_optimize
    except ImportError:
        scipy_optimize = None

# Import shared optimization utilities
from ..core.optimization_utils import get_optimization_counter
from ..core.optimization_utils import reset_optimization_counter

try:
    import gurobipy as gp
    from gurobipy import GRB

    GUROBI_AVAILABLE = True
except ImportError:
    GUROBI_AVAILABLE = False
    # Type stubs for when Gurobi is not available
    gp = None  # type: ignore
    GRB = None  # type: ignore

# Import robust optimization with graceful degradation
try:
    from .robust import RobustHeterodyneOptimizer  # type: ignore
    from .robust import create_robust_optimizer

    ROBUST_OPTIMIZATION_AVAILABLE = True
except ImportError:
    RobustHeterodyneOptimizer = None  # type: ignore
    create_robust_optimizer = None  # type: ignore
    ROBUST_OPTIMIZATION_AVAILABLE = False

logger = logging.getLogger(__name__)


[docs] class ClassicalOptimizer: """ Classical optimization algorithms for parameter estimation. This class provides robust parameter estimation using multiple optimization algorithms: 1. Nelder-Mead simplex method: Well-suited for noisy objective functions and doesn't require derivative information. 2. Gurobi quadratic programming (optional): Uses quadratic approximation of the chi-squared objective function for potentially faster convergence on smooth problems with bounds constraints. Requires Gurobi license. The Gurobi method approximates the objective function using finite differences to estimate gradients and Hessian, then solves the resulting quadratic program. This approach can be particularly effective for least squares problems where the objective function is approximately quadratic near the optimum. Important: Both optimization methods use the same parameter bounds defined in the configuration's parameter_space.bounds section, ensuring consistency with robust optimization and maintaining the same physical constraints across all optimization methods. """
[docs] def __init__(self, analysis_core, config: dict[str, Any]): """ Initialize classical optimizer. Parameters ---------- analysis_core : HeterodyneAnalysisCore Core analysis engine instance config : dict[str, Any] Configuration dictionary """ self.core = analysis_core self.config = config self.best_params_classical = None # Extract optimization configuration self.optimization_config = config.get("optimization_config", {}).get( "classical_optimization", {} )
[docs] def run_optimization( self, initial_params: np.ndarray | None = None, phi_angles: np.ndarray | None = None, c2_experimental: np.ndarray | None = None, return_tuple: bool = False, **kwargs, ) -> tuple[np.ndarray | None, Any] | dict[str, Any]: """ Main optimization interface for CLI compatibility. This method provides a standard interface that delegates to the appropriate optimization method. Parameters ---------- initial_params : np.ndarray, optional Starting parameters for optimization phi_angles : np.ndarray, optional Array of phi angles c2_experimental : np.ndarray, optional Experimental correlation data return_tuple : bool, default=False If True, return (params, result). If False, return result dict. **kwargs Additional optimization parameters Returns ------- tuple | dict If return_tuple=True: (parameters, result_object) If return_tuple=False: result dictionary """ # Filter kwargs to only include supported parameters supported_kwargs = {} supported_params = {"methods", "bounds", "options", "objective_func"} for key, value in kwargs.items(): if key in supported_params: supported_kwargs[key] = value params, result = self.run_classical_optimization_optimized( initial_parameters=initial_params, phi_angles=phi_angles, c2_experimental=c2_experimental, **supported_kwargs, ) if return_tuple: return params, result # Convert to dict format for tests result_dict = { "success": getattr(result, "success", False), "parameters": ( params if params is not None else getattr(result, "x", None) ), "chi_squared": getattr(result, "fun", float("inf")), "initial_parameters": ( initial_params if initial_params is not None else kwargs.get("initial_parameters") ), } # Add all other attributes from result if hasattr(result, "__dict__"): for key, value in result.__dict__.items(): if key not in result_dict: result_dict[key] = value # Ensure initial_parameters is set if result_dict["initial_parameters"] is None: if ( "initial_parameters" in self.config and "values" in self.config["initial_parameters"] ): result_dict["initial_parameters"] = self.config["initial_parameters"][ "values" ] else: # Use 14-parameter heterodyne defaults result_dict["initial_parameters"] = [ 100.0, -0.5, 10.0, # D0_ref, alpha_ref, D_offset_ref 100.0, -0.5, 10.0, # D0_sample, alpha_sample, D_offset_sample 0.1, 0.0, 0.01, # v0, beta, v_offset 0.5, 0.0, 50.0, 0.3, # f0, f1, f2, f3 0.0, # phi0 ] return result_dict
[docs] def optimize( self, c2_experimental: np.ndarray | None = None, phi_angles: np.ndarray | None = None, t1_array: np.ndarray | None = None, t2_array: np.ndarray | None = None, **kwargs, ) -> dict[str, Any]: """ Backward compatibility wrapper for optimize() method. This method provides backward compatibility for tests that expect an optimize() method instead of run_optimization(). Parameters ---------- c2_experimental : np.ndarray, optional Experimental correlation data phi_angles : np.ndarray, optional Array of phi angles t1_array : np.ndarray, optional Array of t1 time values t2_array : np.ndarray, optional Array of t2 time values **kwargs Additional optimization parameters Returns ------- dict[str, Any] Optimization result dictionary with keys: - 'initial_parameters': Initial parameter values - 'parameters': Optimal parameter values - 'chi_squared': Final chi-squared value - 'success': Whether optimization succeeded - Additional fields from optimization result """ # Call run_optimization and request dict format result_dict = self.run_optimization( initial_params=kwargs.get("initial_params"), phi_angles=phi_angles, c2_experimental=c2_experimental, t1_array=t1_array, t2_array=t2_array, return_tuple=False, **kwargs, ) return result_dict
[docs] def run_classical_optimization_optimized( self, initial_parameters: np.ndarray | None = None, methods: list[str] | None = None, phi_angles: np.ndarray | None = None, c2_experimental: np.ndarray | None = None, ) -> tuple[np.ndarray | None, Any]: """ Run Nelder-Mead optimization method. This method uses the Nelder-Mead simplex algorithm for parameter estimation. Nelder-Mead is well-suited for noisy objective functions and doesn't require gradient information. Parameters ---------- initial_parameters : np.ndarray, optional Starting parameters for optimization methods : list, optional List of optimization methods to try phi_angles : np.ndarray, optional Scattering angles c2_experimental : np.ndarray, optional Experimental data Returns ------- tuple (best_parameters, optimization_result) Raises ------ RuntimeError If all classical methods fail """ logger.info("Starting classical optimization") start_time = time.time() print("\n═══ Classical Optimization ═══") # Determine analysis mode and effective parameter count if hasattr(self.core, "config_manager") and self.core.config_manager: # Check for deprecated static mode in config (will raise error if found) _ = self.core.config_manager.is_static_mode_enabled() analysis_mode = self.core.config_manager.get_analysis_mode() effective_param_count = ( self.core.config_manager.get_effective_parameter_count() ) else: # Fallback to heterodyne defaults analysis_mode = "heterodyne" effective_param_count = 14 print(f" Analysis mode: {analysis_mode} ({effective_param_count} parameters)") logger.info( f"Classical optimization using {analysis_mode} mode with {effective_param_count} parameters" ) # Load defaults if not provided if methods is None: available_methods = ["Nelder-Mead"] if GUROBI_AVAILABLE: available_methods.append("Gurobi") methods = self.optimization_config.get( "methods", available_methods, ) # Ensure methods is not None for type checker assert methods is not None, "Optimization methods list cannot be None" if initial_parameters is None: # Try to get initial parameters from config, with fallback if ( "initial_parameters" in self.config and "values" in self.config["initial_parameters"] ): initial_parameters = np.array( self.config["initial_parameters"]["values"], dtype=np.float64 ) else: # Create default initial parameters for 14-parameter heterodyne model logger.warning( "No initial parameters in config, using 14-parameter heterodyne defaults" ) initial_parameters = np.array( [ 100.0, -0.5, 10.0, # D0_ref, alpha_ref, D_offset_ref 100.0, -0.5, 10.0, # D0_sample, alpha_sample, D_offset_sample 0.1, 0.0, 0.01, # v0, beta, v_offset 0.5, 0.0, 50.0, 0.3, # f0, f1, f2, f3 0.0, # phi0 ], dtype=np.float64, ) # Validate parameter count - only 14-parameter heterodyne mode supported if len(initial_parameters) != 14: raise ValueError( f"Invalid parameter count: {len(initial_parameters)}. " "Only 14-parameter heterodyne mode is supported. " "Parameters: [D0_ref, alpha_ref, D_offset_ref, " "D0_sample, alpha_sample, D_offset_sample, " "v0, beta, v_offset, f0, f1, f2, f3, phi0]" ) if phi_angles is None or c2_experimental is None: c2_experimental, _, phi_angles, _ = self.core.load_experimental_data() # Type assertion after loading data to satisfy type checker assert ( phi_angles is not None and c2_experimental is not None ), "Failed to load experimental data" # Apply subsampling if enabled (to speed up optimization for large datasets) phi_angles, c2_experimental, (angle_indices, time_indices) = ( self._subsample_data_for_memory(phi_angles, c2_experimental) ) # Update core's time_length to match subsampled data dimensions # This is critical for chi-squared calculation to work correctly original_time_length = self.core.time_length if c2_experimental.shape[1] != original_time_length: logger.info( f"Updating core.time_length from {original_time_length} to {c2_experimental.shape[1]} " f"to match subsampled data dimensions" ) self.core.time_length = c2_experimental.shape[1] # Update time_array to use the actual subsampled time points # CRITICAL: Must use time_indices, not dense array, to match experimental data times # Check if dt is a valid numeric value (not Mock or invalid) try: dt_value = float(self.core.dt) # Use the actual subsampled time indices (e.g., [0, 4, 8, 12, ...]) # NOT a dense array [0, 1, 2, 3, ...], which would cause time mismatch self.core.time_array = time_indices * dt_value except (TypeError, ValueError, AttributeError): # dt is Mock or invalid - use default value or skip update # For tests with Mock objects, time_array update is not critical logger.debug( f"Skipping time_array update - dt is not a valid numeric value: {type(self.core.dt)}" ) best_result = None best_params = None best_chi2 = np.inf best_method = None # Track which method produced the best result all_results = [] # Store all results for analysis # Create objective function using utility method objective = self.create_objective_function( phi_angles, c2_experimental, f"Classical-{analysis_mode.capitalize()}", ) # Try each method for method in methods: print(f" Trying {method}...") try: start = time.time() # Use single method utility success, result = self.run_single_method( method=method, objective_func=objective, initial_parameters=initial_parameters, bounds=None, # Nelder-Mead doesn't use bounds method_options=self.optimization_config.get( "method_options", {} ).get(method, {}), ) elapsed = time.time() - start # Store result for analysis if success and isinstance(result, scipy_optimize.OptimizeResult): # Add timing info to result object result.execution_time = elapsed all_results.append((method, result)) if result.fun < best_chi2: best_result = result best_params = result.x best_chi2 = result.fun best_method = method # Track which method produced this result print( f" ✓ New best: χ²_red = {result.fun:.6e} ({ elapsed:.1f}s)" ) else: print(f" χ²_red = {result.fun:.6e} ({elapsed:.1f}s)") else: # Store exception for analysis all_results.append((method, result)) print(f" ✗ Failed: {result}") logger.warning( f"Classical optimization method {method} failed: {result}" ) except Exception as e: all_results.append((method, e)) print(f" ✗ Failed: {e}") logger.warning(f"Classical optimization method {method} failed: {e}") logger.exception(f"Full traceback for {method} optimization failure:") if ( best_result is not None and best_params is not None and len(best_params) > 0 and isinstance(best_result, scipy_optimize.OptimizeResult) ): total_time = time.time() - start_time # best_method is already tracked when the best result was found if best_method is None: best_method = "Unknown" # Generate comprehensive summary (for future use) summary = self.get_optimization_summary( best_params, best_result, total_time, best_method ) summary["optimization_method"] = best_method summary["all_methods_tried"] = [method for method, _ in all_results] # Create method-specific results dictionary method_results = {} for method, result in all_results: if hasattr(result, "fun"): # Successful result method_results[method] = { "parameters": ( result.x.tolist() if hasattr(result, "x") else None ), "chi_squared": result.fun, "success": ( result.success if hasattr(result, "success") else True ), "iterations": getattr(result, "nit", None), "function_evaluations": getattr(result, "nfev", None), "message": getattr(result, "message", ""), "method": getattr(result, "method", method), } else: # Failed result (exception) method_results[method] = { "parameters": None, "chi_squared": float("inf"), "success": False, "error": str(result), } # Log results logger.info( f"Classical optimization completed in {total_time:.2f}s, best χ²_red = { best_chi2:.6e} (method: {best_method})" ) print(f" Best result: χ²_red = {best_chi2:.6e} (method: {best_method})") # Store best parameters self.best_params_classical = best_params # Log detailed analysis if debug logging is enabled if logger.isEnabledFor(logging.DEBUG): analysis = self.analyze_optimization_results( [ (method, True, result) for method, result in all_results if hasattr(result, "fun") ] ) logger.debug(f"Classical optimization analysis: {analysis}") # Restore original time_length before returning if c2_experimental.shape[1] != original_time_length: logger.info( f"Restoring core.time_length from {self.core.time_length} back to {original_time_length}" ) self.core.time_length = original_time_length # Restore time_array to full resolution try: dt_value = float(self.core.dt) self.core.time_array = np.arange(self.core.time_length) * dt_value except (TypeError, ValueError, AttributeError): # dt is Mock or invalid - skip time_array restoration logger.debug( f"Skipping time_array restoration - dt is not a valid numeric value: {type(self.core.dt)}" ) # Return enhanced result with method information enhanced_result = best_result enhanced_result.method_results = ( method_results # Add method-specific results ) enhanced_result.best_method = best_method # Add best method info return best_params, enhanced_result # If we reach here, no valid results were obtained total_time = time.time() - start_time # Restore original time_length before raising exception if c2_experimental.shape[1] != original_time_length: logger.info( f"Restoring core.time_length from {self.core.time_length} back to {original_time_length}" ) self.core.time_length = original_time_length # Restore time_array to full resolution try: dt_value = float(self.core.dt) self.core.time_array = np.arange(self.core.time_length) * dt_value except (TypeError, ValueError, AttributeError): # dt is Mock or invalid - skip time_array restoration logger.debug( f"Skipping time_array restoration - dt is not a valid numeric value: {type(self.core.dt)}" ) # Log detailed failure information logger.error( f"Classical optimization failed after {total_time:.2f}s - all methods failed" ) logger.error(f"Attempted methods: {[method for method, _ in all_results]}") logger.error( f"Best result status: best_result={best_result is not None}, " f"best_params={best_params is not None}, " f"best_params_len={len(best_params) if best_params is not None else 0}" ) # Analyze failed results failed_analysis = self.analyze_optimization_results( [(method, False, result) for method, result in all_results] ) logger.error(f"Failure analysis: {failed_analysis}") # Log individual method failures for method, result in all_results: if isinstance(result, Exception): logger.error(f" {method}: {type(result).__name__}: {result}") elif hasattr(result, "message"): logger.error(f" {method}: {result.message}") else: logger.error(f" {method}: {result}") raise RuntimeError( f"All classical methods failed to produce valid results. " f"Attempted methods: {[method for method, _ in all_results]}. " f"Check logs for detailed failure information." )
[docs] def get_available_methods(self) -> list[str]: """ Get list of available classical optimization methods. Returns ------- list[str] List containing available optimization methods """ methods = ["Nelder-Mead"] # Nelder-Mead simplex algorithm if GUROBI_AVAILABLE: methods.append("Gurobi") # Gurobi quadratic programming solver if ROBUST_OPTIMIZATION_AVAILABLE: methods.extend( ["Robust-Wasserstein", "Robust-Scenario", "Robust-Ellipsoidal"] ) return methods
[docs] def validate_method_compatibility( self, method: str, has_bounds: bool = True ) -> bool: """ Validate if optimization method is compatible with current setup. Parameters ---------- method : str Optimization method name has_bounds : bool Whether parameter bounds are defined (unused but kept for compatibility) Returns ------- bool True if method is supported """ # Note: has_bounds parameter is unused but kept for API compatibility _ = has_bounds # Explicitly mark as unused for type checker if method == "Nelder-Mead": return True if method == "Gurobi": return GUROBI_AVAILABLE return False
[docs] def get_method_recommendations(self) -> dict[str, list[str]]: """ Get method recommendations based on problem characteristics. Returns ------- dict[str, list[str]] Dictionary mapping scenarios to recommended methods """ recommendations = { "with_bounds": ( ["Gurobi", "Nelder-Mead"] if GUROBI_AVAILABLE else ["Nelder-Mead"] ), "without_bounds": ["Nelder-Mead"], "high_dimensional": ["Nelder-Mead"], "low_dimensional": ( ["Gurobi", "Nelder-Mead"] if GUROBI_AVAILABLE else ["Nelder-Mead"] ), # Excellent for noisy functions "noisy_objective": ["Nelder-Mead"], "smooth_objective": ( ["Gurobi", "Nelder-Mead"] if GUROBI_AVAILABLE else ["Nelder-Mead"] ), } return recommendations
[docs] def validate_parameters( self, parameters: np.ndarray, method_name: str = "" ) -> tuple[bool, str]: """ Validate physical parameters and bounds. Parameters ---------- parameters : np.ndarray Model parameters to validate method_name : str Name of optimization method for logging (currently unused) Returns ------- tuple[bool, str] (is_valid, reason_if_invalid) """ _ = method_name # Suppress unused parameter warning # Get validation configuration validation = ( self.config.get("advanced_settings", {}) .get("chi_squared_calculation", {}) .get("validity_check", {}) ) # Extract parameter sections num_diffusion_params = getattr(self.core, "num_diffusion_params", 3) num_shear_params = getattr(self.core, "num_shear_rate_params", 3) # Ensure these are integers, not Mock objects try: num_diffusion_params = int(num_diffusion_params) except (TypeError, ValueError): num_diffusion_params = 3 try: num_shear_params = int(num_shear_params) except (TypeError, ValueError): num_shear_params = 3 diffusion_params = parameters[:num_diffusion_params] shear_params = parameters[ num_diffusion_params : num_diffusion_params + num_shear_params ] # Check positive D0 (transport coefficient J₀, only if we have this parameter) if ( validation.get("check_positive_D0", True) and len(diffusion_params) > 0 and diffusion_params[0] <= 0 ): return False, f"Negative D0: {diffusion_params[0]}" # Check positive gamma_dot_t0 (only if we have shear parameters) if ( validation.get("check_positive_gamma_dot_t0", True) and len(shear_params) > 0 and shear_params[0] <= 0 ): return False, f"Negative gamma_dot_t0: {shear_params[0]}" # Check parameter bounds if validation.get("check_parameter_bounds", True): bounds = self.config.get("parameter_space", {}).get("bounds", []) for i, bound in enumerate(bounds): if i < len(parameters): param_val = parameters[i] param_min = bound.get("min", -np.inf) param_max = bound.get("max", np.inf) if not (param_min <= param_val <= param_max): param_name = bound.get("name", f"p{i}") return ( False, f"Parameter {param_name} out of bounds: {param_val} not in [{param_min}, {param_max}]", ) return True, ""
[docs] def create_objective_function( self, phi_angles: np.ndarray, c2_experimental: np.ndarray, method_name: str = "Classical", ): """ Create objective function for optimization. Parameters ---------- phi_angles : np.ndarray Scattering angles c2_experimental : np.ndarray Experimental correlation data method_name : str Name for logging purposes Returns ------- callable Objective function for optimization """ # Get angle filtering setting from configuration use_angle_filtering = True if hasattr(self.core, "config_manager") and self.core.config_manager: use_angle_filtering = self.core.config_manager.is_angle_filtering_enabled() elif "angle_filtering" in self.config.get("optimization_config", {}): use_angle_filtering = self.config["optimization_config"][ "angle_filtering" ].get("enabled", True) def objective(params): return self.core.calculate_chi_squared_optimized( params, phi_angles, c2_experimental, method_name, filter_angles_for_optimization=use_angle_filtering, ) return objective
def _subsample_data_for_memory( self, phi_angles: np.ndarray, c2_experimental: np.ndarray ) -> tuple[np.ndarray, np.ndarray, tuple[np.ndarray, np.ndarray]]: """ Subsample experimental data to reduce memory usage and speed up classical optimization. This method reduces the data size by subsampling angles and time points to keep the optimization problem manageable for large datasets (>1M points). Parameters ---------- phi_angles : np.ndarray Full array of angular positions c2_experimental : np.ndarray Full experimental correlation data (n_angles, n_times, n_times) Returns ------- tuple[np.ndarray, np.ndarray, tuple[np.ndarray, np.ndarray]] (subsampled_phi_angles, subsampled_c2_data, (angle_indices, time_indices)) """ # Get subsampling configuration from classical_optimization config subsampling_config = self.optimization_config.get("subsampling", {}) # Default values similar to robust optimization max_data_points = subsampling_config.get("max_data_points", 100000) time_subsample = subsampling_config.get("time_subsample_factor", 4) angle_subsample = subsampling_config.get("angle_subsample_factor", 2) enabled = subsampling_config.get("enabled", False) # Calculate current data size n_angles, n_times, _ = c2_experimental.shape current_size = n_angles * n_times * n_times logger.info( f"Classical optimization data size: {current_size:,} points ({n_angles} angles x {n_times}^2 times)" ) # If subsampling is disabled or data is already small enough, return as-is if not enabled or current_size <= max_data_points: if not enabled: logger.info("Subsampling disabled - using full dataset") else: logger.info("Data size acceptable - no subsampling needed") # Return full indices for no subsampling case angle_indices = np.arange(len(phi_angles)) time_indices = np.arange(n_times) return phi_angles, c2_experimental, (angle_indices, time_indices) # Don't subsample angles if there are fewer than 4 angles (preserve angular information) if n_angles < 4: angle_subsample = 1 logger.info( f"Skipping angle subsampling (only {n_angles} angles available - preserving all for angular analysis)" ) # Subsample angles (every angle_subsample_factor angles) angle_indices = np.arange(0, len(phi_angles), angle_subsample) subsampled_phi_angles = phi_angles[angle_indices] subsampled_c2_angles = c2_experimental[angle_indices] # Subsample time points (every time_subsample_factor points) time_indices = np.arange(0, n_times, time_subsample) subsampled_c2_data = subsampled_c2_angles[:, time_indices, :][ :, :, time_indices ] new_n_angles, new_n_times, _ = subsampled_c2_data.shape new_size = new_n_angles * new_n_times * new_n_times reduction_factor = current_size / new_size logger.info( f"Subsampled data size: {new_size:,} points ({new_n_angles} angles x {new_n_times}^2 times)" ) logger.info(f"Memory/computation reduction: {reduction_factor:.1f}x smaller") # If still too large, apply more aggressive subsampling if new_size > max_data_points: additional_time_factor = int(np.ceil(np.sqrt(new_size / max_data_points))) logger.warning( f"Data still too large, applying additional {additional_time_factor}x time subsampling" ) time_indices_2 = np.arange(0, new_n_times, additional_time_factor) subsampled_c2_data = subsampled_c2_data[:, time_indices_2, :][ :, :, time_indices_2 ] # Update time_indices to reflect the additional subsampling time_indices = time_indices[time_indices_2] final_n_angles, final_n_times, _ = subsampled_c2_data.shape final_size = final_n_angles * final_n_times * final_n_times final_reduction = current_size / final_size logger.info( f"Final data size: {final_size:,} points ({final_n_angles} angles x {final_n_times}^2 times)" ) logger.info( f"Total memory/computation reduction: {final_reduction:.1f}x smaller" ) return subsampled_phi_angles, subsampled_c2_data, (angle_indices, time_indices)
[docs] def run_single_method( self, method: str, objective_func, initial_parameters: np.ndarray, bounds: list[tuple[float, float]] | None = None, method_options: dict[str, Any] | None = None, ) -> tuple[bool, scipy_optimize.OptimizeResult | Exception]: """ Run a single optimization method. Parameters ---------- method : str Optimization method name objective_func : callable Objective function to minimize initial_parameters : np.ndarray Starting parameters bounds : list[tuple[float, float]], optional Parameter bounds method_options : dict[str, Any], optional Method-specific options Returns ------- tuple[bool, OptimizeResult | Exception] (success, result_or_exception) """ try: if method == "Gurobi": return self._run_gurobi_optimization( objective_func, initial_parameters, bounds, method_options ) if method.startswith("Robust-"): return self._run_robust_optimization( method, objective_func, initial_parameters, bounds, method_options, ) # Filter out comment fields (keys starting with '_' and ending # with '_note') filtered_options = {} if method_options: filtered_options = { k: v for k, v in method_options.items() if not (k.startswith("_") and k.endswith("_note")) } kwargs = { "fun": objective_func, "x0": initial_parameters, "method": method, "options": filtered_options, } # Nelder-Mead doesn't use explicit bounds # The method handles constraints through the objective function result = scipy_optimize.minimize(**kwargs) # Validate that we got a valid result with parameters if not hasattr(result, "x") or result.x is None or len(result.x) == 0: error_msg = ( f"Optimization method '{method}' returned empty or invalid parameter array. " f"Result status: success={getattr(result, 'success', 'N/A')}, " f"message={getattr(result, 'message', 'N/A')}, " f"nit={getattr(result, 'nit', 'N/A')}, " f"nfev={getattr(result, 'nfev', 'N/A')}" ) logger.error(error_msg) return False, ValueError(error_msg) return True, result except Exception as e: return False, e
def _initialize_gurobi_options( self, method_options: dict[str, Any] | float | None = None ) -> dict[str, Any]: """ Initialize Gurobi optimization options with defaults and user overrides. Parameters ---------- method_options : dict[str, Any] | float | None User-specified Gurobi options (dict) or tolerance value (float) Returns ------- dict[str, Any] Combined Gurobi options """ # Default Gurobi options with iterative settings gurobi_options = { "max_iterations": 50, # Outer iterations for SQP "tolerance": 1e-6, "output_flag": 0, # Suppress output by default "method": 2, # Use barrier method for QP "time_limit": 300, # 5 minute time limit "trust_region_initial": 0.1, # Initial trust region radius "trust_region_min": 1e-8, # Minimum trust region radius "trust_region_max": 1.0, # Maximum trust region radius } # Update with user options if method_options: if isinstance(method_options, dict): filtered_options = { k: v for k, v in method_options.items() if not (k.startswith("_") and k.endswith("_note")) } gurobi_options.update(filtered_options) elif isinstance(method_options, (int, float)): # If a scalar is passed, treat it as tolerance gurobi_options["tolerance"] = float(method_options) return gurobi_options def _estimate_gradient( self, objective_func, x_current: np.ndarray, base_epsilon: float, return_tuple: bool = False, ) -> np.ndarray | tuple[np.ndarray, int]: """ Estimate gradient using finite differences. Parameters ---------- objective_func : callable Objective function to differentiate x_current : np.ndarray Current parameter values base_epsilon : float Base epsilon for finite difference return_tuple : bool, optional If True, return (gradient, function_evaluations). If False, return just gradient. Default False. Returns ------- np.ndarray | tuple[np.ndarray, int] Just gradient array if return_tuple=False (default), otherwise (gradient, function_evaluations) """ n_params = len(x_current) grad = np.zeros(n_params) function_evaluations = 0 for i in range(n_params): epsilon = base_epsilon * max(1.0, abs(x_current[i])) x_plus = x_current.copy() x_plus[i] += epsilon x_minus = x_current.copy() x_minus[i] -= epsilon f_plus = objective_func(x_plus) f_minus = objective_func(x_minus) grad[i] = (f_plus - f_minus) / (2 * epsilon) function_evaluations += 2 # Sanitize NaN/Inf values that can cause Gurobi to fail if not np.isfinite(grad[i]): # Try one-sided difference as fallback f_current = objective_func(x_current) function_evaluations += 1 if np.isfinite(f_plus): grad[i] = (f_plus - f_current) / epsilon elif np.isfinite(f_minus): grad[i] = (f_current - f_minus) / epsilon else: # If all attempts fail, use zero gradient for this parameter grad[i] = 0.0 if return_tuple: return grad, function_evaluations return grad def _estimate_hessian_diagonal( self, objective_func, x_current: np.ndarray, f_current: float, base_epsilon: float, ) -> tuple[np.ndarray, int]: """ Estimate diagonal Hessian approximation (BFGS-like). Parameters ---------- objective_func : callable Objective function x_current : np.ndarray Current parameter values f_current : float Current function value base_epsilon : float Base epsilon for finite difference Returns ------- tuple[np.ndarray, int] (hessian_diagonal, function_evaluations) """ n_params = len(x_current) hessian_diag = np.ones(n_params) function_evaluations = 0 for i in range(n_params): epsilon = base_epsilon * max(1.0, abs(x_current[i])) x_plus = x_current.copy() x_plus[i] += epsilon x_minus = x_current.copy() x_minus[i] -= epsilon f_plus = objective_func(x_plus) f_minus = objective_func(x_minus) second_deriv = (f_plus - 2 * f_current + f_minus) / (epsilon**2) # Sanitize NaN/Inf values and ensure positive diagonal if np.isfinite(second_deriv): hessian_diag[i] = max(1e-6, second_deriv) else: # Use default value if second derivative is invalid hessian_diag[i] = 1.0 function_evaluations += 2 return hessian_diag, function_evaluations def _create_gurobi_model( self, gurobi_options: dict, grad: np.ndarray, hessian_diag: np.ndarray, trust_radius: float, x_current: np.ndarray, bounds: list[tuple[float, float]], ): """ Create and configure Gurobi model for trust region subproblem. Parameters ---------- gurobi_options : dict Gurobi optimization options grad : np.ndarray Gradient at current point hessian_diag : np.ndarray Diagonal Hessian approximation trust_radius : float Current trust region radius x_current : np.ndarray Current parameter values bounds : list[tuple[float, float]] Parameter bounds Returns ------- tuple (env, model, step_variables) """ n_params = len(x_current) tolerance = gurobi_options["tolerance"] # Create Gurobi environment and model env = gp.Env(empty=True) if gurobi_options.get("output_flag", 0) == 0: env.setParam("OutputFlag", 0) env.start() model = gp.Model(env=env) # Set Gurobi parameters model.setParam(GRB.Param.OptimalityTol, tolerance) model.setParam(GRB.Param.Method, gurobi_options.get("method", 2)) model.setParam(GRB.Param.TimeLimit, gurobi_options.get("time_limit", 300)) # Create decision variables for step step = model.addVars( n_params, lb=-gp.GRB.INFINITY, ub=gp.GRB.INFINITY, name="step", ) # Trust region constraint: ||step||_2 <= trust_radius model.addQConstr( gp.quicksum(step[i] * step[i] for i in range(n_params)) <= trust_radius**2, "trust_region", ) # Parameter bounds constraints for i in range(n_params): if i < len(bounds): lb, ub = bounds[i] if lb != -np.inf: model.addConstr( step[i] >= lb - x_current[i], f"lower_bound_{i}", ) if ub != np.inf: model.addConstr( step[i] <= ub - x_current[i], f"upper_bound_{i}", ) # Quadratic approximation: grad^T * step + 0.5 * step^T * H_diag * step obj = gp.LinExpr() for i in range(n_params): obj += grad[i] * step[i] # Linear term obj += 0.5 * hessian_diag[i] * step[i] * step[i] # Quadratic term model.setObjective(obj, GRB.MINIMIZE) return env, model, step def _update_trust_region( self, trust_radius: float, step_values: np.ndarray, actual_reduction: float, gurobi_options: dict, ) -> tuple[float, bool]: """ Update trust region radius based on step performance. Parameters ---------- trust_radius : float Current trust region radius step_values : np.ndarray Step taken actual_reduction : float Actual objective function reduction gurobi_options : dict Gurobi options with trust region bounds Returns ------- tuple[float, bool] (new_trust_radius, accept_step) """ step_norm = np.linalg.norm(step_values) if actual_reduction > 0: # Accept step accept_step = True # Expand trust region if step is successful and near boundary if step_norm > 0.8 * trust_radius: new_trust_radius = min( gurobi_options["trust_region_max"], 2 * trust_radius, ) else: new_trust_radius = trust_radius else: # Reject step and shrink trust region accept_step = False new_trust_radius = max( gurobi_options["trust_region_min"], 0.5 * trust_radius, ) return new_trust_radius, accept_step def _create_optimization_result( self, x_current: np.ndarray, f_current: float, success: bool, iteration: int, function_evaluations: int, max_iter: int, grad_norm: float, tolerance: float, ) -> scipy_optimize.OptimizeResult: """ Create optimization result object. Parameters ---------- x_current : np.ndarray Final parameter values f_current : float Final objective value success : bool Whether optimization succeeded iteration : int Number of iterations completed function_evaluations : int Total function evaluations max_iter : int Maximum allowed iterations grad_norm : float Final gradient norm tolerance : float Convergence tolerance Returns ------- scipy_optimize.OptimizeResult Optimization result """ if success and (iteration < max_iter or grad_norm < tolerance): result = scipy_optimize.OptimizeResult( x=x_current, fun=f_current, success=True, status=0, message=f"Iterative Gurobi optimization converged after {iteration} iterations.", nit=iteration, nfev=function_evaluations, method="Gurobi-Iterative-QP", ) logger.debug( f"Gurobi optimization completed: χ² = {f_current:.6e} after {iteration} iterations" ) else: result = scipy_optimize.OptimizeResult( x=x_current, fun=f_current, success=False, status=1, message=f"Iterative Gurobi optimization reached maximum iterations ({max_iter}).", nit=iteration, nfev=function_evaluations, method="Gurobi-Iterative-QP", ) return result def _run_gurobi_optimization( self, objective_func, initial_parameters: np.ndarray, bounds: list[tuple[float, float]] | None = None, method_options: dict[str, Any] | None = None, ) -> tuple[bool, scipy_optimize.OptimizeResult | Exception]: """ Run iterative Gurobi-based optimization using trust region approach. This method uses successive quadratic approximations (SQP-like approach) where: 1. Build quadratic approximation around current point 2. Solve QP subproblem with trust region constraints 3. Evaluate actual objective at new point 4. Update trust region and iterate until convergence Refactoring (Task 3.5): Broken into 7 focused helper methods using extract method pattern: - _initialize_gurobi_options(): Options setup and validation - _estimate_gradient(): Finite difference gradient computation - _estimate_hessian_diagonal(): Diagonal Hessian approximation - _create_gurobi_model(): QP subproblem model creation - _update_trust_region(): Trust region radius management - _create_optimization_result(): Result object construction Parameters ---------- objective_func : callable Chi-squared objective function to minimize initial_parameters : np.ndarray Starting parameters bounds : list[tuple[float, float]], optional Parameter bounds for optimization. If None, extracts bounds from the same configuration section (parameter_space.bounds). method_options : dict[str, Any], optional Gurobi-specific options Returns ------- tuple[bool, OptimizeResult | Exception] (success, result_or_exception) """ try: if not GUROBI_AVAILABLE or gp is None or GRB is None: raise ImportError("Gurobi is not available. Please install gurobipy.") # Get parameter bounds if bounds is None: bounds = self.get_parameter_bounds() n_params = len(initial_parameters) # Step 1: Initialize Gurobi options gurobi_options = self._initialize_gurobi_options(method_options) # Step 2: Initialize optimization state x_current = initial_parameters.copy() f_current = objective_func(x_current) trust_radius = gurobi_options["trust_region_initial"] # Convergence tracking iteration = 0 max_iter = int(gurobi_options["max_iterations"]) tolerance = gurobi_options["tolerance"] function_evaluations = 1 # Already evaluated f_current grad_norm = float("inf") # Initialize for later use logger.debug( f"Starting Gurobi iterative optimization with initial χ² = {f_current:.6e}" ) # Step 3: Iterative trust region optimization for iteration in range(max_iter): # Choose appropriate epsilon based on parameter magnitudes and trust region base_epsilon = max(1e-8, trust_radius / 100) # Step 3a: Estimate gradient using finite differences grad, grad_evals = self._estimate_gradient( objective_func, x_current, base_epsilon, return_tuple=True ) function_evaluations += grad_evals # Check for convergence based on gradient norm grad_norm = float(np.linalg.norm(grad)) if grad_norm < tolerance: logger.debug( f"Gurobi optimization converged at iteration {iteration}: ||grad|| = {grad_norm:.2e}" ) break # Step 3b: Estimate diagonal Hessian approximation hessian_diag, hess_evals = self._estimate_hessian_diagonal( objective_func, x_current, f_current, base_epsilon ) function_evaluations += hess_evals try: # Step 3c: Create and solve Gurobi QP subproblem env, model, step = self._create_gurobi_model( gurobi_options, grad, hessian_diag, trust_radius, x_current, bounds, ) try: # Optimize subproblem model.optimize() if model.status == GRB.OPTIMAL: # Extract step step_values = np.array( [step[i].x for i in range(n_params)] # type: ignore[attr-defined] ) x_new = x_current + step_values f_new = objective_func(x_new) function_evaluations += 1 # Step 3d: Update trust region and accept/reject step actual_reduction = f_current - f_new trust_radius, accept_step = self._update_trust_region( trust_radius, step_values, actual_reduction, gurobi_options, ) if accept_step: step_norm = np.linalg.norm(step_values) logger.debug( f"Iteration {iteration}: χ² = {f_new:.6e} (improvement: {actual_reduction:.2e}, step: {step_norm:.3f})" ) x_current = x_new f_current = f_new else: logger.debug( f"Iteration {iteration}: Step rejected, shrinking trust region to {trust_radius:.6f}" ) # Check convergence if ( actual_reduction > 0 and abs(actual_reduction) < tolerance ): logger.debug( f"Gurobi optimization converged at iteration {iteration}: improvement = {actual_reduction:.2e}" ) break if trust_radius < gurobi_options["trust_region_min"]: logger.debug( f"Gurobi optimization terminated: trust region too small ({trust_radius:.2e})" ) break else: # QP solve failed, shrink trust region and try again trust_radius = max( gurobi_options["trust_region_min"], 0.25 * trust_radius, ) logger.debug( f"QP subproblem failed with status {model.status}, shrinking trust region to {trust_radius:.6f}" ) if trust_radius < gurobi_options["trust_region_min"]: break finally: # Clean up Gurobi resources model.dispose() env.dispose() except Exception as e: logger.warning( f"Gurobi subproblem failed at iteration {iteration}: {e}" ) break # Step 4: Create final result success = iteration < max_iter or grad_norm < tolerance result = self._create_optimization_result( x_current, f_current, success, iteration, function_evaluations, max_iter, grad_norm, tolerance, ) return success, result except Exception as e: logger.error(f"Gurobi optimization failed: {e}") return False, e def _run_robust_optimization( self, method: str, objective_func, initial_parameters: np.ndarray, bounds: ( list[tuple[float, float]] | None ) = None, # Used by robust optimizer internally method_options: dict[str, Any] | None = None, ) -> tuple[bool, scipy_optimize.OptimizeResult | Exception]: """ Run robust optimization using CVXPY + Gurobi. Parameters ---------- method : str Robust optimization method ("Robust-Wasserstein", "Robust-Scenario", "Robust-Ellipsoidal") objective_func : callable Chi-squared objective function to minimize initial_parameters : np.ndarray Starting parameters bounds : list[tuple[float, float]], optional Parameter bounds for optimization method_options : dict[str, Any], optional Robust optimization specific options Returns ------- tuple[bool, OptimizeResult | Exception] (success, result_or_exception) """ try: if not ROBUST_OPTIMIZATION_AVAILABLE or create_robust_optimizer is None: raise ImportError( "Robust optimization not available. Please install cvxpy." ) # Create robust optimizer instance robust_optimizer = create_robust_optimizer(self.core, self.config) # Extract phi_angles and c2_experimental from the objective function context # Check both the direct attributes and the cached versions phi_angles = getattr(self.core, "phi_angles", None) c2_experimental = getattr(self.core, "c2_experimental", None) # If not found, try the cached versions from load_experimental_data if phi_angles is None: phi_angles = getattr(self.core, "_last_phi_angles", None) if c2_experimental is None: c2_experimental = getattr(self.core, "_last_experimental_data", None) if phi_angles is None or c2_experimental is None: raise ValueError( "Robust optimization requires phi_angles and c2_experimental " "to be available in the analysis core. " f"Found phi_angles: { 'present' if phi_angles is not None else 'missing' }, " f"c2_experimental: { 'present' if c2_experimental is not None else 'missing' }" ) # Map method names to robust optimization types method_mapping = { "Robust-Wasserstein": "wasserstein", "Robust-Scenario": "scenario", "Robust-Ellipsoidal": "ellipsoidal", } robust_method = method_mapping.get(method) if robust_method is None: raise ValueError(f"Unknown robust optimization method: {method}") # Run robust optimization optimal_params, info = robust_optimizer.run_robust_optimization( initial_parameters=initial_parameters, phi_angles=phi_angles, c2_experimental=c2_experimental, method=robust_method, **(method_options or {}), ) if optimal_params is not None: # Create OptimizeResult compatible object result = scipy_optimize.OptimizeResult( x=optimal_params, fun=info.get("final_chi_squared", objective_func(optimal_params)), success=True, status=info.get("status", "success"), message=f"Robust optimization ({robust_method}) converged", nit=info.get("n_iterations"), nfev=info.get("function_evaluations", None), method=f"Robust-{robust_method.capitalize()}", ) return True, result # Optimization failed error_msg = info.get( "error", f"Robust optimization ({robust_method}) failed" ) result = scipy_optimize.OptimizeResult( x=initial_parameters, fun=float("inf"), success=False, status=info.get("status", "failed"), message=error_msg, method=f"Robust-{robust_method.capitalize()}", ) return False, result except Exception as e: logger.error(f"Robust optimization error: {e}") return False, e
[docs] def analyze_optimization_results( self, results: list[tuple[str, bool, scipy_optimize.OptimizeResult | Exception]], ) -> dict[str, Any]: """ Analyze and summarize optimization results from Nelder-Mead method. Parameters ---------- results : list[tuple[str, bool, OptimizeResult | Exception]] List of (method_name, success, result_or_exception) tuples (typically one entry for Nelder-Mead) Returns ------- dict[str, Any] Analysis summary including best method, convergence stats, etc. """ successful_results = [] failed_methods = [] for method, success, result in results: if success and hasattr(result, "fun"): successful_results.append((method, result)) else: failed_methods.append((method, result)) if not successful_results: return { "success": False, "failed_methods": failed_methods, "error": "All methods failed", } # Find best result best_method, best_result = min(successful_results, key=lambda x: x[1].fun) # Compute statistics chi2_values = [result.fun for _, result in successful_results] return { "success": True, "best_method": best_method, "best_result": best_result, "best_chi2": best_result.fun, "successful_methods": len(successful_results), "failed_methods": failed_methods, "chi2_statistics": { "min": np.min(chi2_values), "max": np.max(chi2_values), "mean": np.mean(chi2_values), "std": np.std(chi2_values), }, "convergence_info": { method: { "converged": ( getattr(result, "success", False) if not isinstance(result, Exception) else False ), "iterations": getattr(result, "nit", None), "function_evaluations": getattr(result, "nfev", None), "message": getattr(result, "message", None), } for method, result in successful_results }, }
[docs] def get_parameter_bounds( self, effective_param_count: int | None = None, ) -> list[tuple[float, float]]: """ Extract parameter bounds from configuration (unused by Nelder-Mead). This method is kept for compatibility but is not used by Nelder-Mead optimization since it doesn't support explicit bounds. Parameters ---------- effective_param_count : int, optional Number of parameters to use (always 14 for heterodyne model) Returns ------- list[tuple[float, float]] List of (min, max) bounds for each parameter """ bounds = [] param_bounds = self.config.get("parameter_space", {}).get("bounds", []) # Determine effective parameter count if not provided if effective_param_count is None: if hasattr(self.core, "config_manager") and self.core.config_manager: effective_param_count = ( self.core.config_manager.get_effective_parameter_count() ) else: effective_param_count = 14 # Default to heterodyne # Ensure effective_param_count is not None for type checking if effective_param_count is None: effective_param_count = 14 # Final fallback to heterodyne # Extract bounds for the effective parameters for i, bound in enumerate(param_bounds): if i < effective_param_count: bounds.append((bound.get("min", -np.inf), bound.get("max", np.inf))) # Ensure we have enough bounds while len(bounds) < effective_param_count: bounds.append((-np.inf, np.inf)) return bounds[:effective_param_count]
[docs] def compare_optimization_results( self, results: list[tuple[str, scipy_optimize.OptimizeResult | Exception]], ) -> dict[str, Any]: """ Compare optimization results (typically just Nelder-Mead). Parameters ---------- results : list[tuple[str, OptimizeResult | Exception]] List of (method_name, result) tuples (typically one entry for Nelder-Mead) Returns ------- dict[str, Any] Comparison summary with rankings and statistics """ successful_results = [] failed_methods = [] for method, result in results: if isinstance(result, scipy_optimize.OptimizeResult) and result.success: successful_results.append((method, result)) else: failed_methods.append(method) if not successful_results: return {"error": "No successful optimizations to compare"} # Sort by chi-squared value successful_results.sort(key=lambda x: x[1].fun) comparison = { "ranking": [ { "rank": i + 1, "method": method, "chi_squared": result.fun, "converged": result.success, "iterations": getattr(result, "nit", None), "function_evaluations": getattr(result, "nfev", None), "time_elapsed": getattr(result, "execution_time", None), } for i, (method, result) in enumerate(successful_results) ], "best_method": successful_results[0][0], "best_chi_squared": successful_results[0][1].fun, "failed_methods": failed_methods, "success_rate": len(successful_results) / len(results), } return comparison
[docs] def get_optimization_summary( self, best_params: np.ndarray, best_result: scipy_optimize.OptimizeResult, total_time: float, method_name: str = "unknown", ) -> dict[str, Any]: """ Generate comprehensive optimization summary. Parameters ---------- best_params : np.ndarray Best parameters found best_result : OptimizeResult Best optimization result total_time : float Total optimization time in seconds Returns ------- dict[str, Any] Comprehensive optimization summary """ # Parameter names (if available in config) param_names = [] param_bounds = self.config.get("parameter_space", {}).get("bounds", []) for i, bound in enumerate(param_bounds): param_names.append(bound.get("name", f"param_{i}")) summary = { "optimization_successful": True, "best_chi_squared": best_result.fun, "best_parameters": { (param_names[i] if i < len(param_names) else f"param_{i}"): float(param) for i, param in enumerate(best_params) }, "optimization_details": { "method": method_name, "converged": best_result.success, "iterations": getattr(best_result, "nit", None), "function_evaluations": getattr(best_result, "nfev", None), "message": getattr(best_result, "message", None), }, "timing": { "total_time_seconds": total_time, "average_evaluation_time": ( total_time / (getattr(best_result, "nfev", None) or 1) ), }, "parameter_validation": {}, } # Add parameter validation info is_valid, reason = self.validate_parameters(best_params, "Summary") summary["parameter_validation"] = { "valid": is_valid, "reason": (reason if not is_valid else "All parameters within bounds"), } return summary
def _run_scipy_optimization( self, objective_func, initial_parameters: np.ndarray, method: str = "Nelder-Mead", method_options: dict[str, Any] | None = None, ) -> Any: """ Run SciPy optimization method. This is a helper method that wraps scipy.scipy_optimize.minimize for testing purposes. Parameters ---------- objective_func : callable Objective function to minimize initial_parameters : np.ndarray Initial parameter values method : str, optional Optimization method name method_options : dict, optional Method-specific options Returns ------- Any Optimization result from scipy.scipy_optimize.minimize """ if method_options is None: method_options = {} # Filter out comment fields for actual optimization filtered_options = { k: v for k, v in method_options.items() if not (k.startswith("_") and k.endswith("_note")) } return scipy_optimize.minimize( fun=objective_func, x0=initial_parameters, method=method, options=filtered_options, ) def _calculate_chi_squared(self, parameters: np.ndarray) -> float: """ Calculate chi-squared value for given parameters. This is a helper method that delegates to the analysis core for testing purposes. Parameters ---------- parameters : np.ndarray Model parameters Returns ------- float Chi-squared value """ if hasattr(self.core, "_calculate_chi_squared"): # Use the core's chi-squared calculation method if available try: # We need experimental data for the calculation if hasattr(self, "_cached_experimental_data"): return self.core._calculate_chi_squared( parameters, self._cached_experimental_data ) # Load data if not cached c2_experimental, _, phi_angles, _ = self.core.load_experimental_data() self._cached_experimental_data = c2_experimental self._cached_phi_angles = phi_angles return self.core._calculate_chi_squared(parameters, c2_experimental) except Exception as e: logger.warning(f"Chi-squared calculation failed: {e}") return np.inf else: # Fallback calculation logger.warning("Core chi-squared method not available, using fallback") return np.sum((parameters - 1.0) ** 2) # Simple fallback for tests
[docs] def reset_optimization_counter(self): """Reset the global optimization counter.""" reset_optimization_counter()
[docs] def get_optimization_counter(self) -> int: """Get current optimization counter value.""" return get_optimization_counter()
# Module-level wrapper functions for CLI and test compatibility
[docs] def run_classical_optimization_optimized( analyzer, initial_params, phi_angles=None, c2_experimental=None, **kwargs ) -> tuple[np.ndarray | None, Any]: """Optimized classical optimization function. This is a module-level convenience function that creates a ClassicalOptimizer instance and runs the optimization. Parameters ---------- analyzer : HeterodyneAnalysisCore Analysis core instance initial_params : array-like Initial parameter values for optimization phi_angles : array-like, optional Angular positions for analysis c2_experimental : array-like, optional Experimental correlation data **kwargs Additional optimization parameters Returns ------- tuple Optimization results (optimized_params, optimization_result) """ # Extract config from analyzer if available if hasattr(analyzer, "config"): config = analyzer.config elif hasattr(analyzer, "config_dict"): config = analyzer.config_dict else: # Create minimal config for basic operation config = { "initial_parameters": {"values": list(initial_params)}, "optimization_config": {}, "parameter_space": {"bounds": []}, "advanced_settings": {}, } # Convert initial_params to numpy array if needed if hasattr(np, "array") and initial_params is not None: initial_params = np.array(initial_params) # Create optimizer and run optimization optimizer = ClassicalOptimizer(analyzer, config) return optimizer.run_classical_optimization_optimized( initial_parameters=initial_params, phi_angles=phi_angles, c2_experimental=c2_experimental, **kwargs, )
[docs] def run_classical_optimization_optimized_full_signature( analysis_core, config: dict[str, Any], initial_parameters: np.ndarray | None = None, methods: list[str] | None = None, phi_angles: np.ndarray | None = None, c2_experimental: np.ndarray | None = None, ) -> tuple[np.ndarray | None, Any]: """ Module-level wrapper for classical optimization with full signature. This function provides a convenient interface for running classical optimization without explicitly instantiating the ClassicalOptimizer class. Parameters ---------- analysis_core : HeterodyneAnalysisCore Core analysis engine instance config : dict[str, Any] Configuration dictionary initial_parameters : np.ndarray, optional Starting parameters for optimization methods : list[str], optional List of optimization methods to try phi_angles : np.ndarray, optional Scattering angles c2_experimental : np.ndarray, optional Experimental data Returns ------- tuple (best_parameters, optimization_result) """ optimizer = ClassicalOptimizer(analysis_core, config) return optimizer.run_classical_optimization_optimized( initial_parameters=initial_parameters, methods=methods, phi_angles=phi_angles, c2_experimental=c2_experimental, )