Source code for mechanics_dsl.compiler

"""
Main compiler and system serialization for MechanicsDSL
"""

import gc
import json
import os
import pickle
import subprocess
import sys
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import sympy as sp

from .codegen.cpp import CppGenerator
from .parser import (
    ASTNode,
    BoundaryDef,
    ConstraintDef,
    DampingDef,
    DefineDef,
    Expression,
    FluidDef,
    ForceDef,
    HamiltonianDef,
    ImportDef,
    InitialCondition,
    LagrangianDef,
    MechanicsParser,
    NonHolonomicConstraintDef,
    ParameterDef,
    RayleighDef,
    RegionDef,
    SystemDef,
    TransformDef,
    VarDef,
    tokenize,
)
from .solver import NumericalSimulator
from .symbolic import SymbolicEngine
from .units import UnitSystem
from .utils import (
    LRUCache,
    _perf_monitor,
    config,
    is_likely_coordinate,
    logger,
    profile_function,
    validate_file_path,
)
from .visualization import MechanicsVisualizer

# Security module for input validation
try:
    from .security import InjectionError, validate_dsl_code

    SECURITY_AVAILABLE = True
except ImportError:
    SECURITY_AVAILABLE = False
    InjectionError = ValueError  # type: ignore[misc]

def _package_version() -> str:
    """Lazy lookup of the package version to dodge the circular import that
    would happen if we did ``from . import __version__`` at the module level
    (this file is imported by ``__init__.py`` before ``__version__`` is set)."""
    from . import __version__ as _v

    return _v


[docs] class SystemSerializer: """Serialize and deserialize compiled physics systems"""
[docs] @staticmethod def export_system(compiler: "PhysicsCompiler", filename: str, format: str = "json") -> bool: """ Export compiled system to file Args: compiler: PhysicsCompiler instance filename: Output filename format: Export format ('json' or 'pickle') Returns: True if successful """ try: state = { "version": _package_version(), "system_name": compiler.system_name, "variables": compiler.variables, "parameters": compiler.parameters_def, "initial_conditions": compiler.initial_conditions, "lagrangian": str(compiler.lagrangian) if compiler.lagrangian else None, "hamiltonian": str(compiler.hamiltonian) if compiler.hamiltonian else None, "coordinates": compiler.get_coordinates(), "use_hamiltonian": compiler.use_hamiltonian_formulation, "constraints": [str(c) for c in compiler.constraints], "transforms": {k: str(v) for k, v in compiler.transforms.items()}, } if format == "json": with open(filename, "w") as f: json.dump(state, f, indent=2) elif format == "pickle": with open(filename, "wb") as f: pickle.dump(state, f) else: raise ValueError(f"Unknown format: {format}") logger.info(f"System exported to {filename}") return True except (IOError, OSError, PermissionError, ValueError) as e: logger.error(f"Export failed: {e}") return False except Exception as e: logger.error(f"Unexpected export error: {type(e).__name__}: {e}") return False
[docs] @staticmethod def import_system(filename: str, allow_pickle: bool = False) -> Optional[dict]: """ Import system state from file with validation. Args: filename: Input filename (validated) allow_pickle: Required to be True to load .pkl/.pickle files. Pickle deserialization can execute arbitrary code, so this is refused by default. Only enable for fully trusted local files (e.g. ones you produced yourself). Returns: System state dictionary or None if failed Raises: TypeError: If filename is not a string ValueError: If filename is invalid FileNotFoundError: If file doesn't exist """ validate_file_path(filename, must_exist=True) try: if filename.endswith(".json"): with open(filename, "r", encoding="utf-8") as f: state = json.load(f) elif filename.endswith(".pkl") or filename.endswith(".pickle"): if not allow_pickle: logger.error( "import_system refused to load pickle file: pickle " "deserialization can execute arbitrary code. Pass " "allow_pickle=True only for fully trusted local files." ) return None logger.warning( f"Loading pickle from {filename}; allow_pickle=True was set." ) with open(filename, "rb") as f: # nosec B301: caller opted in via allow_pickle=True state = pickle.load(f) # nosec B301 else: # Try JSON first with open(filename, "r", encoding="utf-8") as f: state = json.load(f) logger.info(f"System imported from {filename}") return state except Exception as e: logger.error(f"Import failed: {e}") return None
[docs] class ParticleGenerator: """Generates discrete particle positions from geometric regions"""
[docs] @staticmethod def generate(region: RegionDef, spacing: float) -> List[Tuple[float, float]]: """ Generate grid of particles within a region. """ if region.shape == "rectangle": x_range = region.constraints.get("x", (0, 0)) y_range = region.constraints.get("y", (0, 0)) # Create grid x_points = np.arange(x_range[0], x_range[1], spacing) y_points = np.arange(y_range[0], y_range[1], spacing) # Meshgrid xx, yy = np.meshgrid(x_points, y_points) # Flatten to list of (x, y) tuples return list(zip(xx.flatten(), yy.flatten())) elif region.shape == "line": # Useful for boundaries x_range = region.constraints.get("x", (0, 0)) y_range = region.constraints.get("y", (0, 0)) if x_range[0] == x_range[1]: # Vertical line y_points = np.arange(y_range[0], y_range[1], spacing / 2.0) # Denser walls return [(x_range[0], y) for y in y_points] else: # Horizontal line x_points = np.arange(x_range[0], x_range[1], spacing / 2.0) return [(x, y_range[0]) for x in x_points] return []
[docs] class PhysicsCompiler: """ Main compiler class. Production-ready physics DSL compiler with comprehensive validation, cross-platform support, and security hardening. Features: - Cross-platform timeout support (Windows/Unix) - Safe AST-based parsing (no eval()) - Comprehensive input validation - Specific exception handling - Extensive type hints - Production-ready error recovery Example: >>> compiler = PhysicsCompiler() >>> result = compiler.compile_dsl("\\system{pendulum}\\lagrangian{x^2}") >>> if result['success']: ... solution = compiler.simulate((0, 10)) ... compiler.animate(solution) """ def __init__(self): self.ast: List[ASTNode] = [] self.variables: Dict[str, dict] = {} self.definitions: Dict[str, dict] = {} self.parameters_def: Dict[str, dict] = {} self.system_name: str = "unnamed_system" self.lagrangian: Optional[Expression] = None self.hamiltonian: Optional[Expression] = None self.transforms: Dict[str, dict] = {} self.constraints: List[Expression] = [] self.non_holonomic_constraints: List[Expression] = [] # Each entry is (target_coordinate_or_None, force_expression). self.forces: List[Tuple[Optional[str], Expression]] = [] self.damping_forces: List[Expression] = [] self.rayleigh_dissipation: Optional[Expression] = None self.initial_conditions: Dict[str, float] = {} self.fluid_particles: List[Dict[str, float]] = [] # [{'x': 1.0, 'y': 2.0, 'm': 0.01}, ...] self.boundary_particles: List[Dict[str, float]] = [] self.smoothing_length: float = 0.1 # Default 'h' self.symbolic = SymbolicEngine() self.simulator = NumericalSimulator(self.symbolic) self.visualizer = MechanicsVisualizer() self.unit_system = UnitSystem() self.compilation_time: Optional[float] = None self.equations: Optional[Any] = None self.use_hamiltonian_formulation: bool = False # Memory management if config.enable_memory_monitoring: gc.set_threshold(*config._gc_threshold) _perf_monitor.snapshot_memory("compiler_init") logger.debug("PhysicsCompiler initialized") def __enter__(self): """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit with cleanup""" self.cleanup() return False def _reset_system_state(self) -> None: """ Clear all per-system state so a single PhysicsCompiler instance can be reused for multiple compile_dsl() calls without leaking definitions (variables, constraints, forces, initial conditions, ...) from a previously compiled system into the next one. """ self.ast = [] self.variables = {} self.definitions = {} self.parameters_def = {} self.system_name = "unnamed_system" self.lagrangian = None self.hamiltonian = None self.transforms = {} self.constraints = [] self.non_holonomic_constraints = [] self.forces = [] self.damping_forces = [] self.rayleigh_dissipation = None self.initial_conditions = {} self.fluid_particles = [] self.boundary_particles = [] self.smoothing_length = 0.1 self.equations = None self.use_hamiltonian_formulation = False # Hamiltonian derived from a Lagrangian is cached on the instance; drop it if hasattr(self, "hamiltonian_expr"): del self.hamiltonian_expr # Import-cycle tracking is per-compilation if hasattr(self, "_imported_files"): self._imported_files = set() # The simulator accumulates via update()/append(); clear it in place so # external references to compiler.simulator stay valid. self.simulator.parameters.clear() self.simulator.initial_conditions.clear() self.simulator.equations = {} self.simulator.constraints = [] self.simulator.state_vars = [] self.simulator.coordinates = [] self.simulator.use_hamiltonian = False self.simulator.hamiltonian_equations = None
[docs] def cleanup(self) -> None: """Cleanup resources and trigger garbage collection.""" if config.enable_memory_monitoring: _perf_monitor.snapshot_memory("pre_cleanup") # Clear large caches if hasattr(self.symbolic, "_cache") and self.symbolic._cache: if isinstance(self.symbolic._cache, LRUCache): self.symbolic._cache.clear() # Clear compiled equations self.equations = None # Trigger garbage collection if config.enable_memory_monitoring: collected = gc.collect() logger.debug(f"Garbage collection: {collected} objects collected") _perf_monitor.snapshot_memory("post_cleanup")
[docs] @profile_function def compile_dsl( self, dsl_source: str, use_hamiltonian: bool = False, use_constraints: bool = True ) -> dict: """ Complete compilation pipeline with comprehensive validation. Args: dsl_source: DSL source code (must be non-empty string) use_hamiltonian: Force Hamiltonian formulation use_constraints: Apply constraint handling Returns: Compilation result dictionary with 'success' key Raises: TypeError: If dsl_source is not a string ValueError: If dsl_source is empty or invalid Example: >>> compiler = PhysicsCompiler() >>> result = compiler.compile_dsl(r"\\system{test}\\lagrangian{x^2}") >>> assert result['success'] """ # Comprehensive input validation if dsl_source is None: logger.error("compile_dsl: dsl_source is None") return {"success": False, "error": "dsl_source cannot be None", "compilation_time": 0.0} if not isinstance(dsl_source, str): error_msg = f"dsl_source must be str, got {type(dsl_source).__name__}" logger.error(f"compile_dsl: {error_msg}") raise TypeError(error_msg) dsl_source = dsl_source.strip() if not dsl_source: error_msg = "dsl_source cannot be empty" logger.error(f"compile_dsl: {error_msg}") raise ValueError(error_msg) if len(dsl_source) > 1_000_000: # 1MB limit error_msg = f"dsl_source too large ({len(dsl_source)} chars), max 1MB" logger.error(f"compile_dsl: {error_msg}") raise ValueError(error_msg) # Security validation - actually BLOCK dangerous patterns if SECURITY_AVAILABLE: try: dsl_source = validate_dsl_code(dsl_source) except InjectionError as e: logger.error(f"compile_dsl: Security violation - {e}") return { "success": False, "error": f"Security violation: {e}", "compilation_time": 0.0, } else: # Fallback: basic pattern check (warnings only) dangerous_patterns = ["__import__", "eval(", "exec(", "compile("] for pattern in dangerous_patterns: if pattern in dsl_source: logger.warning( f"compile_dsl: potentially dangerous pattern '{pattern}' detected in source" ) if not isinstance(use_hamiltonian, bool): error_msg = f"use_hamiltonian must be bool, got {type(use_hamiltonian).__name__}" logger.error(f"compile_dsl: {error_msg}") raise TypeError(error_msg) if not isinstance(use_constraints, bool): error_msg = f"use_constraints must be bool, got {type(use_constraints).__name__}" logger.error(f"compile_dsl: {error_msg}") raise TypeError(error_msg) # Reset any state left over from a previous compile_dsl() on this # instance so systems don't bleed into each other. self._reset_system_state() start_time = time.time() logger.info(f"Starting DSL compilation (source length: {len(dsl_source)} chars)") # Performance monitoring if config.enable_performance_monitoring: _perf_monitor.snapshot_memory("pre_compilation") _perf_monitor.start_timer("compilation") try: # Tokenize with error handling try: tokens = tokenize(dsl_source) if not tokens: raise ValueError("Tokenization produced no tokens") logger.info(f"Tokenized {len(tokens)} tokens") except Exception as e: logger.error(f"Tokenization failed: {e}", exc_info=True) raise ValueError(f"Tokenization failed: {e}") from e # Parse with error handling try: parser = MechanicsParser(tokens) self.ast = parser.parse() if parser.errors: logger.warning(f"Parser found {len(parser.errors)} errors") if len(parser.errors) >= config.max_parser_errors: raise ValueError(f"Too many parser errors ({len(parser.errors)})") except Exception as e: logger.error(f"Parsing failed: {e}", exc_info=True) raise ValueError(f"Parsing failed: {e}") from e # Semantic analysis with error handling try: self.analyze_semantics() self.process_fluids() except Exception as e: logger.error(f"Semantic analysis failed: {e}", exc_info=True) raise ValueError(f"Semantic analysis failed: {e}") from e # Determine formulation if self.hamiltonian is not None: use_hamiltonian = True elif use_hamiltonian and self.lagrangian is not None: coords = self.get_coordinates() L_sympy = self.symbolic.ast_to_sympy(self.lagrangian) self.hamiltonian_expr = self.symbolic.lagrangian_to_hamiltonian(L_sympy, coords) use_hamiltonian = True # Derive equations with error handling try: equations: Any = None if self.fluid_particles and self.lagrangian is None: logger.info("Fluid system detected: Skipping symbolic derivation") equations: Any = {} # No symbolic equations needed for SPH self.use_hamiltonian_formulation = False elif use_hamiltonian: equations = self.derive_hamiltonian_equations() self.use_hamiltonian_formulation = True logger.info("Using Hamiltonian formulation") # Only try this if have a Lagrangian elif self.lagrangian is not None: # Check for constraints if use_constraints and len(self.constraints) > 0: equations = self.derive_constrained_equations() logger.info( f"Using constrained Lagrangian with {len(self.constraints)} constraints" ) else: equations = self.derive_equations() logger.info("Using standard Lagrangian formulation") self.use_hamiltonian_formulation = False if equations is None: raise ValueError("Equation derivation returned None") self.equations = equations except Exception as e: logger.error(f"Equation derivation failed: {e}", exc_info=True) raise ValueError(f"Equation derivation failed: {e}") from e # Setup simulation with error handling try: self.setup_simulation(equations) except Exception as e: logger.error(f"Simulation setup failed: {e}", exc_info=True) raise ValueError(f"Simulation setup failed: {e}") from e self.compilation_time = time.time() - start_time # Performance monitoring if config.enable_performance_monitoring: _perf_monitor.stop_timer("compilation") _perf_monitor.snapshot_memory("post_compilation") # Collect diagnostics from the symbolic engine and the simulator so # callers can see when solve_for_accelerations or lambdify silently # fell back to zero, rather than treating "success" as "correct". warnings_list: List[str] = [] warnings_list.extend(getattr(self.symbolic, "last_solve_warnings", []) or []) warnings_list.extend(getattr(self.simulator, "last_compile_warnings", []) or []) result = { "success": True, "system_name": self.system_name, "coordinates": list(self.get_coordinates()), "equations": equations, "variables": self.variables, "parameters": self.simulator.parameters, "compilation_time": self.compilation_time, "ast_nodes": len(self.ast), "formulation": "Hamiltonian" if use_hamiltonian else "Lagrangian", "num_constraints": len(self.constraints) if use_constraints else 0, "warnings": warnings_list, } if warnings_list: logger.warning( f"Compilation produced {len(warnings_list)} diagnostic warning(s); " "results may be incomplete." ) logger.info(f"Compilation successful in {self.compilation_time:.4f}s") # Add performance metrics if available if config.enable_performance_monitoring: comp_stats = _perf_monitor.get_stats("compilation") if comp_stats: result["performance"] = comp_stats return result except Exception as e: error_trace = traceback.format_exc() logger.error(f"Compilation failed: {e}\n{error_trace}") return { "success": False, "error": str(e), "traceback": error_trace, "compilation_time": time.time() - start_time, }
[docs] def analyze_semantics(self): """Extract system information from AST""" logger.info("Analyzing semantics") for node in self.ast: if isinstance(node, SystemDef): self.system_name = node.name logger.debug(f"System name: {self.system_name}") elif isinstance(node, VarDef): self.variables[node.name] = { "type": node.vartype, "unit": node.unit, "vector": node.vector, } logger.debug(f"Variable: {node.name} ({node.vartype})") elif isinstance(node, ParameterDef): self.parameters_def[node.name] = {"value": node.value, "unit": node.unit} logger.debug(f"Parameter: {node.name} = {node.value}") elif isinstance(node, DefineDef): self.definitions[node.name] = {"args": node.args, "body": node.body} logger.debug(f"Definition: {node.name}") elif isinstance(node, LagrangianDef): self.lagrangian = node.expr logger.debug("Lagrangian defined") elif isinstance(node, HamiltonianDef): self.hamiltonian = node.expr logger.debug("Hamiltonian defined") elif isinstance(node, TransformDef): self.transforms[node.var] = {"type": node.coord_type, "expression": node.expr} logger.debug(f"Transform: {node.var}") elif isinstance(node, ConstraintDef): self.constraints.append(node.expr) logger.debug("Holonomic constraint added") elif isinstance(node, NonHolonomicConstraintDef): self.non_holonomic_constraints.append(node.expr) logger.debug("Non-holonomic constraint added") elif isinstance(node, ForceDef): self.forces.append((node.coordinate, node.expr)) logger.debug(f"Force added: {node.force_type} (coord={node.coordinate})") elif isinstance(node, DampingDef): self.damping_forces.append(node.expr) logger.debug("Damping force added") elif isinstance(node, RayleighDef): self.rayleigh_dissipation = node.expr logger.debug("Rayleigh dissipation function defined") elif isinstance(node, InitialCondition): self.initial_conditions.update(node.conditions) logger.debug(f"Initial conditions: {node.conditions}") elif isinstance(node, ImportDef): # Process file import - recursively parse imported file self._process_import(node.filename)
def _process_import(self, filename: str) -> None: """ Process an import directive by parsing the imported file. Handles \\import{file.mdsl} by reading and parsing the referenced file, then incorporating its definitions into the current compilation context. Args: filename: Path to the file to import (relative or absolute) Security: - Validates file path using validate_file_path - Only allows .mdsl and .txt extensions - Tracks imported files to prevent cycles """ # Initialize import tracking if needed if not hasattr(self, "_imported_files"): self._imported_files: set = set() # Normalize and validate path try: # Handle relative paths if not os.path.isabs(filename): # Try current directory first if os.path.exists(filename): filepath = os.path.abspath(filename) else: # Log warning and skip logger.warning(f"Import file not found: {filename}") return else: filepath = filename # Validate file path security validate_file_path(filepath, must_exist=True) # Check extension if not filepath.endswith((".mdsl", ".txt")): logger.warning(f"Import file has unsupported extension: {filename}") return # Cycle detection abs_path = os.path.abspath(filepath) if abs_path in self._imported_files: logger.warning(f"Circular import detected, skipping: {filename}") return self._imported_files.add(abs_path) logger.info(f"Processing import: {filename}") # Read file with open(filepath, "r", encoding="utf-8") as f: imported_source = f.read() # Tokenize and parse tokens = tokenize(imported_source) parser = MechanicsParser(tokens) imported_ast = parser.parse() # Insert imported AST nodes for semantic analysis # (but don't process ImportDefs from imported files to avoid deep recursion) for node in imported_ast: if isinstance(node, ImportDef): # Recursive import self._process_import(node.filename) else: # Process directly (duplicates analyze_semantics logic) self._process_imported_node(node) logger.debug(f"Imported {len(imported_ast)} nodes from {filename}") except FileNotFoundError: logger.warning(f"Import file not found: {filename}") except (ValueError, PermissionError) as e: logger.warning(f"Import file validation failed: {filename} - {e}") except Exception as e: logger.error(f"Failed to process import {filename}: {e}") def _process_imported_node(self, node: ASTNode) -> None: """Process a single AST node from an imported file.""" if isinstance(node, SystemDef): # Don't override main system name from imports logger.debug(f"Skipping system def from import: {node.name}") elif isinstance(node, VarDef): self.variables[node.name] = { "type": node.vartype, "unit": node.unit, "vector": node.vector, } elif isinstance(node, ParameterDef): self.parameters_def[node.name] = {"value": node.value, "unit": node.unit} elif isinstance(node, DefineDef): self.definitions[node.name] = {"args": node.args, "body": node.body} elif isinstance(node, LagrangianDef): # Imported Lagrangians can extend/override (last one wins) self.lagrangian = node.expr elif isinstance(node, HamiltonianDef): self.hamiltonian = node.expr elif isinstance(node, TransformDef): self.transforms[node.var] = {"type": node.coord_type, "expression": node.expr} elif isinstance(node, ConstraintDef): self.constraints.append(node.expr) elif isinstance(node, NonHolonomicConstraintDef): self.non_holonomic_constraints.append(node.expr) elif isinstance(node, ForceDef): self.forces.append((node.coordinate, node.expr)) elif isinstance(node, DampingDef): self.damping_forces.append(node.expr) elif isinstance(node, RayleighDef): self.rayleigh_dissipation = node.expr elif isinstance(node, InitialCondition): self.initial_conditions.update(node.conditions)
[docs] def get_coordinates(self) -> List[str]: """ Extract generalized coordinates (exclude constants). Uses the registry module's is_likely_coordinate() function to determine which variables are dynamic coordinates vs. constants/parameters. Returns: List of coordinate variable names """ coordinates = [] for var_name, var_info in self.variables.items(): # Use registry-based classification if is_likely_coordinate(var_name, var_info["type"]): coordinates.append(var_name) logger.debug(f"Coordinates: {coordinates}") return coordinates
[docs] def process_fluids(self): """Generate particles for all fluid and boundary definitions""" # 1. Try to find smoothing length 'h' in parameters # This determines particle spacing for param_name, param_info in self.parameters_def.items(): if param_name in ["h", "spacing", "dx"]: self.smoothing_length = param_info["value"] logger.info(f"Using particle spacing h={self.smoothing_length}") break for node in self.ast: if isinstance(node, FluidDef): logger.info(f"Generating fluid '{node.name}' in {node.region.shape}") # Use ParticleGenerator coords = ParticleGenerator.generate(node.region, self.smoothing_length) for x, y in coords: self.fluid_particles.append( {"x": x, "y": y, "vx": 0.0, "vy": 0.0, "mass": node.mass, "type": "fluid"} ) logger.info(f"Generated {len(coords)} fluid particles") elif isinstance(node, BoundaryDef): logger.info(f"Generating boundary '{node.name}'") # Boundaries are often denser (0.5 * h) to prevent leakage coords = ParticleGenerator.generate(node.region, self.smoothing_length) for x, y in coords: self.boundary_particles.append( { "x": x, "y": y, "vx": 0.0, "vy": 0.0, "mass": 1000.0, # Infinite mass essentially "type": "boundary", } )
[docs] def derive_equations(self) -> Dict[str, sp.Expr]: """Derive equations using Lagrangian formulation (Patched for Forces)""" if self.lagrangian is None: raise ValueError("No Lagrangian defined") L_sympy = self.symbolic.ast_to_sympy(self.lagrangian) coordinates = self.get_coordinates() if not coordinates: raise ValueError("No generalized coordinates found") # 1. Derive Standard LHS: d/dt(dL/dq_dot) - dL/dq eq_list = self.symbolic.derive_equations_of_motion(L_sympy, coordinates) # 2. Apply Non-Conservative Forces (LHS - Q = 0) # Note: Don't expand yet - keep derivative structure for acceleration extraction if self.forces: logger.info(f"Applying {len(self.forces)} non-conservative forces") # Untargeted forces are applied positionally in declaration order. positional_idx = 0 for coord_name, force_ast in self.forces: F_sym = self.symbolic.ast_to_sympy(force_ast) if coord_name is not None: # Force explicitly targets a named coordinate. if coord_name in coordinates: idx = coordinates.index(coord_name) eq_list[idx] = eq_list[idx] - F_sym else: logger.warning( f"Force targets unknown coordinate '{coord_name}'; " f"known coordinates: {coordinates}. Skipping." ) continue # Legacy positional application. if len(coordinates) > 1 and len(self.forces) == 1: logger.warning( "Applying an untargeted \\force to the first coordinate of a " "multi-DOF system. Use \\force{coord}{expr} to target a " "specific coordinate." ) if positional_idx < len(eq_list): # Subtract Force but don't expand yet - preserve derivative structure eq_list[positional_idx] = eq_list[positional_idx] - F_sym positional_idx += 1 # 3. Apply Rayleigh Dissipation: Q_i = -∂F/∂q̇_i # The dissipative generalized force is the negative partial derivative of # the Rayleigh dissipation function F with respect to the generalized velocity if self.rayleigh_dissipation is not None: logger.info("Applying Rayleigh dissipation function") F_dissip = self.symbolic.ast_to_sympy(self.rayleigh_dissipation) for i, q in enumerate(coordinates): q_dot_sym = self.symbolic.get_symbol(f"{q}_dot") # Dissipative force: Q_i = -∂F/∂q̇_i Q_dissip = -sp.diff(F_dissip, q_dot_sym) if Q_dissip != 0: logger.debug(f"Dissipation force for {q}: {Q_dissip}") # Add to equation: EL_i + Q_dissip = 0 (dissipation opposes motion) eq_list[i] = eq_list[i] - Q_dissip # 4. Solve for accelerations (this will handle derivative replacement) accelerations = self.symbolic.solve_for_accelerations(eq_list, coordinates) return accelerations
[docs] def derive_constrained_equations(self) -> Dict[str, sp.Expr]: """Derive equations with constraints using Lagrange multipliers""" if self.lagrangian is None: raise ValueError("No Lagrangian defined") if not self.constraints: logger.warning("No constraints defined, using standard formulation") return self.derive_equations() L_sympy = self.symbolic.ast_to_sympy(self.lagrangian) coordinates = self.get_coordinates() if not coordinates: raise ValueError("No generalized coordinates found") # Convert constraint expressions to SymPy constraint_exprs = [self.symbolic.ast_to_sympy(c) for c in self.constraints] # Derive constrained equations eq_list, extended_coords = self.symbolic.derive_equations_with_constraints( L_sympy, coordinates, constraint_exprs ) # Solve for accelerations and lambda multipliers accelerations = self.symbolic.solve_for_accelerations(eq_list, extended_coords) # Filter out only the coordinate accelerations (not lambda derivatives) coord_accelerations = { k: v for k, v in accelerations.items() if any(k.startswith(f"{c}_ddot") for c in coordinates) } return coord_accelerations
[docs] def derive_hamiltonian_equations(self) -> Tuple[List[sp.Expr], List[sp.Expr]]: """Derive equations using Hamiltonian formulation""" if self.hamiltonian is not None: H_sympy = self.symbolic.ast_to_sympy(self.hamiltonian) elif hasattr(self, "hamiltonian_expr"): H_sympy = self.hamiltonian_expr else: raise ValueError("No Hamiltonian defined or derived") coordinates = self.get_coordinates() if not coordinates: raise ValueError("No generalized coordinates found") q_dots, p_dots = self.symbolic.derive_hamiltonian_equations(H_sympy, coordinates) return (q_dots, p_dots)
[docs] def setup_simulation(self, equations): """Configure numerical simulator""" logger.info("Setting up simulation") # Collect parameters parameters = {} for param_name, param_info in self.parameters_def.items(): parameters[param_name] = param_info["value"] # Add default parameters for var_name, var_info in self.variables.items(): if var_info["type"] in ["Real", "Mass", "Length", "Acceleration", "Spring Constant"]: if var_name not in parameters: defaults = { "g": 9.81, "m": 1.0, "m1": 1.0, "m2": 1.0, "l": 1.0, "l1": 1.0, "l2": 1.0, "k": 1.0, } parameters[var_name] = defaults.get(var_name, 1.0) self.simulator.set_parameters(parameters) self.simulator.set_initial_conditions(self.initial_conditions) coordinates = self.get_coordinates() if self.use_hamiltonian_formulation: q_dots, p_dots = equations self.simulator.compile_hamiltonian_equations(q_dots, p_dots, coordinates) else: self.simulator.compile_equations(equations, coordinates)
[docs] def simulate( self, t_span: Tuple[float, float] = (0, 10), num_points: int = 1000, **kwargs ) -> dict: """Run numerical simulation""" return self.simulator.simulate(t_span, num_points, **kwargs)
[docs] def animate(self, solution: dict, show: bool = True): """Create animation from solution""" parameters = self.simulator.parameters anim = self.visualizer.animate(solution, parameters, self.system_name) if show and anim is not None: plt.show() return anim
[docs] def export_animation(self, solution: dict, filename: str, fps: int = 30, dpi: int = 100) -> str: """Export animation to file""" anim = self.animate(solution, show=False) if anim is None: raise RuntimeError("No animation available") ok = self.visualizer.save_animation_to_file(anim, filename, fps, dpi) if not ok: raise RuntimeError(f"Failed to save animation to {filename}") return filename
[docs] def plot_energy(self, solution: dict): """Plot energy analysis""" self.visualizer.plot_energy( solution, self.simulator.parameters, self.system_name, self.lagrangian )
[docs] def plot_phase_space(self, solution: dict, coordinate_index: int = 0): """Plot phase space""" self.visualizer.plot_phase_space(solution, coordinate_index)
[docs] def print_equations(self): """Print derived equations""" if self.equations is None: print("No equations derived yet.") return print(f"\n{'='*70}") print(f"Equations of Motion: {self.system_name}") print(f"Formulation: {'Hamiltonian' if self.use_hamiltonian_formulation else 'Lagrangian'}") print(f"{'='*70}\n") if self.use_hamiltonian_formulation: q_dots, p_dots = self.equations coords = self.get_coordinates() for i, q in enumerate(coords): print(f"d{q}/dt = {q_dots[i]}") print(f"dp_{q}/dt = {p_dots[i]}\n") else: for coord in self.get_coordinates(): accel_key = f"{coord}_ddot" if accel_key in self.equations: eq = self.equations[accel_key] print(f"{accel_key} = {eq}\n") print(f"{'='*70}\n")
[docs] def get_info(self) -> dict: """Get comprehensive system information""" return { "system_name": self.system_name, "coordinates": self.get_coordinates(), "variables": self.variables, "parameters": self.simulator.parameters, "initial_conditions": self.initial_conditions, "has_lagrangian": self.lagrangian is not None, "has_hamiltonian": self.hamiltonian is not None, "num_constraints": len(self.constraints), "compilation_time": self.compilation_time, "formulation": "Hamiltonian" if self.use_hamiltonian_formulation else "Lagrangian", }
[docs] def export_system(self, filename: str, format: str = "json") -> bool: """Export system state to file""" return SystemSerializer.export_system(self, filename, format)
# Mapping of target language -> code generator class, populated lazily # so that importing PhysicsCompiler doesn't pay the cost of pulling in # every backend. _GENERATOR_REGISTRY: Dict[str, str] = { "cpp": "CppGenerator", "python": "PythonGenerator", "rust": "RustGenerator", "julia": "JuliaGenerator", "matlab": "MatlabGenerator", "fortran": "FortranGenerator", "javascript": "JavaScriptGenerator", "cuda": "CudaGenerator", "openmp": "OpenMPGenerator", "wasm": "WasmGenerator", "arduino": "ArduinoGenerator", "arm": "ARMGenerator", }
[docs] def export(self, target: str, filename: str) -> str: """ Generate standalone simulation code for a target language. Args: target: Target language identifier (see _GENERATOR_REGISTRY for the list, e.g. 'cpp', 'python', 'rust', 'julia', 'cuda', ...). filename: Output file path for the generated source. Returns: Path to the generated file. Raises: ValueError: If target is not supported or the compiler has no equations yet. IOError: If the file cannot be written. """ target_key = (target or "").lower().strip() if target_key not in self._GENERATOR_REGISTRY: allowed = ", ".join(sorted(self._GENERATOR_REGISTRY)) raise ValueError(f"Unsupported export target '{target}'. Allowed: {allowed}") if self.equations is None: raise ValueError("No equations derived. Call compile_dsl() first.") # Lazy import - keeps PhysicsCompiler import cheap. from . import codegen as _codegen generator_cls = getattr(_codegen, self._GENERATOR_REGISTRY[target_key]) lagrangian_sympy = self.symbolic.ast_to_sympy(self.lagrangian) if self.lagrangian else None hamiltonian_sympy = ( self.symbolic.ast_to_sympy(self.hamiltonian) if self.hamiltonian else None ) generator = generator_cls( system_name=self.system_name, coordinates=self.get_coordinates(), parameters=self.simulator.parameters, initial_conditions=self.initial_conditions, equations=self.equations, lagrangian=lagrangian_sympy, hamiltonian=hamiltonian_sympy, ) return generator.generate(filename)
[docs] @staticmethod def import_system(filename: str, allow_pickle: bool = False) -> Optional["PhysicsCompiler"]: """ Import system state from file. See ``SystemSerializer.import_system`` for the semantics of ``allow_pickle``; pickle loading is refused by default. """ state = SystemSerializer.import_system(filename, allow_pickle=allow_pickle) if state is None: return None # Note: This creates a new compiler but doesn't fully reconstruct the equations # For full reconstruction, you'd need to re-compile the DSL source compiler = PhysicsCompiler() compiler.system_name = state.get("system_name", "imported_system") compiler.variables = state.get("variables", {}) compiler.parameters_def = state.get("parameters", {}) compiler.initial_conditions = state.get("initial_conditions", {}) logger.info(f"Imported system: {compiler.system_name}") logger.warning( "Note: Equations not reconstructed. Re-compile DSL source for full functionality." ) return compiler
[docs] def compile_to_cpp( self, filename: str = "simulation.cpp", target: str = "standard", compile_binary: bool = True, ) -> bool: """ Generate C++ code for multiple targets. Args: filename: Output filename target: 'standard', 'raylib', 'arduino', 'wasm', 'openmp', 'python' compile_binary: Whether to run the compiler (g++, emcc, etc.) """ if self.equations is None: logger.error("No equations derived. Compile DSL first.") return False try: generator = CppGenerator( system_name=self.system_name, coordinates=self.get_coordinates(), parameters=self.simulator.parameters, initial_conditions=self.initial_conditions, equations=self.equations, lagrangian=self.symbolic.ast_to_sympy(self.lagrangian) if self.lagrangian else None, fluid_particles=self.fluid_particles, boundary_particles=self.boundary_particles, ) # For Arduino, ensure extension is .ino if target == "arduino" and not filename.endswith(".ino"): filename = os.path.splitext(filename)[0] + ".ino" source_file = generator.generate(filename) # Ensure the source path can't be reinterpreted as a compiler flag # if the caller-supplied filename starts with '-' (argv injection # via a relative path like "-Wl,..."). if not os.path.isabs(source_file): source_file = os.path.abspath(source_file) if compile_binary and target != "arduino": # Arduino compilation typically requires IDE binary_name = os.path.splitext(source_file)[0] if os.name == "nt" and target != "wasm": binary_name += ".exe" elif target == "wasm": binary_name += ".js" # Emscripten outputs JS+Wasm elif target == "python": # Python extensions need specific suffixes like .cpython-38-x86_64-linux-gnu.so # We let setup.py handle this usually, but here is a simple attempt: binary_name += ( subprocess.check_output( [ sys.executable, "-c", "import sysconfig; print(sysconfig.get_config_var('EXT_SUFFIX'))", ] ) .decode() .strip() ) cmd = [] if target == "standard": cmd = ["g++", "-O3", source_file, "-o", binary_name] elif target == "raylib": # Assumes raylib is installed in standard paths cmd = [ "g++", "-O3", source_file, "-o", binary_name, "-lraylib", "-lm", "-lpthread", "-ldl", "-lrt", "-lX11", ] elif target == "openmp": cmd = ["g++", "-O3", "-fopenmp", source_file, "-o", binary_name] elif target == "wasm": # Needs emcc in PATH cmd = [ "emcc", source_file, "-o", binary_name, "-s", "EXPORTED_FUNCTIONS=['_init','_step','_get_state','_get_time']", "-s", "EXPORTED_RUNTIME_METHODS=['ccall','cwrap']", "-O3", ] elif target == "python": # Needs pybind11 headers includes = ( subprocess.check_output([sys.executable, "-m", "pybind11", "--includes"]) .decode() .strip() .split() ) cmd = ( ["g++", "-O3", "-shared", "-fPIC"] + includes + [source_file, "-o", binary_name] ) logger.info(f"Compiling: {' '.join(cmd)}") subprocess.check_call(cmd) logger.info(f"Successfully compiled: {binary_name}") elif target == "arduino": logger.info( f"Arduino sketch generated: {source_file}. Open in Arduino IDE to compile." ) return True except subprocess.CalledProcessError as e: logger.error(f"Compilation failed: {e}") return False except Exception as e: logger.error(f"Generation failed: {e}") return False