Source code for epimodels.validation.symbolic

"""
Symbolic model representation and analysis using SymPy.

Provides symbolic computation for:
- Parameter and variable symbols with assumptions
- R0 (basic reproduction number) calculation
- Equilibrium point analysis
- Stability analysis
"""

from typing import Any
from collections import OrderedDict

try:
    from sympy import (
        Symbol,
        symbols,
        simplify,
        solve,
        diff,
        Matrix,
        sympify,
        latex,
        Eq,
        Function,
        Derivative,
        sqrt,
        Abs,
        re,
        im,
        I,
        S,
        nsolve,
    )
    from sympy.core.assumptions import check_assumptions
    import numpy as np

    SYMPY_AVAILABLE = True
except ImportError:
    SYMPY_AVAILABLE = False
    Symbol = None
    symbols = None


[docs] class SymbolicModel: """ Symbolic representation of an epidemiological model. Enables symbolic analysis of model properties: - Compute R0 using next-generation matrix method - Find equilibrium points - Analyze stability Example: >>> model = SymbolicModel() >>> model.add_parameter("beta", positive=True) >>> model.add_parameter("gamma", positive=True) >>> model.add_variable("S", positive=True) >>> model.add_variable("I", positive=True) >>> model.add_variable("R", positive=True) >>> model.set_total_population("N") >>> model.define_ode("S", "-beta*S*I/N") >>> model.define_ode("I", "beta*S*I/N - gamma*I") >>> model.define_ode("R", "gamma*I") >>> R0 = model.compute_R0_next_generation() """ def __init__(self): if not SYMPY_AVAILABLE: raise ImportError( "SymPy is required for symbolic analysis. Install with: pip install sympy" ) self.parameters: dict[str, Symbol] = OrderedDict() self.variables: dict[str, Symbol] = OrderedDict() self.total_population: Symbol | None = None self.odes: dict[Symbol, Any] = OrderedDict() self.difference_equations: dict[Symbol, Any] = OrderedDict() self._is_discrete = False
[docs] def add_parameter( self, name: str, positive: bool = False, negative: bool = False, real: bool = True, **assumptions, ) -> Symbol: """ Add a parameter as a symbolic variable. Args: name: Parameter name positive: Whether parameter is positive negative: Whether parameter is negative real: Whether parameter is real (vs complex) **assumptions: Additional SymPy assumptions Returns: SymPy Symbol object """ assumption_dict = {"real": real, "positive": positive, "negative": negative, **assumptions} self.parameters[name] = symbols(name, **assumption_dict) return self.parameters[name]
[docs] def add_variable( self, name: str, positive: bool = False, negative: bool = False, real: bool = True, **assumptions, ) -> Symbol: """ Add a state variable as a symbolic variable. Args: name: Variable name positive: Whether variable is positive negative: Whether variable is negative real: Whether variable is real **assumptions: Additional SymPy assumptions Returns: SymPy Symbol object """ assumption_dict = {"real": real, "positive": positive, "negative": negative, **assumptions} self.variables[name] = symbols(name, **assumption_dict) return self.variables[name]
[docs] def set_total_population(self, name: str = "N") -> Symbol: """ Set the symbol representing total population. Args: name: Symbol name for total population (default: "N") Returns: SymPy Symbol for total population """ self.total_population = symbols(name, positive=True, real=True) return self.total_population
[docs] def define_ode(self, variable: str, rhs: str) -> None: """ Define an ODE for a state variable. Args: variable: Variable name (e.g., "S", "I", "R") rhs: Right-hand side expression as string (e.g., "-beta*S*I/N") Example: >>> model.define_ode("S", "-beta*S*I/N") """ if variable not in self.variables: raise ValueError(f"Unknown variable: {variable}") context = self._get_context() expr = sympify(rhs, locals=context) self.odes[self.variables[variable]] = expr
[docs] def define_difference_equation(self, variable: str, rhs: str) -> None: """ Define a difference equation for a discrete-time model. Args: variable: Variable name rhs: Right-hand side expression Example: >>> model.define_difference_equation("S", "S - beta*S*I/N + gamma*I") """ if variable not in self.variables: raise ValueError(f"Unknown variable: {variable}") context = self._get_context() expr = sympify(rhs, locals=context) self.difference_equations[self.variables[variable]] = expr self._is_discrete = True
[docs] def compute_R0_next_generation(self) -> Any: """ Compute basic reproduction number R0 using next-generation matrix method. This method: 1. Identifies infected compartments 2. Computes new infections matrix F and transitions matrix V 3. Computes R0 as spectral radius of F*V^(-1) evaluated at DFE The next-generation matrix method: - For dI/dt = F - V (F = new infections, V = transitions out) - Compute Jacobians: dF = ∂F/∂X, dV = ∂V/∂X (where X = infected compartments) - Evaluate at DFE (disease-free equilibrium) - R0 = spectral_radius(dF * dV^(-1)) Returns: Symbolic expression for R0 (evaluated at DFE) Note: For single infected compartment: R0 = (∂F/∂I)|DFE / (∂V/∂I)|DFE """ infected_vars = self._identify_infected_compartments() if not infected_vars: raise ValueError("Cannot identify infected compartments") # Get disease-free equilibrium for evaluation dfe = self.find_disease_free_equilibrium() if len(infected_vars) == 1: I_name = infected_vars[0] I_sym = self.variables[I_name] dI_dt = self.odes.get(I_sym) if dI_dt is None: raise ValueError(f"No ODE defined for {I_name}") # Decompose: dI/dt = F - V F, V = self._decompose_infection_terms(dI_dt) if F == 0: raise ValueError("Cannot identify new infection terms") # Compute partial derivatives with respect to infected compartment # This linearizes F and V around the equilibrium dF_dI = diff(F, I_sym) if hasattr(F, "diff") else 0 dV_dI = diff(V, I_sym) if hasattr(V, "diff") else 0 # Substitute DFE values (S=N, I=0, E=0, etc.) subs_dict = {} for var_name, var_sym in self.variables.items(): dfe_val = dfe.get(var_name, 0) subs_dict[var_sym] = dfe_val # Evaluate derivatives at DFE dF_at_dfe = dF_dI.subs(subs_dict) if hasattr(dF_dI, "subs") else dF_dI dV_at_dfe = dV_dI.subs(subs_dict) if hasattr(dV_dI, "subs") else dV_dI # R0 = dF/dI / dV/dI at DFE if dV_at_dfe != 0: R0 = simplify(dF_at_dfe / dV_at_dfe) else: R0 = simplify(dF_at_dfe) return R0 else: return self._compute_R0_multivariate(infected_vars, dfe)
def _identify_infected_compartments(self) -> list: """ Identify infected compartments based on naming conventions. Looks for variables with names containing: - 'I' (infectious) - 'E' (exposed) - 'A' (asymptomatic) """ infected = [] priority_order = ["I", "E", "A"] for name, var in self.variables.items(): if name.startswith("I") or "Infectious" in name: infected.append(name) if not infected: for name, var in self.variables.items(): if name.startswith("E") or "Exposed" in name: infected.append(name) if not infected: for name, var in self.variables.items(): if name.startswith("A") or "Asymptomatic" in name: infected.append(name) return infected def _decompose_infection_terms(self, ode_rhs: Any) -> tuple[Any, Any]: """ Decompose ODE right-hand side into new infections (F) and transitions (V). Convention: dI/dt = F - V - F: rate of new infections entering compartment (positive terms from transmission) - V: rate of transfer out of compartment (recovery, death, progression, etc.) For example, for dI/dt = beta*S*I/N - gamma*I - mu*I: - F = beta*S*I/N (new infections) - V = (gamma + mu)*I (transitions out: recovery + death) Returns: Tuple of (F, V) where dI/dt = F - V """ if ode_rhs is None: return 0, 0 rhs = sympify(ode_rhs) if not hasattr(rhs, "as_ordered_terms"): return rhs, 0 terms = rhs.as_ordered_terms() F_terms = [] V_terms = [] # Identify the infected variable this ODE is for infected_vars = self._identify_infected_compartments() for term in terms: term_str = str(term) # A term is a "new infection" if it represents influx from transmission # This typically has the pattern: transmission_param * S * I / N is_new_infection = False # Get the coefficient and check the term structure coeff, rest = term.as_coeff_Mul() # Check if term contains a transmission parameter multiplied by state variables has_transmission_param = any(str(p) in term_str for p in self.parameters.values()) has_susceptible = any(v.startswith("S") for v in self.variables.keys() if v in term_str) has_population = self.total_population and str(self.total_population) in term_str # New infections come FROM transmission (beta*S*I pattern), not from transitions # Terms like -gamma*I or -mu*I are transitions OUT, not new infections if has_transmission_param and (has_susceptible or has_population): # This looks like a transmission term if coeff > 0: # Positive transmission term: new infections (e.g., +beta*S*I/N) is_new_infection = True F_terms.append(term) else: # Negative transmission term: unusual but could be loss to infection V_terms.append(-term) else: # All other terms are transitions (recovery, death, progression) if coeff > 0: # Positive transition (unusual): goes to V as subtraction V_terms.append(term) else: # Negative transition (normal): e.g., -gamma*I becomes +gamma*I in V V_terms.append(-term) F = sum(F_terms) if F_terms else 0 V = sum(V_terms) if V_terms else 0 return sympify(F), sympify(V) def _compute_R0_multivariate( self, infected_vars: list, dfe: dict[str, Any] | None = None ) -> Any: """ Compute R0 for models with multiple infected compartments. Uses next-generation matrix method with evaluation at DFE. Args: infected_vars: List of infected compartment names dfe: Disease-free equilibrium values (optional, computed if not provided) """ if dfe is None: dfe = self.find_disease_free_equilibrium() n = len(infected_vars) x = [self.variables[name] for name in infected_vars] F_matrix = [] V_matrix = [] for i, var_name in enumerate(infected_vars): var = self.variables[var_name] ode = self.odes.get(var) if ode is None: F_row = [0] * n V_row = [0] * n else: F, V = self._decompose_infection_terms(ode) F_row = [] V_row = [] for j, x_j in enumerate(x): if hasattr(F, "diff"): F_ij = diff(F, x_j) else: F_ij = 0 F_row.append(F_ij) if hasattr(V, "diff"): V_ij = diff(V, x_j) else: V_ij = 0 V_row.append(V_ij) F_matrix.append(F_row) V_matrix.append(V_row) F_mat = Matrix(F_matrix) V_mat = Matrix(V_matrix) # Substitute DFE values into matrices subs_dict = {} for var_name, var_sym in self.variables.items(): dfe_val = dfe.get(var_name, 0) subs_dict[var_sym] = dfe_val F_mat = F_mat.subs(subs_dict) V_mat = V_mat.subs(subs_dict) try: V_inv = V_mat.inv() K = F_mat * V_inv eigenvalues = K.eigenvals() if eigenvalues: max_eigenvalue = max(eigenvalues.keys(), key=lambda e: Abs(complex(e.evalf()))) return simplify(max_eigenvalue) else: return None except Exception: return None
[docs] def find_disease_free_equilibrium(self) -> dict[str, Any]: """ Find the disease-free equilibrium (DFE) of the model. At DFE: - All infected compartments are zero - Susceptible compartment equals total population Returns: Dictionary mapping variable names to equilibrium values """ equilibrium = {} infected = self._identify_infected_compartments() for var_name in self.variables: if var_name in infected: equilibrium[var_name] = 0 elif var_name.startswith("S") or "Susceptible" in var_name: equilibrium[var_name] = self.total_population if self.total_population else 1 else: equilibrium[var_name] = 0 return equilibrium
[docs] def find_all_equilibria( self, params: dict[str, float] | None = None, numeric_fallback: bool = True, max_solutions: int = 10, ) -> list[dict[str, Any]]: """ Find all equilibrium points of the model. Solves the system of equations f(x) = 0 where f is the vector field. Includes both disease-free and endemic equilibria. Args: params: Parameter values for numeric solving (optional) numeric_fallback: If True, use numerical solver when symbolic fails max_solutions: Maximum number of solutions to return Returns: List of equilibrium dictionaries. Each dictionary contains: - Variable names mapped to equilibrium values - 'type': 'dfe' or 'endemic' - 'method': 'symbolic' or 'numeric' Example: >>> model.find_all_equilibria(params={'beta': 0.3, 'gamma': 0.1, 'N': 1000}) [ {'S': N, 'I': 0, 'R': 0, 'type': 'dfe', 'method': 'symbolic'}, {'S': N*gamma/beta, 'I': ..., 'R': ..., 'type': 'endemic', 'method': 'symbolic'} ] """ equilibria = [] # 1. Always include DFE dfe = self.find_disease_free_equilibrium() dfe["type"] = "dfe" dfe["method"] = "analytical" equilibria.append(dfe) # 2. Try symbolic solving for all equilibria symbolic_eqs = self._find_equilibria_symbolic(max_solutions) for eq in symbolic_eqs: if not self._is_equilibrium_duplicate(eq, equilibria): eq_type = self._classify_equilibrium(eq) eq["type"] = eq_type eq["method"] = "symbolic" equilibria.append(eq) # 3. If requested and params provided, try numeric solving if numeric_fallback and params and len(equilibria) < max_solutions: numeric_eqs = self._find_equilibria_numeric(params, max_solutions - len(equilibria)) for eq in numeric_eqs: if not self._is_equilibrium_duplicate(eq, equilibria): eq_type = self._classify_equilibrium(eq) eq["type"] = eq_type eq["method"] = "numeric" equilibria.append(eq) return equilibria[:max_solutions]
[docs] def find_endemic_equilibrium( self, params: dict[str, float] | None = None, numeric_fallback: bool = True ) -> dict[str, Any] | None: """ Find endemic equilibrium point. At endemic equilibrium, disease persists (I* > 0). Only exists when R0 > 1. Args: params: Parameter values for numeric solving and R0 calculation numeric_fallback: If True, use numerical solver when symbolic fails Returns: Equilibrium dictionary or None if no endemic equilibrium exists Example: >>> model.find_endemic_equilibrium({'beta': 0.3, 'gamma': 0.1, 'N': 1000}) {'S': 333.33, 'I': 666.67, 'R': 0, 'type': 'endemic', 'method': 'symbolic'} """ # Check if endemic equilibrium exists (R0 > 1) if params: try: R0_expr = self.compute_R0_next_generation() R0_val = float(self.substitute_values(R0_expr, params)) if R0_val <= 1: return None # No endemic equilibrium when R0 <= 1 except Exception: pass # Find all equilibria and filter for endemic equilibria = self.find_all_equilibria(params, numeric_fallback) for eq in equilibria: if eq.get("type") == "endemic": return eq return None
def _find_equilibria_symbolic(self, max_solutions: int) -> list[dict[str, Any]]: """ Find equilibria using symbolic solving. Args: max_solutions: Maximum number of solutions to find Returns: List of equilibrium dictionaries """ equilibria = [] if not self.odes: return equilibria try: # Build system of equations: dx/dt = 0 for all variables equations = [] for var_sym, ode_rhs in self.odes.items(): equations.append(Eq(ode_rhs, 0)) # Solve the system variables_list = list(self.variables.values()) solutions = solve(equations, variables_list, dict=True) # Handle single solution case if solutions and not isinstance(solutions, list): solutions = [solutions] for solution in solutions[:max_solutions]: eq = {} # Extract values for each variable for var_name, var_sym in self.variables.items(): if var_sym in solution: eq[var_name] = solution[var_sym] else: eq[var_name] = var_sym # Validate the equilibrium if self._validate_equilibrium(eq): # Check for duplicates before adding if not self._is_equilibrium_duplicate(eq, equilibria): equilibria.append(eq) except Exception as e: # Symbolic solving failed, will fall back to numeric pass # Only try analytical method if no endemic equilibrium found yet has_endemic = any(self._classify_equilibrium(eq) == "endemic" for eq in equilibria) if not has_endemic: # Try to find endemic equilibrium by solving with I != 0 constraint # This helps find solutions that SymPy might miss endemic_eq = self._find_endemic_equilibrium_analytical() if endemic_eq and not self._is_equilibrium_duplicate(endemic_eq, equilibria): equilibria.append(endemic_eq) return equilibria if endemic_eq and not self._is_equilibrium_duplicate(endemic_eq, equilibria): equilibria.append(endemic_eq) return equilibria def _find_endemic_equilibrium_analytical(self) -> dict[str, Any] | None: """ Try to find endemic equilibrium using analytical approach. For common model structures like SIR with vital dynamics, we can derive the endemic equilibrium analytically. Returns: Endemic equilibrium dictionary or None """ try: # Try solving with constraint that infected compartments are non-zero # For SIR with vital dynamics: # dS/dt = mu*N - beta*S*I/N - mu*S = 0 # dI/dt = beta*S*I/N - (gamma + mu)*I = 0 # dR/dt = gamma*I - mu*R = 0 # # From dI/dt = 0 with I != 0: S = N*(gamma + mu)/beta = N/R0 # From dS/dt = 0: I = (mu*N/gamma)*(R0 - 1) # From dR/dt = 0: R = gamma*I/mu infected_vars = self._identify_infected_compartments() if not infected_vars: return None # Get the susceptible variable susceptibles = [v for v in self.variables.keys() if v.startswith("S")] if not susceptibles: return None S_name = susceptibles[0] I_name = infected_vars[0] # Get the ODE for I I_sym = self.variables[I_name] dI_dt = self.odes.get(I_sym) if dI_dt is None: return None # Get DFE for reference dfe = self.find_disease_free_equilibrium() # Try to extract the endemic equilibrium form # For dI/dt = beta*S*I/N - (gamma + mu)*I = 0 # The endemic equilibrium satisfies: S = N*(gamma + mu)/beta # Compute R0 symbolically at DFE R0_expr = self.compute_R0_next_generation() # The endemic equilibrium has S = N/R0 # This means S = N * (gamma + mu) / beta for SIR with vital dynamics # Try to solve for endemic equilibrium S_sym = self.variables[S_name] dS_dt = self.odes.get(S_sym) if dS_dt is None: return None # Solve the system with the assumption I != 0 equations = [] for var_sym, ode_rhs in self.odes.items(): equations.append(Eq(ode_rhs, 0)) # Try to solve without assuming I = 0 solutions = solve(equations, list(self.variables.values()), dict=True) if solutions and not isinstance(solutions, list): solutions = [solutions] for solution in solutions: eq = {} for var_name, var_sym in self.variables.items(): if var_sym in solution: eq[var_name] = solution[var_sym] else: eq[var_name] = var_sym # Check if this is an endemic equilibrium (I > 0) I_val = eq.get(I_name, 0) if hasattr(I_val, "evalf"): try: I_numeric = float(I_val.evalf()) if I_numeric > 0 and self._validate_equilibrium(eq): eq["method"] = "symbolic" return eq except: # Symbolic value, check if it could be positive pass # Also check for symbolic solutions if I_val != 0 and self._validate_equilibrium(eq): eq["method"] = "symbolic" return eq return None except Exception: return None def _find_equilibria_numeric( self, params: dict[str, float], max_solutions: int ) -> list[dict[str, Any]]: """ Find equilibria using numerical solving. Args: params: Parameter values max_solutions: Maximum number of solutions to find Returns: List of equilibrium dictionaries """ equilibria = [] if not self.odes or not params: return equilibria try: import numpy as np from scipy.optimize import fsolve # Substitute parameter values into ODEs odes_numeric = {} for var_sym, ode_rhs in self.odes.items(): odes_numeric[var_sym] = self.substitute_values(ode_rhs, params) # Define system of equations for fsolve def equations(x): result = [] for i, (var_sym, ode_rhs) in enumerate(odes_numeric.items()): # Substitute variable values subs_dict = {} for j, (v_name, v_sym) in enumerate(self.variables.items()): subs_dict[v_sym] = x[j] val = float(ode_rhs.subs(subs_dict)) result.append(val) return result # Try multiple initial guesses n_vars = len(self.variables) initial_guesses = self._generate_initial_guesses(params, n_vars, max_solutions) found_solutions = set() for x0 in initial_guesses: try: solution, info, ier, msg = fsolve(equations, x0, full_output=True) if ier == 1: # Solution found # Round to avoid duplicates solution_key = tuple(round(x, 6) for x in solution) if solution_key not in found_solutions: found_solutions.add(solution_key) eq = {} for i, (var_name, var_sym) in enumerate(self.variables.items()): eq[var_name] = float(solution[i]) # Validate if self._validate_equilibrium(eq, tolerance=1e-6): equilibria.append(eq) if len(equilibria) >= max_solutions: break except Exception: continue except ImportError: # scipy not available pass except Exception: pass return equilibria def _generate_initial_guesses( self, params: dict[str, float], n_vars: int, n_guesses: int ) -> list[list[float]]: """ Generate initial guesses for numeric equilibrium finding. Args: params: Parameter values n_vars: Number of variables n_guesses: Number of guesses to generate Returns: List of initial guess vectors """ guesses = [] N = params.get("N", params.get(str(self.total_population), 1000)) # 1. DFE dfe_guess = [0.0] * n_vars for i, var_name in enumerate(self.variables.keys()): if var_name.startswith("S") or "Susceptible" in var_name: dfe_guess[i] = N guesses.append(dfe_guess) # 2. Endemic-like guess based on R0 if available endemic_guess = [0.0] * n_vars try: # Compute R0 to get a better endemic guess R0_expr = self.compute_R0_next_generation() R0_val = float(self.substitute_values(R0_expr, params)) if R0_val > 1: # For SIR with vital dynamics: S* = N/R0, I* = (mu*N/gamma)*(R0-1) # Use approximate values for the guess for i, var_name in enumerate(self.variables.keys()): if var_name.startswith("S") or "Susceptible" in var_name: endemic_guess[i] = N / R0_val elif var_name.startswith("I") or "Infectious" in var_name: # Approximate I based on model structure mu = params.get("mu", 0.01) gamma = params.get("gamma", 0.1) I_star = (mu * N / gamma) * (R0_val - 1) if gamma > 0 else N * 0.1 endemic_guess[i] = min(I_star, N * 0.5) # Cap at 50% of population elif var_name.startswith("E") or "Exposed" in var_name: # For SEIR models endemic_guess[i] = N * 0.05 elif var_name.startswith("R") or "Removed" in var_name: endemic_guess[i] = N * 0.1 else: endemic_guess[i] = N * 0.05 else: # R0 <= 1, use generic endemic guess for i, var_name in enumerate(self.variables.keys()): if var_name.startswith("S") or "Susceptible" in var_name: endemic_guess[i] = N * 0.8 elif var_name.startswith("I") or "Infectious" in var_name: endemic_guess[i] = N * 0.1 elif var_name.startswith("R") or "Removed" in var_name: endemic_guess[i] = N * 0.1 except Exception: # Fall back to generic endemic guess for i, var_name in enumerate(self.variables.keys()): if var_name.startswith("S") or "Susceptible" in var_name: endemic_guess[i] = N * 0.8 elif var_name.startswith("I") or "Infectious" in var_name: endemic_guess[i] = N * 0.1 elif var_name.startswith("R") or "Removed" in var_name: endemic_guess[i] = N * 0.1 guesses.append(endemic_guess) # 3. Multiple varied endemic guesses # Try different proportions of susceptible population for S_ratio in [0.3, 0.5, 0.6]: varied_guess = [0.0] * n_vars remaining = N * (1 - S_ratio) for i, var_name in enumerate(self.variables.keys()): if var_name.startswith("S") or "Susceptible" in var_name: varied_guess[i] = N * S_ratio elif var_name.startswith("I") or "Infectious" in var_name: varied_guess[i] = remaining * 0.5 elif var_name.startswith("E") or "Exposed" in var_name: varied_guess[i] = remaining * 0.2 elif var_name.startswith("R") or "Removed" in var_name: varied_guess[i] = remaining * 0.3 else: varied_guess[i] = remaining * 0.1 guesses.append(varied_guess) # 4. Random guesses np.random.seed(42) # Reproducibility for _ in range(max(1, n_guesses - len(guesses))): guess = np.random.uniform(0, N, n_vars) # Ensure sum equals N for conserved models if self.total_population: guess = guess / guess.sum() * N guesses.append(guess.tolist()) return guesses[:n_guesses] def _classify_equilibrium(self, eq: dict[str, Any]) -> str: """ Classify equilibrium as disease-free or endemic. Args: eq: Equilibrium dictionary tolerance: Tolerance for considering infected compartment as zero Returns: 'dfe' or 'endemic' """ infected_vars = self._identify_infected_compartments() tolerance = 1e-10 for var_name in infected_vars: value = eq.get(var_name, 0) # Convert symbolic to numeric if needed if hasattr(value, "evalf"): try: value = float(value.evalf()) except Exception: # Symbolic and non-zero, likely endemic return "endemic" if abs(value) > tolerance: return "endemic" return "dfe" def _validate_equilibrium(self, eq: dict[str, Any], tolerance: float = 1e-10) -> bool: """ Validate that a point is actually an equilibrium. Args: eq: Equilibrium dictionary tolerance: Tolerance for checking dx/dt ≈ 0 Returns: True if valid equilibrium, False otherwise """ if not self.odes: return True try: # Substitute equilibrium values into ODEs subs_dict = {} for var_name, value in eq.items(): if var_name in self.variables: subs_dict[self.variables[var_name]] = value for var_sym, ode_rhs in self.odes.items(): residual = ode_rhs.subs(subs_dict) # Try to simplify the residual if hasattr(residual, "simplify"): residual = residual.simplify() # Check if residual is exactly zero (symbolic) if hasattr(residual, "is_zero") and residual.is_zero: continue # Try to evaluate numerically if hasattr(residual, "evalf"): try: residual_numeric = float(residual.evalf()) if abs(residual_numeric) > tolerance: return False except (TypeError, ValueError): # Residual contains free symbols - check if it simplifies to zero # For symbolic equilibria, the residual should be exactly 0 if hasattr(residual, "simplify"): simplified = residual.simplify() if simplified != 0 and not ( hasattr(simplified, "is_zero") and simplified.is_zero ): # Check if it's a symbolic expression that could be zero # For now, assume symbolic solutions from solve() are valid pass return True except Exception: return False def _is_equilibrium_duplicate( self, eq: dict[str, Any], existing: list[dict[str, Any]], tolerance: float = 1e-6 ) -> bool: """ Check if equilibrium is duplicate of existing ones. Args: eq: Equilibrium to check existing: List of existing equilibria tolerance: Tolerance for comparison Returns: True if duplicate, False otherwise """ for existing_eq in existing: is_duplicate = True for var_name in self.variables.keys(): val1 = eq.get(var_name, 0) val2 = existing_eq.get(var_name, 0) # Convert to float for comparison try: if hasattr(val1, "evalf"): val1 = float(val1.evalf()) elif hasattr(val1, "__float__"): val1 = float(val1) except (ValueError, TypeError): # Cannot convert to float, treat as different is_duplicate = False break try: if hasattr(val2, "evalf"): val2 = float(val2.evalf()) elif hasattr(val2, "__float__"): val2 = float(val2) except (ValueError, TypeError): # Cannot convert to float, treat as different is_duplicate = False break if abs(val1 - val2) > tolerance: is_duplicate = False break if is_duplicate: return True return False
[docs] def check_stability_at_dfe(self, R0: Any) -> str: """ Check stability of disease-free equilibrium based on R0. Args: R0: Basic reproduction number (symbolic or numeric) Returns: Stability classification: "stable", "unstable", or "neutral" """ if R0 is None: return "unknown" try: if hasattr(R0, "is_real") and R0.is_real: if hasattr(R0, "evalf"): R0_val = float(R0.evalf()) else: R0_val = float(R0) if R0_val < 1: return "stable" elif R0_val > 1: return "unstable" else: return "neutral" if hasattr(R0, "subs"): return "symbolic (depends on parameter values)" return "unknown" except Exception: return "unknown"
[docs] def compute_jacobian( self, equilibrium: dict[str, float | Symbol], substitute_values: bool = False ) -> Matrix: """ Compute Jacobian matrix at an equilibrium point. The Jacobian matrix J has entries J_ij = ∂f_i/∂x_j where: - f_i is the ODE for variable i - x_j is the j-th state variable Args: equilibrium: Dictionary mapping variable names to values. Can be symbolic or numeric values. substitute_values: If True, substitute equilibrium values into Jacobian. If False, keep symbolic form. Returns: SymPy Matrix representing the Jacobian Example: >>> dfe = model.find_disease_free_equilibrium() >>> J = model.compute_jacobian(dfe) >>> J Matrix([ [0, -beta, 0], [0, beta - gamma, 0], [0, gamma, 0] ]) """ if not self.odes: raise ValueError("No ODEs defined for this model") # Build Jacobian matrix n_vars = len(self.variables) var_names = list(self.variables.keys()) var_syms = list(self.variables.values()) jacobian_entries = [] for i, var_name_i in enumerate(var_names): var_sym_i = self.variables[var_name_i] ode_i = self.odes.get(var_sym_i) if ode_i is None: # No ODE for this variable - all zeros jacobian_entries.append([0] * n_vars) continue row = [] for j, var_name_j in enumerate(var_names): var_sym_j = self.variables[var_name_j] # Compute partial derivative ∂f_i/∂x_j try: partial = diff(ode_i, var_sym_j) partial = simplify(partial) except Exception: partial = 0 row.append(partial) jacobian_entries.append(row) J = Matrix(jacobian_entries) # Substitute equilibrium values if requested if substitute_values and equilibrium: subs_dict = {} for var_name, value in equilibrium.items(): if var_name in self.variables: subs_dict[self.variables[var_name]] = value J = J.subs(subs_dict) J = simplify(J) return J
[docs] def compute_eigenvalues( self, jacobian: Matrix, numeric: bool = False, params: dict[str, float] | None = None ) -> list[Any]: """ Compute eigenvalues of the Jacobian matrix. Eigenvalues determine local stability: - All Re(λ) < 0: stable equilibrium - Any Re(λ) > 0: unstable equilibrium - Re(λ) = 0: neutral or bifurcation point Args: jacobian: Jacobian matrix (from compute_jacobian) numeric: Force numeric evaluation of eigenvalues params: Parameter values for numeric evaluation Returns: List of eigenvalues (symbolic or numeric complex numbers) Example: >>> J = model.compute_jacobian(dfe, substitute_values=True) >>> eigenvalues = model.compute_eigenvalues(J) >>> eigenvalues [0, -gamma, -beta + gamma] """ eigenvalues = [] try: if numeric or params: # Substitute parameter values if provided if params: subs_dict = {} for param_name, value in params.items(): if param_name in self.parameters: subs_dict[self.parameters[param_name]] = value jacobian = jacobian.subs(subs_dict) # Try numeric eigenvalue computation try: # Convert to numpy array for numeric computation jac_np = np.array(jacobian.tolist(), dtype=float) eigenvalues_np = np.linalg.eigvals(jac_np) eigenvalues = [complex(ev) for ev in eigenvalues_np] except Exception: # Fall back to symbolic pass if not eigenvalues: # Try symbolic eigenvalue computation eigenvalue_dict = jacobian.eigenvals() if eigenvalue_dict: for eigenvalue, multiplicity in eigenvalue_dict.items(): eigenvalue = simplify(eigenvalue) # Add according to multiplicity for _ in range(multiplicity): eigenvalues.append(eigenvalue) else: # If eigenvals() fails, try more robust method n = jacobian.shape[0] char_poly = jacobian.charpoly() roots = solve(char_poly.as_expr()) eigenvalues = roots if isinstance(roots, list) else [roots] except Exception as e: # Last resort: return empty list pass return eigenvalues
[docs] def analyze_stability_full( self, equilibrium: dict[str, float | Symbol], params: dict[str, float] | None = None, tolerance: float = 1e-10, ) -> dict[str, Any]: """ Perform full stability analysis at an equilibrium point. Computes and analyzes: - Jacobian matrix - Eigenvalues - Stability classification - Bifurcation indicators - Detailed classification (node, focus, saddle, etc.) Args: equilibrium: Equilibrium point (variable names to values) params: Parameter values for numeric evaluation tolerance: Tolerance for eigenvalue zero detection Returns: Dictionary with comprehensive stability information: - 'jacobian': Jacobian matrix (symbolic or numeric) - 'eigenvalues': List of eigenvalues - 'eigenvalues_numeric': Numeric eigenvalues (if params provided) - 'stability': 'stable', 'unstable', 'neutral', or 'saddle' - 'classification': Detailed type (e.g., 'stable_node', 'unstable_focus') - 'max_real_part': Maximum real part of eigenvalues - 'min_real_part': Minimum real part of eigenvalues - 'has_complex': Boolean indicating complex eigenvalues - 'near_bifurcation': Boolean indicating proximity to bifurcation - 'bifurcation_type': Type of bifurcation if detected Example: >>> result = model.analyze_stability_full(dfe, params={'beta': 0.3, 'gamma': 0.1}) >>> result['stability'] 'unstable' >>> result['classification'] 'unstable_node' >>> result['max_real_part'] 0.2 """ result = { "jacobian": None, "eigenvalues": [], "eigenvalues_numeric": [], "stability": "unknown", "classification": "unknown", "max_real_part": None, "min_real_part": None, "has_complex": False, "near_bifurcation": False, "bifurcation_type": None, } try: # 1. Compute Jacobian J = self.compute_jacobian(equilibrium, substitute_values=False) result["jacobian"] = J # 2. Compute symbolic eigenvalues eigenvalues_sym = self.compute_eigenvalues(J, numeric=False) result["eigenvalues"] = eigenvalues_sym # 3. Compute numeric eigenvalues if params provided if params: J_numeric = self.compute_jacobian(equilibrium, substitute_values=True) eigenvalues_num = self.compute_eigenvalues(J_numeric, numeric=True, params=params) result["eigenvalues_numeric"] = eigenvalues_num # Use numeric eigenvalues for analysis eigenvalues_for_analysis = eigenvalues_num else: eigenvalues_for_analysis = eigenvalues_sym # 4. Analyze eigenvalue spectrum if eigenvalues_for_analysis: real_parts = [] imag_parts = [] for ev in eigenvalues_for_analysis: if hasattr(ev, "evalf"): try: ev_complex = complex(ev.evalf()) real_parts.append(ev_complex.real) imag_parts.append(ev_complex.imag) except Exception: # Symbolic eigenvalue pass elif isinstance(ev, complex): real_parts.append(ev.real) imag_parts.append(ev.imag) elif isinstance(ev, (int, float)): real_parts.append(ev) imag_parts.append(0.0) if real_parts: result["max_real_part"] = max(real_parts) result["min_real_part"] = min(real_parts) result["has_complex"] = any(abs(im) > tolerance for im in imag_parts) # 5. Classify stability result["stability"] = self._classify_stability( real_parts, imag_parts, tolerance ) # 6. Detailed classification result["classification"] = self._classify_stability_detailed( real_parts, imag_parts, tolerance ) # 7. Detect bifurcations result["near_bifurcation"], result["bifurcation_type"] = ( self._detect_bifurcation(real_parts, imag_parts, tolerance) ) except Exception as e: result["error"] = str(e) return result
def _classify_stability( self, real_parts: list[float], imag_parts: list[float], tolerance: float ) -> str: """ Classify stability based on eigenvalue real parts. Args: real_parts: Real parts of eigenvalues imag_parts: Imaginary parts of eigenvalues tolerance: Tolerance for zero detection Returns: 'stable', 'unstable', 'saddle', or 'neutral' """ max_real = max(real_parts) min_real = min(real_parts) # Check for zero eigenvalues (neutral) if abs(max_real) < tolerance and abs(min_real) < tolerance: return "neutral" # Check for mixed signs (saddle point) if max_real > tolerance and min_real < -tolerance: return "saddle" # Check for all negative (stable) if max_real < -tolerance: return "stable" # Check for all positive (unstable) if min_real > tolerance: return "unstable" # Some eigenvalues near zero if abs(max_real) < tolerance or abs(min_real) < tolerance: return "neutral" return "unknown" def _classify_stability_detailed( self, real_parts: list[float], imag_parts: list[float], tolerance: float ) -> str: """ Detailed stability classification. Args: real_parts: Real parts of eigenvalues imag_parts: Imaginary parts of eigenvalues tolerance: Tolerance for zero detection Returns: Detailed classification string """ max_real = max(real_parts) min_real = min(real_parts) has_complex = any(abs(im) > tolerance for im in imag_parts) # Determine base stability if max_real < -tolerance: base = "stable" elif min_real > tolerance: base = "unstable" elif max_real > tolerance and min_real < -tolerance: base = "saddle" elif abs(max_real) < tolerance: base = "neutral" else: base = "unknown" # Determine type (node vs focus) if has_complex: type_str = "focus" else: type_str = "node" # Special case: saddle if base == "saddle": return f"saddle_point" # Combine if base in ["stable", "unstable"]: return f"{base}_{type_str}" elif base == "neutral": if has_complex: return "center" else: return "neutral" return base def _detect_bifurcation( self, real_parts: list[float], imag_parts: list[float], tolerance: float ) -> tuple[bool, str | None]: """ Detect if system is near a bifurcation point. Bifurcations occur when eigenvalues cross the imaginary axis. Args: real_parts: Real parts of eigenvalues imag_parts: Imaginary parts of eigenvalues tolerance: Tolerance for detection Returns: Tuple of (is_near_bifurcation, bifurcation_type) """ # Check if any eigenvalue is near imaginary axis near_bifurcation = False bifurcation_type = None for i, (re, im) in enumerate(zip(real_parts, imag_parts)): if abs(re) < tolerance * 10: # Near zero real part near_bifurcation = True if abs(im) < tolerance: # Real eigenvalue crossing zero bifurcation_type = "transcritical_or_saddle_node" else: # Complex pair crossing imaginary axis bifurcation_type = "hopf" break return near_bifurcation, bifurcation_type
[docs] def compute_sensitivity_matrix( self, params: dict[str, float] | None = None, output_vars: list[str] | None = None, param_list: list[str] | None = None, perturbation: float = 0.01, ) -> dict[str, dict[str, Any]]: """ Compute sensitivity of equilibrium values to parameters. S_ij = ∂y_i/∂p_j where y is equilibrium value and p is parameter. Uses numerical perturbation when symbolic computation is not feasible. Args: params: Parameter values dict (required for numeric computation) output_vars: Variables to analyze (default: all) param_list: Parameters to analyze (default: all from params) perturbation: Relative perturbation size for numerical derivatives (default: 1%) Returns: Nested dictionary: {output_var: {param: sensitivity_value}} Example: >>> S = model.compute_sensitivity_matrix({'beta': 0.3, 'gamma': 0.1, 'N': 1000}) >>> S['I']['beta'] 100.0 # ∂I*/∂beta at equilibrium """ if output_vars is None: output_vars = list(self.variables.keys()) sensitivities = {var: {} for var in output_vars} if params is None: # Without params, we can only return placeholder for output_var in output_vars: for param_name in param_list or []: sensitivities[output_var][param_name] = None return sensitivities if param_list is None: param_list = list(params.keys()) # Find base equilibrium try: equilibria = self.find_all_equilibria(params, numeric_fallback=True) base_eq = None # Prefer endemic equilibrium if available for eq in equilibria: if eq.get("type") == "endemic": base_eq = eq break if base_eq is None: # Fall back to DFE base_eq = self.find_disease_free_equilibrium() # Get base equilibrium values base_values = {} for var_name in output_vars: val = base_eq.get(var_name, 0) if hasattr(val, "evalf"): try: val = float(val.evalf()) except (TypeError, ValueError): val = 0 base_values[var_name] = val # Compute sensitivities using finite differences for param_name in param_list: if param_name not in params: continue param_val = params[param_name] delta = param_val * perturbation # Perturb parameter params_perturbed = params.copy() params_perturbed[param_name] = param_val + delta # Find perturbed equilibrium equilibria_perturbed = self.find_all_equilibria( params_perturbed, numeric_fallback=True ) perturbed_eq = None # Find equilibrium of same type for eq in equilibria_perturbed: if eq.get("type") == base_eq.get("type"): perturbed_eq = eq break if perturbed_eq is None: for var_name in output_vars: sensitivities[var_name][param_name] = None continue # Compute sensitivity for var_name in output_vars: perturbed_val = perturbed_eq.get(var_name, 0) if hasattr(perturbed_val, "evalf"): try: perturbed_val = float(perturbed_val.evalf()) except (TypeError, ValueError): perturbed_val = 0 # Sensitivity = dy/dp = (y_perturbed - y_base) / delta if delta != 0: sensitivities[var_name][param_name] = ( perturbed_val - base_values[var_name] ) / delta else: sensitivities[var_name][param_name] = None except Exception as e: # Return empty sensitivities on failure pass return sensitivities
[docs] def compute_elasticity_indices( self, params: dict[str, float], output_vars: list[str] | None = None ) -> dict[str, dict[str, float]]: """ Compute elasticity indices (normalized sensitivity). E_ij = (∂y_i/y_i) / (∂p_j/p_j) = (p_j/y_i) * (∂y_i/∂p_j) Interpreted as: percentage change in output per 1% change in parameter. Args: params: Parameter values (numeric) output_vars: Variables to analyze Returns: Nested dictionary: {output_var: {param: elasticity}} Example: >>> E = model.compute_elasticity_indices({'beta': 0.3, 'gamma': 0.1, 'N': 1000}) >>> E['I']['beta'] 1.0 # 1% increase in beta → 1% increase in I* """ elasticities = {} if output_vars is None: output_vars = list(self.variables.keys()) # Find equilibrium values try: equilibria = self.find_all_equilibria(params) equilibrium = None # Prefer endemic equilibrium if available for eq in equilibria: if eq.get("type") == "endemic": equilibrium = eq break if equilibrium is None: # Fall back to DFE equilibrium = self.find_disease_free_equilibrium() # Compute elasticities at equilibrium for output_var in output_vars: var_elasticities = {} # Get equilibrium value eq_value = equilibrium.get(output_var, 0) if eq_value is None: continue if hasattr(eq_value, "evalf"): try: eq_value = float(eq_value.evalf()) except (TypeError, ValueError): continue if eq_value == 0: elasticities[output_var] = {} continue # Compute elasticity for each parameter for param, param_val in params.items(): if param_val == 0 or eq_value == 0: var_elasticities[param] = 0.0 continue # Perturb parameter by small amount delta_p = param_val * 0.01 # 1% perturbation params_perturbed = params.copy() params_perturbed[param] = param_val + delta_p # Find new equilibrium try: equilibria_perturbed = self.find_all_equilibria(params_perturbed) eq_perturbed = None # Find equilibrium of same type for eq_p in equilibria_perturbed: if eq_p.get("type") == equilibrium.get("type"): eq_perturbed = eq_p break if eq_perturbed is None: var_elasticities[param] = 0.0 continue # Get perturbed equilibrium value eq_perturbed_value = eq_perturbed.get(output_var, 0) if eq_perturbed_value is None: var_elasticities[param] = 0.0 continue if hasattr(eq_perturbed_value, "evalf"): try: eq_perturbed_value = float(eq_perturbed_value.evalf()) except (TypeError, ValueError): var_elasticities[param] = 0.0 continue if eq_perturbed_value == 0: var_elasticities[param] = 0.0 continue # Compute elasticity: % change in output / % change in param percent_change_param = (delta_p / param_val) * 100.0 percent_change_output = ((eq_perturbed_value - eq_value) / eq_value) * 100.0 # Elasticity = percent_change_output / percent_change_param if abs(percent_change_param) > 1e-10: var_elasticities[param] = percent_change_output / percent_change_param else: var_elasticities[param] = 0.0 except Exception: var_elasticities[param] = 0.0 elasticities[output_var] = var_elasticities except Exception as e: # Return zeros if computation fails elasticities = {var: {} for var in output_vars} return elasticities
[docs] def perform_perturbation_analysis( self, params: dict[str, float], equilibrium: dict[str, float], perturbation: float = 0.01, output_vars: list[str] | None = None, ) -> dict[str, dict[str, float]]: """ Numerical perturbation analysis. For each parameter p: 1. Perturb p by (1 + perturbation) 2. Recompute equilibrium 3. Calculate % change in outputs Args: params: Base parameter values equilibrium: Base equilibrium values perturbation: Perturbation size (default: 1%) output_vars: Variables to analyze Returns: Nested dictionary: {output_var: {param: percent_change}} Example: >>> PA = model.perform_perturbation_analysis(params, dfe, 0.01) >>> PA['I']['beta'] 0.98 # 1% increase in beta → 0.98% increase in I """ perturbations = {} if output_vars is None: output_vars = list(self.variables.keys()) # Get base equilibrium values base_eq = {} for var_name in output_vars: val = equilibrium.get(var_name, 0) if val is None: continue if hasattr(val, "evalf"): try: val = float(val.evalf()) except (TypeError, ValueError): continue base_eq[var_name] = val # For each parameter for param_name in params.keys(): param_val = params[param_name] # Create perturbed params params_perturbed = params.copy() params_perturbed[param_name] = param_val * (1 + perturbation) # Find equilibrium with perturbed params try: equilibria_perturbed = self.find_all_equilibria(params_perturbed) eq_perturbed = None # Find equilibrium of same type for eq in equilibria_perturbed: if eq.get("type") == equilibrium.get("type"): eq_perturbed = eq break if eq_perturbed is None: continue # Get perturbed equilibrium values for var_name in output_vars: if var_name not in perturbations: perturbations[var_name] = {} val_perturbed = eq_perturbed.get(var_name, 0) if val_perturbed is None: perturbations[var_name][param_name] = 0.0 continue if hasattr(val_perturbed, "evalf"): try: val_perturbed = float(val_perturbed.evalf()) except (TypeError, ValueError): perturbations[var_name][param_name] = 0.0 continue # Calculate percent change if base_eq.get(var_name, 0) != 0: percent_change = ( (val_perturbed - base_eq[var_name]) / base_eq[var_name] ) * 100.0 else: percent_change = 0.0 perturbations[var_name][param_name] = percent_change except Exception: for var_name in output_vars: if var_name not in perturbations: perturbations[var_name] = {} perturbations[var_name][param_name] = 0.0 return perturbations
[docs] def rank_parameter_importance( self, params: dict[str, float], output_var: str, method: str = "elasticity" ) -> list[tuple[str, float]]: """ Rank parameters by importance for a given output. Args: params: Parameter values output_var: Variable to analyze method: 'elasticity' or 'perturbation' Returns: List of (parameter, importance_score) tuples, sorted by absolute importance Example: >>> model.rank_parameter_importance(params, 'I') [('beta', 1.0), ('gamma', -1.0), ('N', 0.0)] # beta most important, gamma second, N has no effect """ # Compute sensitivity based on method if method == "elasticity": sensitivities = self.compute_elasticity_indices(params, [output_var]) elif method == "perturbation": dfe = self.find_disease_free_equilibrium() sensitivities = self.perform_perturbation_analysis(params, dfe, 0.01, [output_var]) else: # Unknown method return [] # Rank by absolute importance if output_var not in sensitivities: return [] ranking = sorted(sensitivities[output_var].items(), key=lambda x: abs(x[1]), reverse=True) return ranking
def _get_context(self) -> dict: """ Get context dictionary for sympify. Returns: Dictionary mapping names to SymPy symbols """ context = {} context.update(self.parameters) context.update(self.variables) if self.total_population: context[str(self.total_population)] = self.total_population return context
[docs] def get_parameter_symbol(self, name: str) -> Symbol: """Get SymPy Symbol for a parameter.""" if name not in self.parameters: raise KeyError(f"Unknown parameter: {name}") return self.parameters[name]
[docs] def get_variable_symbol(self, name: str) -> Symbol: """Get SymPy Symbol for a variable.""" if name not in self.variables: raise KeyError(f"Unknown variable: {name}") return self.variables[name]
[docs] def substitute_values(self, expression: Any, values: dict[str, float]) -> Any: """ Substitute numeric values into a symbolic expression. Args: expression: SymPy expression values: Dictionary mapping parameter/variable names to values Returns: Expression with substitutions applied """ subs_dict = {} for name, value in values.items(): if name in self.parameters: subs_dict[self.parameters[name]] = value elif name in self.variables: subs_dict[self.variables[name]] = value elif self.total_population and name == str(self.total_population): subs_dict[self.total_population] = value return expression.subs(subs_dict)
[docs] def to_latex(self, expression: Any) -> str: """ Convert a SymPy expression to LaTeX. Args: expression: SymPy expression Returns: LaTeX string """ return latex(expression)