"""
Numerical simulation engine for MechanicsDSL
"""
import asyncio
import concurrent.futures
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import sympy as sp
from scipy.integrate import solve_ivp
from ..symbolic import SymbolicEngine
from ..utils import (
TimeoutError,
_perf_monitor,
config,
logger,
profile_function,
safe_array_access,
safe_float_conversion,
timeout,
validate_array_safe,
validate_finite,
validate_time_span,
)
__all__ = ["NumericalSimulator"]
[docs]
class NumericalSimulator:
"""Enhanced numerical simulator with better stability and diagnostics"""
def __init__(self, symbolic_engine: SymbolicEngine):
self.symbolic = symbolic_engine
self.equations: Dict[str, Callable] = {}
self.parameters: Dict[str, float] = {}
self.initial_conditions: Dict[str, float] = {}
self.constraints: List[sp.Expr] = []
self.state_vars: List[str] = []
self.coordinates: List[str] = []
self.use_hamiltonian: bool = False
self.hamiltonian_equations: Optional[Dict[str, List[Tuple]]] = None
# Warnings recorded by the most recent equation-compilation pass so
# callers can surface lambdify/eval failures instead of accepting the
# silent zero fallback.
self.last_compile_warnings: List[str] = []
[docs]
def set_parameters(self, params: Dict[str, float]):
"""Set physical parameters"""
self.parameters.update(params)
logger.debug(f"Set parameters: {params}")
[docs]
def set_initial_conditions(self, conditions: Dict[str, float]):
"""Set initial conditions"""
self.initial_conditions.update(conditions)
logger.debug(f"Set initial conditions: {conditions}")
[docs]
def add_constraint(self, constraint_expr: sp.Expr):
"""Add a constraint equation"""
self.constraints.append(constraint_expr)
logger.debug(f"Added constraint: {constraint_expr}")
[docs]
@profile_function
def compile_equations(self, accelerations: Dict[str, sp.Expr], coordinates: List[str]):
"""Compile symbolic equations to numerical functions"""
logger.info(f"Compiling equations for {len(coordinates)} coordinates")
self.last_compile_warnings = []
state_vars = []
for q in coordinates:
state_vars.extend([q, f"{q}_dot"])
param_subs = {self.symbolic.get_symbol(k): v for k, v in self.parameters.items()}
compiled_equations = {}
for q in coordinates:
accel_key = f"{q}_ddot"
if accel_key in accelerations:
eq = accelerations[accel_key].subs(param_subs)
# Attempt simplification with timeout
try:
if config.simplification_timeout > 0:
with timeout(config.simplification_timeout):
eq = sp.simplify(eq)
else:
eq = sp.simplify(eq)
except TimeoutError:
logger.debug(f"Simplification timeout for {accel_key}, skipping")
except (ValueError, TypeError, AttributeError) as e:
logger.debug(f"Simplification error for {accel_key}: {e}, skipping")
eq = self._replace_derivatives(eq, coordinates)
free_symbols = eq.free_symbols
ordered_symbols = []
symbol_indices = []
has_time = False
# Check if equation depends on time
time_sym = self.symbolic.time_symbol
if time_sym in free_symbols:
has_time = True
ordered_symbols.append(time_sym)
symbol_indices.append(-1) # Special index for time
for i, var_name in enumerate(state_vars):
sym = self.symbolic.get_symbol(var_name)
if sym in free_symbols:
ordered_symbols.append(sym)
symbol_indices.append(i)
if ordered_symbols:
try:
func = sp.lambdify(ordered_symbols, eq, modules=["numpy", "math"])
def make_wrapper(func, indices, has_time_flag):
# Force capture by value
def wrapper(
*args_with_time,
_func=func,
_indices=indices,
_has_time=has_time_flag,
):
try:
# FIX: Always split time from state vector
# args_with_time comes from solve_ivp, so it is ALWAYS (t, y0, y1, ...) # noqa: E501
if len(args_with_time) < 1:
return 0.0
t_val = float(args_with_time[0])
state_vector = args_with_time[1:]
func_args = []
for idx in _indices:
if idx == -1: # Time index
func_args.append(t_val)
elif idx >= 0: # State variable index
# Safe access to state vector
if idx < len(state_vector):
func_args.append(state_vector[idx])
else:
func_args.append(0.0)
# Execute the lambda function
if len(func_args) == len(_indices):
result = _func(*func_args)
# Ensure we return a float
if isinstance(result, np.ndarray):
result = (
float(result.item())
if result.size == 1
else float(result.flat[0])
)
return float(result) if np.isfinite(result) else 0.0
return 0.0
except (
ValueError,
TypeError,
ZeroDivisionError,
OverflowError,
) as e:
logger.debug(f"Evaluation error: {e}")
return 0.0
return wrapper
compiled_equations[accel_key] = make_wrapper(func, symbol_indices, has_time)
logger.debug(f"Compiled {accel_key}")
except (
ValueError,
TypeError,
AttributeError,
NotImplementedError,
SyntaxError,
) as e:
msg = f"Compilation failed for {accel_key}: {e}; using zero fallback."
logger.error(msg)
self.last_compile_warnings.append(msg)
compiled_equations[accel_key] = lambda *args: 0.0
except Exception as e:
msg = (
f"Unexpected compilation error for {accel_key}: "
f"{type(e).__name__}: {e}; using zero fallback."
)
logger.error(msg)
self.last_compile_warnings.append(msg)
compiled_equations[accel_key] = lambda *args: 0.0
else:
try:
const_value = float(sp.N(eq))
# IMPORTANT: Use default argument to capture const_value by value
# Without this, all constant lambdas share the same value (closure bug)
compiled_equations[accel_key] = lambda *args, _val=const_value: _val
logger.debug(f"{accel_key} is constant: {const_value}")
except (ValueError, TypeError) as e:
msg = (
f"Could not evaluate constant for {accel_key}: {e}; "
"using zero fallback."
)
logger.warning(msg)
self.last_compile_warnings.append(msg)
compiled_equations[accel_key] = lambda *args: 0.0
self.equations = compiled_equations
self.state_vars = state_vars
self.coordinates = coordinates
logger.info("Equation compilation complete")
[docs]
def compile_hamiltonian_equations(
self, q_dots: List[sp.Expr], p_dots: List[sp.Expr], coordinates: List[str]
):
"""Compile Hamiltonian equations"""
logger.info("Compiling Hamiltonian equations")
self.use_hamiltonian = True
state_vars = []
for q in coordinates:
state_vars.extend([q, f"p_{q}"])
param_subs = {self.symbolic.get_symbol(k): v for k, v in self.parameters.items()}
self.hamiltonian_equations = {"q_dots": [], "p_dots": []}
for i, q in enumerate(coordinates):
q_dot_eq = q_dots[i].subs(param_subs)
p_dot_eq = p_dots[i].subs(param_subs)
# Compile q_dot
free_syms = q_dot_eq.free_symbols
ordered_syms = []
indices = []
for j, var_name in enumerate(state_vars):
sym = self.symbolic.get_symbol(var_name)
if sym in free_syms:
ordered_syms.append(sym)
indices.append(j)
if ordered_syms:
func = sp.lambdify(ordered_syms, q_dot_eq, modules=["numpy", "math"])
self.hamiltonian_equations["q_dots"].append((func, indices))
else:
const_val = float(sp.N(q_dot_eq))
self.hamiltonian_equations["q_dots"].append((lambda *args, v=const_val: v, []))
# Compile p_dot
free_syms = p_dot_eq.free_symbols
ordered_syms = []
indices = []
for j, var_name in enumerate(state_vars):
sym = self.symbolic.get_symbol(var_name)
if sym in free_syms:
ordered_syms.append(sym)
indices.append(j)
if ordered_syms:
func = sp.lambdify(ordered_syms, p_dot_eq, modules=["numpy", "math"])
self.hamiltonian_equations["p_dots"].append((func, indices))
else:
const_val = float(sp.N(p_dot_eq))
self.hamiltonian_equations["p_dots"].append((lambda *args, v=const_val: v, []))
self.state_vars = state_vars
self.coordinates = coordinates
logger.info("Hamiltonian compilation complete")
def _replace_derivatives(self, expr: sp.Expr, coordinates: List[str]) -> sp.Expr:
"""Replace Derivative objects with corresponding symbols"""
derivs = list(expr.atoms(sp.Derivative))
for d in derivs:
try:
base = d.args[0]
order = 1
if len(d.args) >= 2:
arg2 = d.args[1]
if isinstance(arg2, tuple) and len(arg2) >= 2:
order = int(arg2[1])
base_name = str(base)
if base_name in coordinates:
if order == 1:
repl = self.symbolic.get_symbol(f"{base_name}_dot")
elif order == 2:
repl = self.symbolic.get_symbol(f"{base_name}_ddot")
else:
continue
expr = expr.xreplace({d: repl})
except (AttributeError, TypeError, ValueError) as e:
logger.debug(f"Could not replace derivative: {e}")
continue
return expr
[docs]
def equations_of_motion(self, t: float, y: np.ndarray) -> np.ndarray:
"""
ODE system for numerical integration with comprehensive bounds checking and validation.
Args:
t: Current time
y: State vector
Returns:
Derivative vector dydt
"""
# Comprehensive input validation
if not isinstance(t, (int, float)) or not np.isfinite(t):
logger.error(f"equations_of_motion: invalid time t={t}, using 0.0")
t = 0.0
if y is None:
logger.error("equations_of_motion: y is None, returning zeros")
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
if not isinstance(y, np.ndarray):
logger.error(f"equations_of_motion: y is not numpy.ndarray, got {type(y).__name__}")
try:
y = np.array(y, dtype=float)
except Exception as e:
logger.error(f"equations_of_motion: cannot convert y to array: {e}")
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
if not validate_array_safe(y, "state_vector", min_size=1, check_finite=False):
logger.warning(
"equations_of_motion: state vector validation failed, attempting recovery"
)
# Try to fix non-finite values
y = np.nan_to_num(y, nan=0.0, posinf=1e10, neginf=-1e10)
if self.use_hamiltonian:
return self._hamiltonian_ode(t, y)
# Validate expected size
expected_size = 2 * len(self.coordinates) if self.coordinates else 1
if y.size != expected_size:
logger.warning(
f"equations_of_motion: state vector size {y.size} != expected {expected_size}"
)
# Try to pad or truncate
if y.size < expected_size:
y = np.pad(y, (0, expected_size - y.size), mode="constant", constant_values=0.0)
else:
y = y[:expected_size]
try:
dydt = np.zeros_like(y)
# Position derivatives = velocities (with comprehensive bounds checking)
for i in range(len(self.coordinates)):
pos_idx = 2 * i
vel_idx = 2 * i + 1
if vel_idx < len(y) and pos_idx < len(dydt):
vel_value = safe_array_access(y, vel_idx, 0.0)
dydt[pos_idx] = vel_value
elif pos_idx < len(dydt):
dydt[pos_idx] = 0.0
for i, q in enumerate(self.coordinates):
accel_key = f"{q}_ddot"
vel_idx = 2 * i + 1
if accel_key in self.equations and vel_idx < len(dydt):
try:
eq_func = self.equations.get(accel_key)
if eq_func is None:
logger.warning(
f"equations_of_motion: equation function for {accel_key} is None"
)
dydt[vel_idx] = 0.0
continue
try:
# Pass time and state vector - wrapper handles whether time is needed
accel_value = eq_func(t, *y)
accel_value = safe_float_conversion(accel_value)
if np.isfinite(accel_value):
dydt[vel_idx] = accel_value
else:
dydt[vel_idx] = 0.0
logger.warning(f"Non-finite acceleration for {q} at t={t:.6f}")
except (
ValueError,
TypeError,
ZeroDivisionError,
IndexError,
OverflowError,
) as e:
dydt[vel_idx] = 0.0
logger.debug(f"Evaluation error for {q} at t={t:.6f}: {e}")
except Exception as e:
dydt[vel_idx] = 0.0
logger.error(f"Unexpected error evaluating {accel_key}: {e}", exc_info=True)
elif vel_idx < len(dydt):
dydt[vel_idx] = 0.0
# Final validation of result
if not validate_array_safe(dydt, "dydt", check_finite=True):
logger.warning(
"equations_of_motion: result validation failed, fixing non-finite values"
)
dydt = np.nan_to_num(dydt, nan=0.0, posinf=1e10, neginf=-1e10)
return dydt
except Exception as e:
logger.error(f"equations_of_motion: unexpected error: {e}", exc_info=True)
# Return safe default
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
def _hamiltonian_ode(self, t: float, y: np.ndarray) -> np.ndarray:
"""
ODE system for Hamiltonian formulation with comprehensive validation.
Args:
t: Current time
y: State vector
Returns:
Derivative vector dydt
"""
# Input validation
if not isinstance(t, (int, float)) or not np.isfinite(t):
logger.error(f"_hamiltonian_ode: invalid time t={t}, using 0.0")
t = 0.0
if y is None:
logger.error("_hamiltonian_ode: y is None")
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
if not isinstance(y, np.ndarray):
logger.error(f"_hamiltonian_ode: y is not numpy.ndarray, got {type(y).__name__}")
try:
y = np.array(y, dtype=float)
except Exception as e:
logger.error(f"_hamiltonian_ode: cannot convert y to array: {e}")
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
if not validate_array_safe(y, "hamiltonian_state_vector", min_size=1, check_finite=False):
logger.warning("_hamiltonian_ode: state vector validation failed, attempting recovery")
y = np.nan_to_num(y, nan=0.0, posinf=1e10, neginf=-1e10)
try:
dydt = np.zeros_like(y)
if self.hamiltonian_equations is None:
logger.error("_hamiltonian_ode: hamiltonian_equations is None, cannot compute ODE")
return dydt
if not isinstance(self.hamiltonian_equations, dict):
logger.error(
f"_hamiltonian_ode: hamiltonian_equations is not dict, got {type(self.hamiltonian_equations).__name__}" # noqa: E501
)
return dydt
if (
"q_dots" not in self.hamiltonian_equations
or "p_dots" not in self.hamiltonian_equations
):
logger.error("_hamiltonian_ode: hamiltonian_equations missing required keys")
return dydt
for i, q in enumerate(self.coordinates):
if not isinstance(q, str):
logger.warning(
f"_hamiltonian_ode: coordinate {i} is not string: {type(q).__name__}"
)
continue
if i >= len(self.hamiltonian_equations["q_dots"]):
logger.warning(
f"_hamiltonian_ode: Index {i} out of range for q_dots (len={len(self.hamiltonian_equations['q_dots'])})" # noqa: E501
)
continue
try:
q_dot_data = self.hamiltonian_equations["q_dots"][i]
if not isinstance(q_dot_data, tuple) or len(q_dot_data) != 2:
logger.error(f"_hamiltonian_ode: invalid q_dot data structure at index {i}")
continue
func, indices = q_dot_data
if func is None:
logger.warning(f"_hamiltonian_ode: function is None for d{q}/dt")
continue
if not isinstance(indices, (list, tuple)):
logger.warning(f"_hamiltonian_ode: indices is not list/tuple for d{q}/dt")
indices = []
q_idx = 2 * i
if q_idx < len(dydt):
try:
args = [
safe_array_access(y, j, 0.0)
for j in indices
if isinstance(j, int) and j >= 0
]
if len(args) == len(indices):
result = func(*args)
dydt[q_idx] = safe_float_conversion(result)
if not np.isfinite(dydt[q_idx]):
logger.warning(
f"_hamiltonian_ode: non-finite d{q}/dt, setting to 0.0"
)
dydt[q_idx] = 0.0
else:
dydt[q_idx] = 0.0
logger.warning(
f"_hamiltonian_ode: Incomplete arguments for d{q}/dt (got {len(args)}, expected {len(indices)})" # noqa: E501
)
except (
ValueError,
TypeError,
ZeroDivisionError,
IndexError,
OverflowError,
) as e:
dydt[q_idx] = 0.0
logger.debug(f"_hamiltonian_ode: Evaluation error for d{q}/dt: {e}")
except Exception as e:
dydt[q_idx] = 0.0
logger.error(
f"_hamiltonian_ode: Unexpected error for d{q}/dt: {e}",
exc_info=True,
)
except (IndexError, TypeError, ValueError) as e:
logger.error(f"_hamiltonian_ode: Error accessing q_dots[{i}]: {e}")
continue
if i >= len(self.hamiltonian_equations["p_dots"]):
logger.warning(
f"_hamiltonian_ode: Index {i} out of range for p_dots (len={len(self.hamiltonian_equations['p_dots'])})" # noqa: E501
)
continue
try:
p_dot_data = self.hamiltonian_equations["p_dots"][i]
if not isinstance(p_dot_data, tuple) or len(p_dot_data) != 2:
logger.error(f"_hamiltonian_ode: invalid p_dot data structure at index {i}")
continue
func, indices = p_dot_data
if func is None:
logger.warning(f"_hamiltonian_ode: function is None for dp_{q}/dt")
continue
if not isinstance(indices, (list, tuple)):
logger.warning(f"_hamiltonian_ode: indices is not list/tuple for dp_{q}/dt")
indices = []
p_idx = 2 * i + 1
if p_idx < len(dydt):
try:
args = [
safe_array_access(y, j, 0.0)
for j in indices
if isinstance(j, int) and j >= 0
]
if len(args) == len(indices):
result = func(*args)
dydt[p_idx] = safe_float_conversion(result)
if not np.isfinite(dydt[p_idx]):
logger.warning(
f"_hamiltonian_ode: non-finite dp_{q}/dt, setting to 0.0"
)
dydt[p_idx] = 0.0
else:
dydt[p_idx] = 0.0
logger.warning(
f"_hamiltonian_ode: Incomplete arguments for dp_{q}/dt (got {len(args)}, expected {len(indices)})" # noqa: E501
)
except (
ValueError,
TypeError,
ZeroDivisionError,
IndexError,
OverflowError,
) as e:
dydt[p_idx] = 0.0
logger.debug(f"_hamiltonian_ode: Evaluation error for dp_{q}/dt: {e}")
except Exception as e:
dydt[p_idx] = 0.0
logger.error(
f"_hamiltonian_ode: Unexpected error for dp_{q}/dt: {e}",
exc_info=True,
)
except (IndexError, TypeError, ValueError) as e:
logger.error(f"_hamiltonian_ode: Error accessing p_dots[{i}]: {e}")
continue
if not validate_array_safe(dydt, "hamiltonian_dydt", check_finite=True):
logger.warning(
"_hamiltonian_ode: result validation failed, fixing non-finite values"
)
dydt = np.nan_to_num(dydt, nan=0.0, posinf=1e10, neginf=-1e10)
return dydt
except Exception as e:
logger.error(f"_hamiltonian_ode: unexpected error: {e}", exc_info=True)
if self.coordinates:
return np.zeros(2 * len(self.coordinates))
return np.zeros(1)
def _select_optimal_solver(self, t_span: Tuple[float, float], y0: np.ndarray) -> str:
"""Intelligently select the optimal solver based on system characteristics."""
if not config.enable_adaptive_solver:
# Default to LSODA for better stability in CI environments
return "LSODA"
# Analyze system characteristics
n_dof = len(self.coordinates)
time_span = t_span[1] - t_span[0]
# Large systems benefit from implicit methods
if n_dof > 10:
return "LSODA"
# Long time spans may need more stable methods
if time_span > 100:
return "LSODA"
# Small, simple systems can use fast explicit methods
if n_dof <= 2 and time_span < 10:
return "RK45"
# Default to LSODA for better stability (especially in CI)
return "LSODA"
[docs]
@profile_function
def simulate(
self,
t_span: Tuple[float, float],
num_points: int = 1000,
method: str = None,
rtol: float = None,
atol: float = None,
detect_stiff: bool = True,
) -> dict:
"""
Run numerical simulation with adaptive integration and diagnostics.
Args:
t_span: Time span (t_start, t_end) where t_start < t_end
num_points: Number of output points (must be >= 2)
method: Integration method ('RK45', 'LSODA', 'Radau', etc.)
rtol: Relative tolerance (must be in (0, 1))
atol: Absolute tolerance (must be positive)
detect_stiff: Whether to detect stiff systems
Returns:
Dictionary with solution data and metadata, always contains 'success' key
Raises:
TypeError: If arguments have wrong types
ValueError: If arguments are out of valid ranges
Example:
>>> solution = simulator.simulate((0, 10), num_points=1000)
>>> if solution['success']:
... t = solution['t']
... y = solution['y']
"""
# Comprehensive input validation
validate_time_span(t_span)
if not isinstance(num_points, int):
raise TypeError(f"num_points must be int, got {type(num_points).__name__}")
if num_points < 2:
raise ValueError(f"num_points must be >= 2, got {num_points}")
if num_points > 10_000_000:
raise ValueError(f"num_points too large (>{10_000_000}), got {num_points}")
# Validate method if provided, but don't set default yet (will be set by solver selection)
if method is not None:
if not isinstance(method, str):
raise TypeError(f"method must be str, got {type(method).__name__}")
valid_methods = ["RK45", "RK23", "DOP853", "Radau", "BDF", "LSODA"]
if method not in valid_methods:
raise ValueError(f"method must be one of {valid_methods}, got {method}")
if rtol is not None:
if not isinstance(rtol, (int, float)):
raise TypeError(f"rtol must be numeric, got {type(rtol).__name__}")
if rtol <= 0 or rtol >= 1:
raise ValueError(f"rtol must be in (0, 1), got {rtol}")
if atol is not None:
if not isinstance(atol, (int, float)):
raise TypeError(f"atol must be numeric, got {type(atol).__name__}")
if atol <= 0:
raise ValueError(f"atol must be positive, got {atol}")
if not isinstance(detect_stiff, bool):
raise TypeError(f"detect_stiff must be bool, got {type(detect_stiff).__name__}")
# START PERFORMANCE TIMER
if config.enable_performance_monitoring:
_perf_monitor.start_timer("simulation")
rtol = rtol or config.default_rtol
atol = atol or config.default_atol
# Build initial state vector first to get y0 for solver selection
y0 = []
for q in self.coordinates:
if self.use_hamiltonian:
pos_val = self.initial_conditions.get(q, 0.0)
y0.append(pos_val)
mom_key = f"p_{q}"
mom_val = self.initial_conditions.get(mom_key, 0.0)
y0.append(mom_val)
else:
pos_val = self.initial_conditions.get(q, 0.0)
y0.append(pos_val)
vel_key = f"{q}_dot"
vel_val = self.initial_conditions.get(vel_key, 0.0)
y0.append(vel_val)
y0 = np.array(y0, dtype=float)
# FIX: Actually select the optimal solver if none provided
if method is None:
method = self._select_optimal_solver(t_span, y0)
logger.info(f"Adaptive solver selected: {method}")
elif method == "RK45" and config.enable_adaptive_solver:
# Check if we should switch to a more stable method
pass
else:
pass
t_eval = np.linspace(t_span[0], t_span[1], num_points)
# Validate initial conditions
if not validate_finite(y0, "Initial conditions"):
return {"success": False, "error": "Initial conditions contain non-finite values"}
# Test initial evaluation
try:
dydt_test = self.equations_of_motion(t_span[0], y0)
if not validate_finite(dydt_test, "Initial derivatives"):
return {"success": False, "error": "Initial derivatives contain non-finite values"}
except (ValueError, TypeError, AttributeError, RuntimeError) as e:
logger.error(f"Initial evaluation failed: {e}")
return {"success": False, "error": f"Initial evaluation failed: {str(e)}"}
except Exception as e:
logger.error(f"Unexpected error in initial evaluation: {type(e).__name__}: {e}")
return {"success": False, "error": f"Initial evaluation failed: {str(e)}"}
# Stiffness detection
is_stiff = False
if detect_stiff and method == "RK45":
try:
test_sol = solve_ivp(
self.equations_of_motion,
(t_span[0], t_span[0] + 0.01),
y0,
method="RK45",
max_step=0.001,
)
if not test_sol.success:
is_stiff = True
logger.warning("System may be stiff. Consider using 'LSODA' or 'Radau' method.")
except (ValueError, RuntimeError, AttributeError) as e:
logger.debug(f"Stiffness detection test failed: {e}")
# Run integration
try:
solution = solve_ivp(
self.equations_of_motion,
t_span,
y0,
t_eval=t_eval,
method=method,
rtol=rtol,
atol=atol,
max_step=min(0.01, (t_span[1] - t_span[0]) / 100),
)
logger.info(
f"Simulation complete: {solution.nfev} evaluations, "
f"status={'success' if solution.success else 'failed'}"
)
# Performance monitoring
if config.enable_performance_monitoring:
_perf_monitor.stop_timer("simulation")
_perf_monitor.snapshot_memory("post_simulation")
result = {
"success": solution.success,
"t": solution.t,
"y": solution.y,
"coordinates": self.coordinates,
"state_vars": self.state_vars,
"message": solution.message if hasattr(solution, "message") else "",
"nfev": solution.nfev if hasattr(solution, "nfev") else 0,
"is_stiff": is_stiff,
"use_hamiltonian": self.use_hamiltonian,
"method_used": method, # Track which method was used
}
# Add performance metrics if available
if config.enable_performance_monitoring:
sim_stats = _perf_monitor.get_stats("simulation")
if sim_stats:
result["performance"] = sim_stats
return result
except Exception as e:
logger.error(f"Simulation failed: {e}")
return {"success": False, "error": str(e)}
[docs]
async def simulate_async(
self,
t_span: Tuple[float, float],
num_points: int = 1000,
method: str = None,
rtol: float = None,
atol: float = None,
detect_stiff: bool = True,
executor: Optional[concurrent.futures.Executor] = None,
) -> Dict[str, Any]:
"""
Run numerical simulation asynchronously.
This method runs the simulation in a thread pool executor to avoid
blocking the event loop. Useful for:
- Jupyter notebooks (keeps the kernel responsive)
- Async web backends (FastAPI, aiohttp)
- GUI applications with async event loops
Args:
t_span: Time span (t_start, t_end)
num_points: Number of output points
method: Integration method ('RK45', 'LSODA', 'Radau', etc.)
rtol: Relative tolerance
atol: Absolute tolerance
detect_stiff: Whether to detect stiff systems
executor: Optional custom executor (defaults to ThreadPoolExecutor)
Returns:
Dictionary with solution data and metadata
Example:
>>> import asyncio
>>> async def main():
... result = await simulator.simulate_async((0, 10), num_points=1000)
... print(f"Success: {result['success']}")
>>> asyncio.run(main())
Note:
For CPU-bound simulations, consider using ProcessPoolExecutor
for true parallelism (requires picklable equations).
"""
loop = asyncio.get_event_loop()
# Use provided executor or create a new thread pool
if executor is None:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
result = await loop.run_in_executor(
pool,
lambda: self.simulate(
t_span=t_span,
num_points=num_points,
method=method,
rtol=rtol,
atol=atol,
detect_stiff=detect_stiff,
),
)
else:
result = await loop.run_in_executor(
executor,
lambda: self.simulate(
t_span=t_span,
num_points=num_points,
method=method,
rtol=rtol,
atol=atol,
detect_stiff=detect_stiff,
),
)
return result
[docs]
async def simulate_batch_async(
self, simulations: List[Dict[str, Any]], max_concurrent: int = 4
) -> List[Dict[str, Any]]:
"""
Run multiple simulations concurrently.
Useful for parameter sweeps, sensitivity analysis, or ensemble
simulations where many similar systems need to be simulated.
Args:
simulations: List of simulation parameter dicts, each containing:
- t_span: Tuple[float, float] (required)
- num_points: int (optional, default 1000)
- method: str (optional)
- rtol: float (optional)
- atol: float (optional)
max_concurrent: Maximum concurrent simulations
Returns:
List of simulation results in the same order as input
Example:
>>> simulations = [
... {'t_span': (0, 10), 'num_points': 500},
... {'t_span': (0, 20), 'num_points': 1000},
... ]
>>> results = await simulator.simulate_batch_async(simulations)
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def run_with_semaphore(sim_params: Dict[str, Any]) -> Dict[str, Any]:
async with semaphore:
return await self.simulate_async(**sim_params)
tasks = [run_with_semaphore(sim) for sim in simulations]
results = await asyncio.gather(*tasks)
return list(results)