Source code for gsnn.simulate.simulate

import pyro
import pyro.distributions as dist
import torch
import numpy as np
import networkx as nx
from typing import Optional, Dict
from scipy.integrate import solve_ivp

from gsnn.simulate import utils

[docs]def simulate(G, n_train: int, n_test: int, input_nodes, output_nodes, *, noise_scale: float = 1.0, special_functions: Optional[Dict] = None, signed_edges: Optional[Dict[tuple, int]] = None): r"""Generate samples from a *synthetic* graph-structured data-generation process. The function takes a directed NetworkX graph that represents causal relationships between **input**, **function**, and **output** nodes. It converts the graph into a *Pyro* probabilistic program (via :pyfunc:`gsnn.simulate.utils.nx_to_pyro_model`) and then draws IID samples from that model. Args: G (networkx.DiGraph): Directed graph encoding the Bayesian network structure. n_train (int): Number of training instances to simulate. n_test (int): Number of test instances to simulate. input_nodes (list[str]): Ordered list of node names that are treated as **inputs** (observed variables). output_nodes (list[str]): Ordered list of node names that are treated as **outputs** (targets). noise_scale (float, optional): Standard deviation of the additive Gaussian noise term used for every conditional distribution that has no *special function* attached. **Default:** ``1.0``. special_functions (dict[str, callable], optional): Mapping from node name to a Python callable that overrides the default linear relationship for that node. Each callable must have the signature ``f(parent_values: list) -> Tensor`` where parent_values is a list of the parent node values. Shapes: - **x_train** – :math:`(n_{\text{train}}, |\text{inputs}|)` - **y_train** – :math:`(n_{\text{train}}, |\text{outputs}|)` - **x_test** – :math:`(n_{\text{test}}, |\text{inputs}|)` - **y_test** – :math:`(n_{\text{test}}, |\text{outputs}|)` Returns: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: ``(x_train, y_train, x_test, y_test)`` where each element is a dense NumPy array ordered according to ``input_nodes`` / ``output_nodes``. """ # Convert the NetworkX graph into a Pyro model with special functions pyro_model = utils.nx_to_pyro_model(G, input_nodes, output_nodes, special_functions, noise_scale=noise_scale, signed_edges=signed_edges) # Helper function to generate samples for a given number of instances def generate_samples(n_samples): x_samples = [] y_samples = [] for _ in range(n_samples): # Sample x values from a standard normal distribution x_values = {node: torch.tensor(np.random.normal(0, 1), dtype=torch.float32) for node in input_nodes} # Generate the corresponding y values using the Pyro model with pyro.plate("data", 1): y_values = pyro_model(x_values) # Append the x and y values to the sample lists x_samples.append([x_values[node].item() for node in input_nodes]) y_samples.append([y_values[node].item() for node in output_nodes]) return np.array(x_samples), np.array(y_samples) # Generate training and test samples x_train, y_train = generate_samples(n_train) x_test, y_test = generate_samples(n_test) return x_train, x_test, y_train, y_test
def simulate_sde(G, n_train: int, n_test: int, input_nodes, output_nodes, *, noise_scale: float = 1.0, dt: float = 0.01, t_final: float = 10.0, special_functions: Optional[Dict] = None, seed: Optional[int] = None, signed_edges: Optional[Dict[tuple, int]] = None): r"""Generate samples from a *synthetic* graph-structured data-generation process using stochastic ODEs. The function takes a directed NetworkX graph that represents causal relationships between **input**, **function**, and **output** nodes. It converts the graph into a system of stochastic ordinary differential equations (SODEs) and integrates them using the Euler-Maruyama method to generate data samples. The stochastic ODE system has the form: dX_i(t) = f_i(X_1(t), ..., X_n(t)) dt + σ dW_i(t) where X_i are the node values, f_i represents the deterministic dynamics (weighted sum of parent nodes), σ is the noise scale, and dW_i are independent Wiener processes. Args: G (networkx.DiGraph): Directed graph encoding the network structure. n_train (int): Number of training instances to simulate. n_test (int): Number of test instances to simulate. input_nodes (list[str]): Ordered list of node names that are treated as **inputs** (boundary conditions). output_nodes (list[str]): Ordered list of node names that are treated as **outputs** (targets). noise_scale (float, optional): Standard deviation of the stochastic noise term (diffusion coefficient). **Default:** ``1.0``. dt (float, optional): Time step size for numerical integration. **Default:** ``0.01``. t_final (float, optional): Final integration time. **Default:** ``10.0``. special_functions (dict[str, callable], optional): Mapping from node name to a Python callable that overrides the default linear relationship for that node. Each callable must have the signature ``f(parent_values: list) -> float`` where parent_values is a list of the parent node values. seed (int, optional): Random seed for reproducibility. Shapes: - **x_train** – :math:`(n_{\text{train}}, |\text{inputs}|)` - **y_train** – :math:`(n_{\text{train}}, |\text{outputs}|)` - **x_test** – :math:`(n_{\text{test}}, |\text{inputs}|)` - **y_test** – :math:`(n_{\text{test}}, |\text{outputs}|)` Returns: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: ``(x_train, y_train, x_test, y_test)`` where each element is a dense NumPy array ordered according to ``input_nodes`` / ``output_nodes``. """ if seed is not None: np.random.seed(seed) # Get all nodes and create mapping to indices all_nodes = list(nx.topological_sort(G)) node_to_idx = {node: i for i, node in enumerate(all_nodes)} n_nodes = len(all_nodes) # Initialize edge weights randomly edge_weights = {} for edge in G.edges(): sign = 1 if signed_edges is not None: sign = signed_edges.get(edge, 1) elif G.edges[edge].get('sign') is not None: sign = G.edges[edge]['sign'] edge_weights[edge] = sign * np.random.normal(0, 1) def sde_system(t, y, noise_scale): """ Define the stochastic ODE system. Args: t: Current time y: Current state vector (node values) noise_scale: Noise intensity Returns: dydt: Deterministic drift term noise: Stochastic diffusion term """ dydt = np.zeros(n_nodes) # Input nodes: keep constant (no drift) for node in input_nodes: idx = node_to_idx[node] dydt[idx] = 0 # Function and output nodes: dynamics based on weighted sum of parents for node in all_nodes: if node not in input_nodes: idx = node_to_idx[node] parent_sum = 0 parents = list(G.predecessors(node)) if parents: parent_values = [y[node_to_idx[parent]] for parent in parents] # Apply special function if available if special_functions and node in special_functions: parent_sum = special_functions[node](parent_values) else: # Default: weighted sum of parents for parent in parents: parent_idx = node_to_idx[parent] weight = edge_weights.get((parent, node), 1.0) parent_sum += weight * y[parent_idx] # Simple dynamics: drift towards the weighted sum dydt[idx] = -y[idx] + parent_sum return dydt def euler_maruyama_step(y, dt, noise_scale): """ Perform one step of Euler-Maruyama integration. """ drift = sde_system(0, y, noise_scale) # t not used in our system noise = np.random.normal(0, np.sqrt(dt) * noise_scale, size=y.shape) # Input nodes don't get noise for node in input_nodes: idx = node_to_idx[node] noise[idx] = 0 return y + drift * dt + noise def solve_sde(input_values, n_steps): """ Solve the stochastic ODE for given input values. """ # Initialize state vector y = np.zeros(n_nodes) # Set input node values for i, node in enumerate(input_nodes): idx = node_to_idx[node] y[idx] = input_values[i] # Integrate over time for _ in range(n_steps): y = euler_maruyama_step(y, dt, noise_scale) # Extract output values output_values = [] for node in output_nodes: idx = node_to_idx[node] output_values.append(y[idx]) return np.array(output_values) def generate_samples(n_samples): """Generate samples using the stochastic ODE system.""" x_samples = [] y_samples = [] n_steps = int(t_final / dt) for _ in range(n_samples): # Sample input values from standard normal distribution x_values = np.random.normal(0, 1, len(input_nodes)) # Solve the stochastic ODE system y_values = solve_sde(x_values, n_steps) x_samples.append(x_values) y_samples.append(y_values) return np.array(x_samples), np.array(y_samples) # Generate training and test samples x_train, y_train = generate_samples(n_train) x_test, y_test = generate_samples(n_test) return x_train, y_train, x_test, y_test