"""
Symbolic model representation and analysis using SymPy.
Provides symbolic computation for:
- Parameter and variable symbols with assumptions
- R0 (basic reproduction number) calculation
- Equilibrium point analysis
- Stability analysis
"""
from typing import Any
from collections import OrderedDict
try:
from sympy import (
Symbol,
symbols,
simplify,
solve,
diff,
Matrix,
sympify,
latex,
Eq,
Function,
Derivative,
sqrt,
Abs,
re,
im,
I,
S,
nsolve,
)
from sympy.core.assumptions import check_assumptions
import numpy as np
SYMPY_AVAILABLE = True
except ImportError:
SYMPY_AVAILABLE = False
Symbol = None
symbols = None
[docs]
class SymbolicModel:
"""
Symbolic representation of an epidemiological model.
Enables symbolic analysis of model properties:
- Compute R0 using next-generation matrix method
- Find equilibrium points
- Analyze stability
Example:
>>> model = SymbolicModel()
>>> model.add_parameter("beta", positive=True)
>>> model.add_parameter("gamma", positive=True)
>>> model.add_variable("S", positive=True)
>>> model.add_variable("I", positive=True)
>>> model.add_variable("R", positive=True)
>>> model.set_total_population("N")
>>> model.define_ode("S", "-beta*S*I/N")
>>> model.define_ode("I", "beta*S*I/N - gamma*I")
>>> model.define_ode("R", "gamma*I")
>>> R0 = model.compute_R0_next_generation()
"""
def __init__(self):
if not SYMPY_AVAILABLE:
raise ImportError(
"SymPy is required for symbolic analysis. Install with: pip install sympy"
)
self.parameters: dict[str, Symbol] = OrderedDict()
self.variables: dict[str, Symbol] = OrderedDict()
self.total_population: Symbol | None = None
self.odes: dict[Symbol, Any] = OrderedDict()
self.difference_equations: dict[Symbol, Any] = OrderedDict()
self._is_discrete = False
[docs]
def add_parameter(
self,
name: str,
positive: bool = False,
negative: bool = False,
real: bool = True,
**assumptions,
) -> Symbol:
"""
Add a parameter as a symbolic variable.
Args:
name: Parameter name
positive: Whether parameter is positive
negative: Whether parameter is negative
real: Whether parameter is real (vs complex)
**assumptions: Additional SymPy assumptions
Returns:
SymPy Symbol object
"""
assumption_dict = {"real": real, "positive": positive, "negative": negative, **assumptions}
self.parameters[name] = symbols(name, **assumption_dict)
return self.parameters[name]
[docs]
def add_variable(
self,
name: str,
positive: bool = False,
negative: bool = False,
real: bool = True,
**assumptions,
) -> Symbol:
"""
Add a state variable as a symbolic variable.
Args:
name: Variable name
positive: Whether variable is positive
negative: Whether variable is negative
real: Whether variable is real
**assumptions: Additional SymPy assumptions
Returns:
SymPy Symbol object
"""
assumption_dict = {"real": real, "positive": positive, "negative": negative, **assumptions}
self.variables[name] = symbols(name, **assumption_dict)
return self.variables[name]
[docs]
def set_total_population(self, name: str = "N") -> Symbol:
"""
Set the symbol representing total population.
Args:
name: Symbol name for total population (default: "N")
Returns:
SymPy Symbol for total population
"""
self.total_population = symbols(name, positive=True, real=True)
return self.total_population
[docs]
def define_ode(self, variable: str, rhs: str) -> None:
"""
Define an ODE for a state variable.
Args:
variable: Variable name (e.g., "S", "I", "R")
rhs: Right-hand side expression as string (e.g., "-beta*S*I/N")
Example:
>>> model.define_ode("S", "-beta*S*I/N")
"""
if variable not in self.variables:
raise ValueError(f"Unknown variable: {variable}")
context = self._get_context()
expr = sympify(rhs, locals=context)
self.odes[self.variables[variable]] = expr
[docs]
def define_difference_equation(self, variable: str, rhs: str) -> None:
"""
Define a difference equation for a discrete-time model.
Args:
variable: Variable name
rhs: Right-hand side expression
Example:
>>> model.define_difference_equation("S", "S - beta*S*I/N + gamma*I")
"""
if variable not in self.variables:
raise ValueError(f"Unknown variable: {variable}")
context = self._get_context()
expr = sympify(rhs, locals=context)
self.difference_equations[self.variables[variable]] = expr
self._is_discrete = True
[docs]
def compute_R0_next_generation(self) -> Any:
"""
Compute basic reproduction number R0 using next-generation matrix method.
This method:
1. Identifies infected compartments
2. Computes new infections matrix F and transitions matrix V
3. Computes R0 as spectral radius of F*V^(-1) evaluated at DFE
The next-generation matrix method:
- For dI/dt = F - V (F = new infections, V = transitions out)
- Compute Jacobians: dF = ∂F/∂X, dV = ∂V/∂X (where X = infected compartments)
- Evaluate at DFE (disease-free equilibrium)
- R0 = spectral_radius(dF * dV^(-1))
Returns:
Symbolic expression for R0 (evaluated at DFE)
Note:
For single infected compartment: R0 = (∂F/∂I)|DFE / (∂V/∂I)|DFE
"""
infected_vars = self._identify_infected_compartments()
if not infected_vars:
raise ValueError("Cannot identify infected compartments")
# Get disease-free equilibrium for evaluation
dfe = self.find_disease_free_equilibrium()
if len(infected_vars) == 1:
I_name = infected_vars[0]
I_sym = self.variables[I_name]
dI_dt = self.odes.get(I_sym)
if dI_dt is None:
raise ValueError(f"No ODE defined for {I_name}")
# Decompose: dI/dt = F - V
F, V = self._decompose_infection_terms(dI_dt)
if F == 0:
raise ValueError("Cannot identify new infection terms")
# Compute partial derivatives with respect to infected compartment
# This linearizes F and V around the equilibrium
dF_dI = diff(F, I_sym) if hasattr(F, "diff") else 0
dV_dI = diff(V, I_sym) if hasattr(V, "diff") else 0
# Substitute DFE values (S=N, I=0, E=0, etc.)
subs_dict = {}
for var_name, var_sym in self.variables.items():
dfe_val = dfe.get(var_name, 0)
subs_dict[var_sym] = dfe_val
# Evaluate derivatives at DFE
dF_at_dfe = dF_dI.subs(subs_dict) if hasattr(dF_dI, "subs") else dF_dI
dV_at_dfe = dV_dI.subs(subs_dict) if hasattr(dV_dI, "subs") else dV_dI
# R0 = dF/dI / dV/dI at DFE
if dV_at_dfe != 0:
R0 = simplify(dF_at_dfe / dV_at_dfe)
else:
R0 = simplify(dF_at_dfe)
return R0
else:
return self._compute_R0_multivariate(infected_vars, dfe)
def _identify_infected_compartments(self) -> list:
"""
Identify infected compartments based on naming conventions.
Looks for variables with names containing:
- 'I' (infectious)
- 'E' (exposed)
- 'A' (asymptomatic)
"""
infected = []
priority_order = ["I", "E", "A"]
for name, var in self.variables.items():
if name.startswith("I") or "Infectious" in name:
infected.append(name)
if not infected:
for name, var in self.variables.items():
if name.startswith("E") or "Exposed" in name:
infected.append(name)
if not infected:
for name, var in self.variables.items():
if name.startswith("A") or "Asymptomatic" in name:
infected.append(name)
return infected
def _decompose_infection_terms(self, ode_rhs: Any) -> tuple[Any, Any]:
"""
Decompose ODE right-hand side into new infections (F) and transitions (V).
Convention: dI/dt = F - V
- F: rate of new infections entering compartment (positive terms from transmission)
- V: rate of transfer out of compartment (recovery, death, progression, etc.)
For example, for dI/dt = beta*S*I/N - gamma*I - mu*I:
- F = beta*S*I/N (new infections)
- V = (gamma + mu)*I (transitions out: recovery + death)
Returns:
Tuple of (F, V) where dI/dt = F - V
"""
if ode_rhs is None:
return 0, 0
rhs = sympify(ode_rhs)
if not hasattr(rhs, "as_ordered_terms"):
return rhs, 0
terms = rhs.as_ordered_terms()
F_terms = []
V_terms = []
# Identify the infected variable this ODE is for
infected_vars = self._identify_infected_compartments()
for term in terms:
term_str = str(term)
# A term is a "new infection" if it represents influx from transmission
# This typically has the pattern: transmission_param * S * I / N
is_new_infection = False
# Get the coefficient and check the term structure
coeff, rest = term.as_coeff_Mul()
# Check if term contains a transmission parameter multiplied by state variables
has_transmission_param = any(str(p) in term_str for p in self.parameters.values())
has_susceptible = any(v.startswith("S") for v in self.variables.keys() if v in term_str)
has_population = self.total_population and str(self.total_population) in term_str
# New infections come FROM transmission (beta*S*I pattern), not from transitions
# Terms like -gamma*I or -mu*I are transitions OUT, not new infections
if has_transmission_param and (has_susceptible or has_population):
# This looks like a transmission term
if coeff > 0:
# Positive transmission term: new infections (e.g., +beta*S*I/N)
is_new_infection = True
F_terms.append(term)
else:
# Negative transmission term: unusual but could be loss to infection
V_terms.append(-term)
else:
# All other terms are transitions (recovery, death, progression)
if coeff > 0:
# Positive transition (unusual): goes to V as subtraction
V_terms.append(term)
else:
# Negative transition (normal): e.g., -gamma*I becomes +gamma*I in V
V_terms.append(-term)
F = sum(F_terms) if F_terms else 0
V = sum(V_terms) if V_terms else 0
return sympify(F), sympify(V)
def _compute_R0_multivariate(
self, infected_vars: list, dfe: dict[str, Any] | None = None
) -> Any:
"""
Compute R0 for models with multiple infected compartments.
Uses next-generation matrix method with evaluation at DFE.
Args:
infected_vars: List of infected compartment names
dfe: Disease-free equilibrium values (optional, computed if not provided)
"""
if dfe is None:
dfe = self.find_disease_free_equilibrium()
n = len(infected_vars)
x = [self.variables[name] for name in infected_vars]
F_matrix = []
V_matrix = []
for i, var_name in enumerate(infected_vars):
var = self.variables[var_name]
ode = self.odes.get(var)
if ode is None:
F_row = [0] * n
V_row = [0] * n
else:
F, V = self._decompose_infection_terms(ode)
F_row = []
V_row = []
for j, x_j in enumerate(x):
if hasattr(F, "diff"):
F_ij = diff(F, x_j)
else:
F_ij = 0
F_row.append(F_ij)
if hasattr(V, "diff"):
V_ij = diff(V, x_j)
else:
V_ij = 0
V_row.append(V_ij)
F_matrix.append(F_row)
V_matrix.append(V_row)
F_mat = Matrix(F_matrix)
V_mat = Matrix(V_matrix)
# Substitute DFE values into matrices
subs_dict = {}
for var_name, var_sym in self.variables.items():
dfe_val = dfe.get(var_name, 0)
subs_dict[var_sym] = dfe_val
F_mat = F_mat.subs(subs_dict)
V_mat = V_mat.subs(subs_dict)
try:
V_inv = V_mat.inv()
K = F_mat * V_inv
eigenvalues = K.eigenvals()
if eigenvalues:
max_eigenvalue = max(eigenvalues.keys(), key=lambda e: Abs(complex(e.evalf())))
return simplify(max_eigenvalue)
else:
return None
except Exception:
return None
[docs]
def find_disease_free_equilibrium(self) -> dict[str, Any]:
"""
Find the disease-free equilibrium (DFE) of the model.
At DFE:
- All infected compartments are zero
- Susceptible compartment equals total population
Returns:
Dictionary mapping variable names to equilibrium values
"""
equilibrium = {}
infected = self._identify_infected_compartments()
for var_name in self.variables:
if var_name in infected:
equilibrium[var_name] = 0
elif var_name.startswith("S") or "Susceptible" in var_name:
equilibrium[var_name] = self.total_population if self.total_population else 1
else:
equilibrium[var_name] = 0
return equilibrium
[docs]
def find_all_equilibria(
self,
params: dict[str, float] | None = None,
numeric_fallback: bool = True,
max_solutions: int = 10,
) -> list[dict[str, Any]]:
"""
Find all equilibrium points of the model.
Solves the system of equations f(x) = 0 where f is the vector field.
Includes both disease-free and endemic equilibria.
Args:
params: Parameter values for numeric solving (optional)
numeric_fallback: If True, use numerical solver when symbolic fails
max_solutions: Maximum number of solutions to return
Returns:
List of equilibrium dictionaries. Each dictionary contains:
- Variable names mapped to equilibrium values
- 'type': 'dfe' or 'endemic'
- 'method': 'symbolic' or 'numeric'
Example:
>>> model.find_all_equilibria(params={'beta': 0.3, 'gamma': 0.1, 'N': 1000})
[
{'S': N, 'I': 0, 'R': 0, 'type': 'dfe', 'method': 'symbolic'},
{'S': N*gamma/beta, 'I': ..., 'R': ..., 'type': 'endemic', 'method': 'symbolic'}
]
"""
equilibria = []
# 1. Always include DFE
dfe = self.find_disease_free_equilibrium()
dfe["type"] = "dfe"
dfe["method"] = "analytical"
equilibria.append(dfe)
# 2. Try symbolic solving for all equilibria
symbolic_eqs = self._find_equilibria_symbolic(max_solutions)
for eq in symbolic_eqs:
if not self._is_equilibrium_duplicate(eq, equilibria):
eq_type = self._classify_equilibrium(eq)
eq["type"] = eq_type
eq["method"] = "symbolic"
equilibria.append(eq)
# 3. If requested and params provided, try numeric solving
if numeric_fallback and params and len(equilibria) < max_solutions:
numeric_eqs = self._find_equilibria_numeric(params, max_solutions - len(equilibria))
for eq in numeric_eqs:
if not self._is_equilibrium_duplicate(eq, equilibria):
eq_type = self._classify_equilibrium(eq)
eq["type"] = eq_type
eq["method"] = "numeric"
equilibria.append(eq)
return equilibria[:max_solutions]
[docs]
def find_endemic_equilibrium(
self, params: dict[str, float] | None = None, numeric_fallback: bool = True
) -> dict[str, Any] | None:
"""
Find endemic equilibrium point.
At endemic equilibrium, disease persists (I* > 0).
Only exists when R0 > 1.
Args:
params: Parameter values for numeric solving and R0 calculation
numeric_fallback: If True, use numerical solver when symbolic fails
Returns:
Equilibrium dictionary or None if no endemic equilibrium exists
Example:
>>> model.find_endemic_equilibrium({'beta': 0.3, 'gamma': 0.1, 'N': 1000})
{'S': 333.33, 'I': 666.67, 'R': 0, 'type': 'endemic', 'method': 'symbolic'}
"""
# Check if endemic equilibrium exists (R0 > 1)
if params:
try:
R0_expr = self.compute_R0_next_generation()
R0_val = float(self.substitute_values(R0_expr, params))
if R0_val <= 1:
return None # No endemic equilibrium when R0 <= 1
except Exception:
pass
# Find all equilibria and filter for endemic
equilibria = self.find_all_equilibria(params, numeric_fallback)
for eq in equilibria:
if eq.get("type") == "endemic":
return eq
return None
def _find_equilibria_symbolic(self, max_solutions: int) -> list[dict[str, Any]]:
"""
Find equilibria using symbolic solving.
Args:
max_solutions: Maximum number of solutions to find
Returns:
List of equilibrium dictionaries
"""
equilibria = []
if not self.odes:
return equilibria
try:
# Build system of equations: dx/dt = 0 for all variables
equations = []
for var_sym, ode_rhs in self.odes.items():
equations.append(Eq(ode_rhs, 0))
# Solve the system
variables_list = list(self.variables.values())
solutions = solve(equations, variables_list, dict=True)
# Handle single solution case
if solutions and not isinstance(solutions, list):
solutions = [solutions]
for solution in solutions[:max_solutions]:
eq = {}
# Extract values for each variable
for var_name, var_sym in self.variables.items():
if var_sym in solution:
eq[var_name] = solution[var_sym]
else:
eq[var_name] = var_sym
# Validate the equilibrium
if self._validate_equilibrium(eq):
# Check for duplicates before adding
if not self._is_equilibrium_duplicate(eq, equilibria):
equilibria.append(eq)
except Exception as e:
# Symbolic solving failed, will fall back to numeric
pass
# Only try analytical method if no endemic equilibrium found yet
has_endemic = any(self._classify_equilibrium(eq) == "endemic" for eq in equilibria)
if not has_endemic:
# Try to find endemic equilibrium by solving with I != 0 constraint
# This helps find solutions that SymPy might miss
endemic_eq = self._find_endemic_equilibrium_analytical()
if endemic_eq and not self._is_equilibrium_duplicate(endemic_eq, equilibria):
equilibria.append(endemic_eq)
return equilibria
if endemic_eq and not self._is_equilibrium_duplicate(endemic_eq, equilibria):
equilibria.append(endemic_eq)
return equilibria
def _find_endemic_equilibrium_analytical(self) -> dict[str, Any] | None:
"""
Try to find endemic equilibrium using analytical approach.
For common model structures like SIR with vital dynamics,
we can derive the endemic equilibrium analytically.
Returns:
Endemic equilibrium dictionary or None
"""
try:
# Try solving with constraint that infected compartments are non-zero
# For SIR with vital dynamics:
# dS/dt = mu*N - beta*S*I/N - mu*S = 0
# dI/dt = beta*S*I/N - (gamma + mu)*I = 0
# dR/dt = gamma*I - mu*R = 0
#
# From dI/dt = 0 with I != 0: S = N*(gamma + mu)/beta = N/R0
# From dS/dt = 0: I = (mu*N/gamma)*(R0 - 1)
# From dR/dt = 0: R = gamma*I/mu
infected_vars = self._identify_infected_compartments()
if not infected_vars:
return None
# Get the susceptible variable
susceptibles = [v for v in self.variables.keys() if v.startswith("S")]
if not susceptibles:
return None
S_name = susceptibles[0]
I_name = infected_vars[0]
# Get the ODE for I
I_sym = self.variables[I_name]
dI_dt = self.odes.get(I_sym)
if dI_dt is None:
return None
# Get DFE for reference
dfe = self.find_disease_free_equilibrium()
# Try to extract the endemic equilibrium form
# For dI/dt = beta*S*I/N - (gamma + mu)*I = 0
# The endemic equilibrium satisfies: S = N*(gamma + mu)/beta
# Compute R0 symbolically at DFE
R0_expr = self.compute_R0_next_generation()
# The endemic equilibrium has S = N/R0
# This means S = N * (gamma + mu) / beta for SIR with vital dynamics
# Try to solve for endemic equilibrium
S_sym = self.variables[S_name]
dS_dt = self.odes.get(S_sym)
if dS_dt is None:
return None
# Solve the system with the assumption I != 0
equations = []
for var_sym, ode_rhs in self.odes.items():
equations.append(Eq(ode_rhs, 0))
# Try to solve without assuming I = 0
solutions = solve(equations, list(self.variables.values()), dict=True)
if solutions and not isinstance(solutions, list):
solutions = [solutions]
for solution in solutions:
eq = {}
for var_name, var_sym in self.variables.items():
if var_sym in solution:
eq[var_name] = solution[var_sym]
else:
eq[var_name] = var_sym
# Check if this is an endemic equilibrium (I > 0)
I_val = eq.get(I_name, 0)
if hasattr(I_val, "evalf"):
try:
I_numeric = float(I_val.evalf())
if I_numeric > 0 and self._validate_equilibrium(eq):
eq["method"] = "symbolic"
return eq
except:
# Symbolic value, check if it could be positive
pass
# Also check for symbolic solutions
if I_val != 0 and self._validate_equilibrium(eq):
eq["method"] = "symbolic"
return eq
return None
except Exception:
return None
def _find_equilibria_numeric(
self, params: dict[str, float], max_solutions: int
) -> list[dict[str, Any]]:
"""
Find equilibria using numerical solving.
Args:
params: Parameter values
max_solutions: Maximum number of solutions to find
Returns:
List of equilibrium dictionaries
"""
equilibria = []
if not self.odes or not params:
return equilibria
try:
import numpy as np
from scipy.optimize import fsolve
# Substitute parameter values into ODEs
odes_numeric = {}
for var_sym, ode_rhs in self.odes.items():
odes_numeric[var_sym] = self.substitute_values(ode_rhs, params)
# Define system of equations for fsolve
def equations(x):
result = []
for i, (var_sym, ode_rhs) in enumerate(odes_numeric.items()):
# Substitute variable values
subs_dict = {}
for j, (v_name, v_sym) in enumerate(self.variables.items()):
subs_dict[v_sym] = x[j]
val = float(ode_rhs.subs(subs_dict))
result.append(val)
return result
# Try multiple initial guesses
n_vars = len(self.variables)
initial_guesses = self._generate_initial_guesses(params, n_vars, max_solutions)
found_solutions = set()
for x0 in initial_guesses:
try:
solution, info, ier, msg = fsolve(equations, x0, full_output=True)
if ier == 1: # Solution found
# Round to avoid duplicates
solution_key = tuple(round(x, 6) for x in solution)
if solution_key not in found_solutions:
found_solutions.add(solution_key)
eq = {}
for i, (var_name, var_sym) in enumerate(self.variables.items()):
eq[var_name] = float(solution[i])
# Validate
if self._validate_equilibrium(eq, tolerance=1e-6):
equilibria.append(eq)
if len(equilibria) >= max_solutions:
break
except Exception:
continue
except ImportError:
# scipy not available
pass
except Exception:
pass
return equilibria
def _generate_initial_guesses(
self, params: dict[str, float], n_vars: int, n_guesses: int
) -> list[list[float]]:
"""
Generate initial guesses for numeric equilibrium finding.
Args:
params: Parameter values
n_vars: Number of variables
n_guesses: Number of guesses to generate
Returns:
List of initial guess vectors
"""
guesses = []
N = params.get("N", params.get(str(self.total_population), 1000))
# 1. DFE
dfe_guess = [0.0] * n_vars
for i, var_name in enumerate(self.variables.keys()):
if var_name.startswith("S") or "Susceptible" in var_name:
dfe_guess[i] = N
guesses.append(dfe_guess)
# 2. Endemic-like guess based on R0 if available
endemic_guess = [0.0] * n_vars
try:
# Compute R0 to get a better endemic guess
R0_expr = self.compute_R0_next_generation()
R0_val = float(self.substitute_values(R0_expr, params))
if R0_val > 1:
# For SIR with vital dynamics: S* = N/R0, I* = (mu*N/gamma)*(R0-1)
# Use approximate values for the guess
for i, var_name in enumerate(self.variables.keys()):
if var_name.startswith("S") or "Susceptible" in var_name:
endemic_guess[i] = N / R0_val
elif var_name.startswith("I") or "Infectious" in var_name:
# Approximate I based on model structure
mu = params.get("mu", 0.01)
gamma = params.get("gamma", 0.1)
I_star = (mu * N / gamma) * (R0_val - 1) if gamma > 0 else N * 0.1
endemic_guess[i] = min(I_star, N * 0.5) # Cap at 50% of population
elif var_name.startswith("E") or "Exposed" in var_name:
# For SEIR models
endemic_guess[i] = N * 0.05
elif var_name.startswith("R") or "Removed" in var_name:
endemic_guess[i] = N * 0.1
else:
endemic_guess[i] = N * 0.05
else:
# R0 <= 1, use generic endemic guess
for i, var_name in enumerate(self.variables.keys()):
if var_name.startswith("S") or "Susceptible" in var_name:
endemic_guess[i] = N * 0.8
elif var_name.startswith("I") or "Infectious" in var_name:
endemic_guess[i] = N * 0.1
elif var_name.startswith("R") or "Removed" in var_name:
endemic_guess[i] = N * 0.1
except Exception:
# Fall back to generic endemic guess
for i, var_name in enumerate(self.variables.keys()):
if var_name.startswith("S") or "Susceptible" in var_name:
endemic_guess[i] = N * 0.8
elif var_name.startswith("I") or "Infectious" in var_name:
endemic_guess[i] = N * 0.1
elif var_name.startswith("R") or "Removed" in var_name:
endemic_guess[i] = N * 0.1
guesses.append(endemic_guess)
# 3. Multiple varied endemic guesses
# Try different proportions of susceptible population
for S_ratio in [0.3, 0.5, 0.6]:
varied_guess = [0.0] * n_vars
remaining = N * (1 - S_ratio)
for i, var_name in enumerate(self.variables.keys()):
if var_name.startswith("S") or "Susceptible" in var_name:
varied_guess[i] = N * S_ratio
elif var_name.startswith("I") or "Infectious" in var_name:
varied_guess[i] = remaining * 0.5
elif var_name.startswith("E") or "Exposed" in var_name:
varied_guess[i] = remaining * 0.2
elif var_name.startswith("R") or "Removed" in var_name:
varied_guess[i] = remaining * 0.3
else:
varied_guess[i] = remaining * 0.1
guesses.append(varied_guess)
# 4. Random guesses
np.random.seed(42) # Reproducibility
for _ in range(max(1, n_guesses - len(guesses))):
guess = np.random.uniform(0, N, n_vars)
# Ensure sum equals N for conserved models
if self.total_population:
guess = guess / guess.sum() * N
guesses.append(guess.tolist())
return guesses[:n_guesses]
def _classify_equilibrium(self, eq: dict[str, Any]) -> str:
"""
Classify equilibrium as disease-free or endemic.
Args:
eq: Equilibrium dictionary
tolerance: Tolerance for considering infected compartment as zero
Returns:
'dfe' or 'endemic'
"""
infected_vars = self._identify_infected_compartments()
tolerance = 1e-10
for var_name in infected_vars:
value = eq.get(var_name, 0)
# Convert symbolic to numeric if needed
if hasattr(value, "evalf"):
try:
value = float(value.evalf())
except Exception:
# Symbolic and non-zero, likely endemic
return "endemic"
if abs(value) > tolerance:
return "endemic"
return "dfe"
def _validate_equilibrium(self, eq: dict[str, Any], tolerance: float = 1e-10) -> bool:
"""
Validate that a point is actually an equilibrium.
Args:
eq: Equilibrium dictionary
tolerance: Tolerance for checking dx/dt ≈ 0
Returns:
True if valid equilibrium, False otherwise
"""
if not self.odes:
return True
try:
# Substitute equilibrium values into ODEs
subs_dict = {}
for var_name, value in eq.items():
if var_name in self.variables:
subs_dict[self.variables[var_name]] = value
for var_sym, ode_rhs in self.odes.items():
residual = ode_rhs.subs(subs_dict)
# Try to simplify the residual
if hasattr(residual, "simplify"):
residual = residual.simplify()
# Check if residual is exactly zero (symbolic)
if hasattr(residual, "is_zero") and residual.is_zero:
continue
# Try to evaluate numerically
if hasattr(residual, "evalf"):
try:
residual_numeric = float(residual.evalf())
if abs(residual_numeric) > tolerance:
return False
except (TypeError, ValueError):
# Residual contains free symbols - check if it simplifies to zero
# For symbolic equilibria, the residual should be exactly 0
if hasattr(residual, "simplify"):
simplified = residual.simplify()
if simplified != 0 and not (
hasattr(simplified, "is_zero") and simplified.is_zero
):
# Check if it's a symbolic expression that could be zero
# For now, assume symbolic solutions from solve() are valid
pass
return True
except Exception:
return False
def _is_equilibrium_duplicate(
self, eq: dict[str, Any], existing: list[dict[str, Any]], tolerance: float = 1e-6
) -> bool:
"""
Check if equilibrium is duplicate of existing ones.
Args:
eq: Equilibrium to check
existing: List of existing equilibria
tolerance: Tolerance for comparison
Returns:
True if duplicate, False otherwise
"""
for existing_eq in existing:
is_duplicate = True
for var_name in self.variables.keys():
val1 = eq.get(var_name, 0)
val2 = existing_eq.get(var_name, 0)
# Convert to float for comparison
try:
if hasattr(val1, "evalf"):
val1 = float(val1.evalf())
elif hasattr(val1, "__float__"):
val1 = float(val1)
except (ValueError, TypeError):
# Cannot convert to float, treat as different
is_duplicate = False
break
try:
if hasattr(val2, "evalf"):
val2 = float(val2.evalf())
elif hasattr(val2, "__float__"):
val2 = float(val2)
except (ValueError, TypeError):
# Cannot convert to float, treat as different
is_duplicate = False
break
if abs(val1 - val2) > tolerance:
is_duplicate = False
break
if is_duplicate:
return True
return False
[docs]
def check_stability_at_dfe(self, R0: Any) -> str:
"""
Check stability of disease-free equilibrium based on R0.
Args:
R0: Basic reproduction number (symbolic or numeric)
Returns:
Stability classification: "stable", "unstable", or "neutral"
"""
if R0 is None:
return "unknown"
try:
if hasattr(R0, "is_real") and R0.is_real:
if hasattr(R0, "evalf"):
R0_val = float(R0.evalf())
else:
R0_val = float(R0)
if R0_val < 1:
return "stable"
elif R0_val > 1:
return "unstable"
else:
return "neutral"
if hasattr(R0, "subs"):
return "symbolic (depends on parameter values)"
return "unknown"
except Exception:
return "unknown"
[docs]
def compute_jacobian(
self, equilibrium: dict[str, float | Symbol], substitute_values: bool = False
) -> Matrix:
"""
Compute Jacobian matrix at an equilibrium point.
The Jacobian matrix J has entries J_ij = ∂f_i/∂x_j where:
- f_i is the ODE for variable i
- x_j is the j-th state variable
Args:
equilibrium: Dictionary mapping variable names to values.
Can be symbolic or numeric values.
substitute_values: If True, substitute equilibrium values into Jacobian.
If False, keep symbolic form.
Returns:
SymPy Matrix representing the Jacobian
Example:
>>> dfe = model.find_disease_free_equilibrium()
>>> J = model.compute_jacobian(dfe)
>>> J
Matrix([
[0, -beta, 0],
[0, beta - gamma, 0],
[0, gamma, 0]
])
"""
if not self.odes:
raise ValueError("No ODEs defined for this model")
# Build Jacobian matrix
n_vars = len(self.variables)
var_names = list(self.variables.keys())
var_syms = list(self.variables.values())
jacobian_entries = []
for i, var_name_i in enumerate(var_names):
var_sym_i = self.variables[var_name_i]
ode_i = self.odes.get(var_sym_i)
if ode_i is None:
# No ODE for this variable - all zeros
jacobian_entries.append([0] * n_vars)
continue
row = []
for j, var_name_j in enumerate(var_names):
var_sym_j = self.variables[var_name_j]
# Compute partial derivative ∂f_i/∂x_j
try:
partial = diff(ode_i, var_sym_j)
partial = simplify(partial)
except Exception:
partial = 0
row.append(partial)
jacobian_entries.append(row)
J = Matrix(jacobian_entries)
# Substitute equilibrium values if requested
if substitute_values and equilibrium:
subs_dict = {}
for var_name, value in equilibrium.items():
if var_name in self.variables:
subs_dict[self.variables[var_name]] = value
J = J.subs(subs_dict)
J = simplify(J)
return J
[docs]
def compute_eigenvalues(
self, jacobian: Matrix, numeric: bool = False, params: dict[str, float] | None = None
) -> list[Any]:
"""
Compute eigenvalues of the Jacobian matrix.
Eigenvalues determine local stability:
- All Re(λ) < 0: stable equilibrium
- Any Re(λ) > 0: unstable equilibrium
- Re(λ) = 0: neutral or bifurcation point
Args:
jacobian: Jacobian matrix (from compute_jacobian)
numeric: Force numeric evaluation of eigenvalues
params: Parameter values for numeric evaluation
Returns:
List of eigenvalues (symbolic or numeric complex numbers)
Example:
>>> J = model.compute_jacobian(dfe, substitute_values=True)
>>> eigenvalues = model.compute_eigenvalues(J)
>>> eigenvalues
[0, -gamma, -beta + gamma]
"""
eigenvalues = []
try:
if numeric or params:
# Substitute parameter values if provided
if params:
subs_dict = {}
for param_name, value in params.items():
if param_name in self.parameters:
subs_dict[self.parameters[param_name]] = value
jacobian = jacobian.subs(subs_dict)
# Try numeric eigenvalue computation
try:
# Convert to numpy array for numeric computation
jac_np = np.array(jacobian.tolist(), dtype=float)
eigenvalues_np = np.linalg.eigvals(jac_np)
eigenvalues = [complex(ev) for ev in eigenvalues_np]
except Exception:
# Fall back to symbolic
pass
if not eigenvalues:
# Try symbolic eigenvalue computation
eigenvalue_dict = jacobian.eigenvals()
if eigenvalue_dict:
for eigenvalue, multiplicity in eigenvalue_dict.items():
eigenvalue = simplify(eigenvalue)
# Add according to multiplicity
for _ in range(multiplicity):
eigenvalues.append(eigenvalue)
else:
# If eigenvals() fails, try more robust method
n = jacobian.shape[0]
char_poly = jacobian.charpoly()
roots = solve(char_poly.as_expr())
eigenvalues = roots if isinstance(roots, list) else [roots]
except Exception as e:
# Last resort: return empty list
pass
return eigenvalues
[docs]
def analyze_stability_full(
self,
equilibrium: dict[str, float | Symbol],
params: dict[str, float] | None = None,
tolerance: float = 1e-10,
) -> dict[str, Any]:
"""
Perform full stability analysis at an equilibrium point.
Computes and analyzes:
- Jacobian matrix
- Eigenvalues
- Stability classification
- Bifurcation indicators
- Detailed classification (node, focus, saddle, etc.)
Args:
equilibrium: Equilibrium point (variable names to values)
params: Parameter values for numeric evaluation
tolerance: Tolerance for eigenvalue zero detection
Returns:
Dictionary with comprehensive stability information:
- 'jacobian': Jacobian matrix (symbolic or numeric)
- 'eigenvalues': List of eigenvalues
- 'eigenvalues_numeric': Numeric eigenvalues (if params provided)
- 'stability': 'stable', 'unstable', 'neutral', or 'saddle'
- 'classification': Detailed type (e.g., 'stable_node', 'unstable_focus')
- 'max_real_part': Maximum real part of eigenvalues
- 'min_real_part': Minimum real part of eigenvalues
- 'has_complex': Boolean indicating complex eigenvalues
- 'near_bifurcation': Boolean indicating proximity to bifurcation
- 'bifurcation_type': Type of bifurcation if detected
Example:
>>> result = model.analyze_stability_full(dfe, params={'beta': 0.3, 'gamma': 0.1})
>>> result['stability']
'unstable'
>>> result['classification']
'unstable_node'
>>> result['max_real_part']
0.2
"""
result = {
"jacobian": None,
"eigenvalues": [],
"eigenvalues_numeric": [],
"stability": "unknown",
"classification": "unknown",
"max_real_part": None,
"min_real_part": None,
"has_complex": False,
"near_bifurcation": False,
"bifurcation_type": None,
}
try:
# 1. Compute Jacobian
J = self.compute_jacobian(equilibrium, substitute_values=False)
result["jacobian"] = J
# 2. Compute symbolic eigenvalues
eigenvalues_sym = self.compute_eigenvalues(J, numeric=False)
result["eigenvalues"] = eigenvalues_sym
# 3. Compute numeric eigenvalues if params provided
if params:
J_numeric = self.compute_jacobian(equilibrium, substitute_values=True)
eigenvalues_num = self.compute_eigenvalues(J_numeric, numeric=True, params=params)
result["eigenvalues_numeric"] = eigenvalues_num
# Use numeric eigenvalues for analysis
eigenvalues_for_analysis = eigenvalues_num
else:
eigenvalues_for_analysis = eigenvalues_sym
# 4. Analyze eigenvalue spectrum
if eigenvalues_for_analysis:
real_parts = []
imag_parts = []
for ev in eigenvalues_for_analysis:
if hasattr(ev, "evalf"):
try:
ev_complex = complex(ev.evalf())
real_parts.append(ev_complex.real)
imag_parts.append(ev_complex.imag)
except Exception:
# Symbolic eigenvalue
pass
elif isinstance(ev, complex):
real_parts.append(ev.real)
imag_parts.append(ev.imag)
elif isinstance(ev, (int, float)):
real_parts.append(ev)
imag_parts.append(0.0)
if real_parts:
result["max_real_part"] = max(real_parts)
result["min_real_part"] = min(real_parts)
result["has_complex"] = any(abs(im) > tolerance for im in imag_parts)
# 5. Classify stability
result["stability"] = self._classify_stability(
real_parts, imag_parts, tolerance
)
# 6. Detailed classification
result["classification"] = self._classify_stability_detailed(
real_parts, imag_parts, tolerance
)
# 7. Detect bifurcations
result["near_bifurcation"], result["bifurcation_type"] = (
self._detect_bifurcation(real_parts, imag_parts, tolerance)
)
except Exception as e:
result["error"] = str(e)
return result
def _classify_stability(
self, real_parts: list[float], imag_parts: list[float], tolerance: float
) -> str:
"""
Classify stability based on eigenvalue real parts.
Args:
real_parts: Real parts of eigenvalues
imag_parts: Imaginary parts of eigenvalues
tolerance: Tolerance for zero detection
Returns:
'stable', 'unstable', 'saddle', or 'neutral'
"""
max_real = max(real_parts)
min_real = min(real_parts)
# Check for zero eigenvalues (neutral)
if abs(max_real) < tolerance and abs(min_real) < tolerance:
return "neutral"
# Check for mixed signs (saddle point)
if max_real > tolerance and min_real < -tolerance:
return "saddle"
# Check for all negative (stable)
if max_real < -tolerance:
return "stable"
# Check for all positive (unstable)
if min_real > tolerance:
return "unstable"
# Some eigenvalues near zero
if abs(max_real) < tolerance or abs(min_real) < tolerance:
return "neutral"
return "unknown"
def _classify_stability_detailed(
self, real_parts: list[float], imag_parts: list[float], tolerance: float
) -> str:
"""
Detailed stability classification.
Args:
real_parts: Real parts of eigenvalues
imag_parts: Imaginary parts of eigenvalues
tolerance: Tolerance for zero detection
Returns:
Detailed classification string
"""
max_real = max(real_parts)
min_real = min(real_parts)
has_complex = any(abs(im) > tolerance for im in imag_parts)
# Determine base stability
if max_real < -tolerance:
base = "stable"
elif min_real > tolerance:
base = "unstable"
elif max_real > tolerance and min_real < -tolerance:
base = "saddle"
elif abs(max_real) < tolerance:
base = "neutral"
else:
base = "unknown"
# Determine type (node vs focus)
if has_complex:
type_str = "focus"
else:
type_str = "node"
# Special case: saddle
if base == "saddle":
return f"saddle_point"
# Combine
if base in ["stable", "unstable"]:
return f"{base}_{type_str}"
elif base == "neutral":
if has_complex:
return "center"
else:
return "neutral"
return base
def _detect_bifurcation(
self, real_parts: list[float], imag_parts: list[float], tolerance: float
) -> tuple[bool, str | None]:
"""
Detect if system is near a bifurcation point.
Bifurcations occur when eigenvalues cross the imaginary axis.
Args:
real_parts: Real parts of eigenvalues
imag_parts: Imaginary parts of eigenvalues
tolerance: Tolerance for detection
Returns:
Tuple of (is_near_bifurcation, bifurcation_type)
"""
# Check if any eigenvalue is near imaginary axis
near_bifurcation = False
bifurcation_type = None
for i, (re, im) in enumerate(zip(real_parts, imag_parts)):
if abs(re) < tolerance * 10: # Near zero real part
near_bifurcation = True
if abs(im) < tolerance:
# Real eigenvalue crossing zero
bifurcation_type = "transcritical_or_saddle_node"
else:
# Complex pair crossing imaginary axis
bifurcation_type = "hopf"
break
return near_bifurcation, bifurcation_type
[docs]
def compute_sensitivity_matrix(
self,
params: dict[str, float] | None = None,
output_vars: list[str] | None = None,
param_list: list[str] | None = None,
perturbation: float = 0.01,
) -> dict[str, dict[str, Any]]:
"""
Compute sensitivity of equilibrium values to parameters.
S_ij = ∂y_i/∂p_j where y is equilibrium value and p is parameter.
Uses numerical perturbation when symbolic computation is not feasible.
Args:
params: Parameter values dict (required for numeric computation)
output_vars: Variables to analyze (default: all)
param_list: Parameters to analyze (default: all from params)
perturbation: Relative perturbation size for numerical derivatives (default: 1%)
Returns:
Nested dictionary: {output_var: {param: sensitivity_value}}
Example:
>>> S = model.compute_sensitivity_matrix({'beta': 0.3, 'gamma': 0.1, 'N': 1000})
>>> S['I']['beta']
100.0 # ∂I*/∂beta at equilibrium
"""
if output_vars is None:
output_vars = list(self.variables.keys())
sensitivities = {var: {} for var in output_vars}
if params is None:
# Without params, we can only return placeholder
for output_var in output_vars:
for param_name in param_list or []:
sensitivities[output_var][param_name] = None
return sensitivities
if param_list is None:
param_list = list(params.keys())
# Find base equilibrium
try:
equilibria = self.find_all_equilibria(params, numeric_fallback=True)
base_eq = None
# Prefer endemic equilibrium if available
for eq in equilibria:
if eq.get("type") == "endemic":
base_eq = eq
break
if base_eq is None:
# Fall back to DFE
base_eq = self.find_disease_free_equilibrium()
# Get base equilibrium values
base_values = {}
for var_name in output_vars:
val = base_eq.get(var_name, 0)
if hasattr(val, "evalf"):
try:
val = float(val.evalf())
except (TypeError, ValueError):
val = 0
base_values[var_name] = val
# Compute sensitivities using finite differences
for param_name in param_list:
if param_name not in params:
continue
param_val = params[param_name]
delta = param_val * perturbation
# Perturb parameter
params_perturbed = params.copy()
params_perturbed[param_name] = param_val + delta
# Find perturbed equilibrium
equilibria_perturbed = self.find_all_equilibria(
params_perturbed, numeric_fallback=True
)
perturbed_eq = None
# Find equilibrium of same type
for eq in equilibria_perturbed:
if eq.get("type") == base_eq.get("type"):
perturbed_eq = eq
break
if perturbed_eq is None:
for var_name in output_vars:
sensitivities[var_name][param_name] = None
continue
# Compute sensitivity
for var_name in output_vars:
perturbed_val = perturbed_eq.get(var_name, 0)
if hasattr(perturbed_val, "evalf"):
try:
perturbed_val = float(perturbed_val.evalf())
except (TypeError, ValueError):
perturbed_val = 0
# Sensitivity = dy/dp = (y_perturbed - y_base) / delta
if delta != 0:
sensitivities[var_name][param_name] = (
perturbed_val - base_values[var_name]
) / delta
else:
sensitivities[var_name][param_name] = None
except Exception as e:
# Return empty sensitivities on failure
pass
return sensitivities
[docs]
def compute_elasticity_indices(
self, params: dict[str, float], output_vars: list[str] | None = None
) -> dict[str, dict[str, float]]:
"""
Compute elasticity indices (normalized sensitivity).
E_ij = (∂y_i/y_i) / (∂p_j/p_j) = (p_j/y_i) * (∂y_i/∂p_j)
Interpreted as: percentage change in output per 1% change in parameter.
Args:
params: Parameter values (numeric)
output_vars: Variables to analyze
Returns:
Nested dictionary: {output_var: {param: elasticity}}
Example:
>>> E = model.compute_elasticity_indices({'beta': 0.3, 'gamma': 0.1, 'N': 1000})
>>> E['I']['beta']
1.0 # 1% increase in beta → 1% increase in I*
"""
elasticities = {}
if output_vars is None:
output_vars = list(self.variables.keys())
# Find equilibrium values
try:
equilibria = self.find_all_equilibria(params)
equilibrium = None
# Prefer endemic equilibrium if available
for eq in equilibria:
if eq.get("type") == "endemic":
equilibrium = eq
break
if equilibrium is None:
# Fall back to DFE
equilibrium = self.find_disease_free_equilibrium()
# Compute elasticities at equilibrium
for output_var in output_vars:
var_elasticities = {}
# Get equilibrium value
eq_value = equilibrium.get(output_var, 0)
if eq_value is None:
continue
if hasattr(eq_value, "evalf"):
try:
eq_value = float(eq_value.evalf())
except (TypeError, ValueError):
continue
if eq_value == 0:
elasticities[output_var] = {}
continue
# Compute elasticity for each parameter
for param, param_val in params.items():
if param_val == 0 or eq_value == 0:
var_elasticities[param] = 0.0
continue
# Perturb parameter by small amount
delta_p = param_val * 0.01 # 1% perturbation
params_perturbed = params.copy()
params_perturbed[param] = param_val + delta_p
# Find new equilibrium
try:
equilibria_perturbed = self.find_all_equilibria(params_perturbed)
eq_perturbed = None
# Find equilibrium of same type
for eq_p in equilibria_perturbed:
if eq_p.get("type") == equilibrium.get("type"):
eq_perturbed = eq_p
break
if eq_perturbed is None:
var_elasticities[param] = 0.0
continue
# Get perturbed equilibrium value
eq_perturbed_value = eq_perturbed.get(output_var, 0)
if eq_perturbed_value is None:
var_elasticities[param] = 0.0
continue
if hasattr(eq_perturbed_value, "evalf"):
try:
eq_perturbed_value = float(eq_perturbed_value.evalf())
except (TypeError, ValueError):
var_elasticities[param] = 0.0
continue
if eq_perturbed_value == 0:
var_elasticities[param] = 0.0
continue
# Compute elasticity: % change in output / % change in param
percent_change_param = (delta_p / param_val) * 100.0
percent_change_output = ((eq_perturbed_value - eq_value) / eq_value) * 100.0
# Elasticity = percent_change_output / percent_change_param
if abs(percent_change_param) > 1e-10:
var_elasticities[param] = percent_change_output / percent_change_param
else:
var_elasticities[param] = 0.0
except Exception:
var_elasticities[param] = 0.0
elasticities[output_var] = var_elasticities
except Exception as e:
# Return zeros if computation fails
elasticities = {var: {} for var in output_vars}
return elasticities
[docs]
def rank_parameter_importance(
self, params: dict[str, float], output_var: str, method: str = "elasticity"
) -> list[tuple[str, float]]:
"""
Rank parameters by importance for a given output.
Args:
params: Parameter values
output_var: Variable to analyze
method: 'elasticity' or 'perturbation'
Returns:
List of (parameter, importance_score) tuples, sorted by absolute importance
Example:
>>> model.rank_parameter_importance(params, 'I')
[('beta', 1.0), ('gamma', -1.0), ('N', 0.0)]
# beta most important, gamma second, N has no effect
"""
# Compute sensitivity based on method
if method == "elasticity":
sensitivities = self.compute_elasticity_indices(params, [output_var])
elif method == "perturbation":
dfe = self.find_disease_free_equilibrium()
sensitivities = self.perform_perturbation_analysis(params, dfe, 0.01, [output_var])
else:
# Unknown method
return []
# Rank by absolute importance
if output_var not in sensitivities:
return []
ranking = sorted(sensitivities[output_var].items(), key=lambda x: abs(x[1]), reverse=True)
return ranking
def _get_context(self) -> dict:
"""
Get context dictionary for sympify.
Returns:
Dictionary mapping names to SymPy symbols
"""
context = {}
context.update(self.parameters)
context.update(self.variables)
if self.total_population:
context[str(self.total_population)] = self.total_population
return context
[docs]
def get_parameter_symbol(self, name: str) -> Symbol:
"""Get SymPy Symbol for a parameter."""
if name not in self.parameters:
raise KeyError(f"Unknown parameter: {name}")
return self.parameters[name]
[docs]
def get_variable_symbol(self, name: str) -> Symbol:
"""Get SymPy Symbol for a variable."""
if name not in self.variables:
raise KeyError(f"Unknown variable: {name}")
return self.variables[name]
[docs]
def substitute_values(self, expression: Any, values: dict[str, float]) -> Any:
"""
Substitute numeric values into a symbolic expression.
Args:
expression: SymPy expression
values: Dictionary mapping parameter/variable names to values
Returns:
Expression with substitutions applied
"""
subs_dict = {}
for name, value in values.items():
if name in self.parameters:
subs_dict[self.parameters[name]] = value
elif name in self.variables:
subs_dict[self.variables[name]] = value
elif self.total_population and name == str(self.total_population):
subs_dict[self.total_population] = value
return expression.subs(subs_dict)
[docs]
def to_latex(self, expression: Any) -> str:
"""
Convert a SymPy expression to LaTeX.
Args:
expression: SymPy expression
Returns:
LaTeX string
"""
return latex(expression)