Source code for solver.pde_solver

"""PDE solver for multivariate scalar fields using finite differences.

Supports 2D elliptic PDEs of the general form:

    a(x,y)*f_xx + b(x,y)*f_xy + c(x,y)*f_yy + d(x,y)*f_x + e(x,y)*f_y + g(x,y)*f = rhs(x,y)

The solver probes the user-supplied residual function at each grid point to
extract local coefficients, then assembles a sparse linear system using central
differences (5-point stencil for the Laplacian, central differences for first
derivatives and mixed derivative).

Supports:
- Arbitrary domain shapes via boolean masks
- Dirichlet and Neumann boundary conditions
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

import numpy as np

from utils import SolverFailedError, get_logger, normalize_params

logger = get_logger(__name__)

# Boundary condition type constants
BC_DIRICHLET = "dirichlet"
BC_NEUMANN = "neumann"


[docs] @dataclass class PDESolution: """Container for PDE solution data. Attributes: grid: Tuple of 1D arrays, one per variable (e.g. (x_vals, y_vals)). u: Solution array. For 2D: shape (ny, nx). Exterior points are NaN. success: Whether the solver converged. message: Solver status message. n_eval: Number of iterations (if applicable). mask: Optional boolean mask (ny, nx). True = inside domain. """ grid: tuple[np.ndarray, ...] u: np.ndarray success: bool message: str n_eval: int = 0 mask: np.ndarray | None = None
def _classify_mask( mask: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, dict[tuple[int, int], int]]: """Classify grid points as interior, boundary, or exterior. A point (i, j) where mask[j, i] is True is *boundary* if any of its 4 direct neighbors is False or lies outside the grid. Otherwise it is *interior*. Args: mask: Boolean array of shape (ny, nx). Returns: (interior_mask, boundary_mask, index_map) - interior_mask: bool (ny, nx) - boundary_mask: bool (ny, nx) - index_map: {(i, j): flat_index} for interior points only """ ny, nx = mask.shape interior = np.zeros_like(mask) boundary = np.zeros_like(mask) for j in range(ny): for i in range(nx): if not mask[j, i]: continue # Check if all 4 neighbors are inside the mask is_interior = True for di, dj in ((1, 0), (-1, 0), (0, 1), (0, -1)): ni, nj = i + di, j + dj if ni < 0 or ni >= nx or nj < 0 or nj >= ny or not mask[nj, ni]: is_interior = False break if is_interior: interior[j, i] = True else: boundary[j, i] = True # Build index map for interior points (row-major order) index_map: dict[tuple[int, int], int] = {} idx = 0 for j in range(ny): for i in range(nx): if interior[j, i]: index_map[(i, j)] = idx idx += 1 return interior, boundary, index_map def _probe_coefficients( residual_func: Callable[..., float], xi: float, yj: float, params: dict, ) -> tuple[float, float, float, float, float, float, float]: """Probe the residual function to extract linear PDE coefficients. Given residual R(x,y,f,fx,fy,fxx,fxy,fyy) which should equal zero, we assume R is affine in (f, fx, fy, fxx, fxy, fyy): R = a*fxx + b*fxy + c*fyy + d*fx + e*fy + g*f + rhs_const We extract a,b,c,d,e,g by probing with unit perturbations. Args: residual_func: Callable returning residual at (x,y) with derivatives. xi: x-coordinate of the grid point. yj: y-coordinate of the grid point. params: Parameter dict passed to residual_func. Returns: Tuple of (a_fxx, b_fxy, c_fyy, d_fx, e_fy, g_f, rhs_const). """ # R(all zeros) gives the constant part (negated RHS) r0 = residual_func(xi, yj, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, **params) # Probe each derivative direction g_f = residual_func(xi, yj, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, **params) - r0 d_fx = residual_func(xi, yj, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, **params) - r0 e_fy = residual_func(xi, yj, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, **params) - r0 a_fxx = residual_func(xi, yj, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, **params) - r0 b_fxy = residual_func(xi, yj, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, **params) - r0 c_fyy = residual_func(xi, yj, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, **params) - r0 return (a_fxx, b_fxy, c_fyy, d_fx, e_fy, g_f, r0) def _default_rectangular_mask(nx: int, ny: int) -> np.ndarray: """Create a mask where every point is inside the domain. Args: nx: Number of x grid points. ny: Number of y grid points. Returns: Boolean array (ny, nx) with all True. """ return np.ones((ny, nx), dtype=bool)
[docs] def solve_pde_2d( residual_func: Callable[..., float], x_min: float, x_max: float, y_min: float, y_max: float, nx: int, ny: int, bc_values: np.ndarray | None = None, parameters: dict[str, float] | None = None, mask: np.ndarray | None = None, bc_type: np.ndarray | None = None, bc_neumann_value: np.ndarray | None = None, ) -> PDESolution: """Solve a general 2D linear elliptic PDE using finite differences. The residual_func is called as ``residual_func(x, y, f, f_x, f_y, f_xx, f_xy, f_yy, **params)`` and should return the value that must be zero at the solution. For example, for the equation ``-f_xx - f_yy = sin(pi*x)*sin(pi*y)``, the residual is ``-f_xx - f_yy - sin(pi*x)*sin(pi*y)``. Args: residual_func: Callable returning residual at (x,y) with derivatives. x_min: Domain x start. x_max: Domain x end. y_min: Domain y start. y_max: Domain y end. nx: Number of x grid points. ny: Number of y grid points. bc_values: Dirichlet boundary values, shape (ny, nx). Default: zeros. parameters: Optional dict of parameters for residual_func. mask: Boolean array (ny, nx). True = inside domain. None = full rectangle. bc_type: String array (ny, nx) with "dirichlet" or "neumann" per point. Only boundary points are used. Default: all Dirichlet. bc_neumann_value: Float array (ny, nx) with normal derivative values for Neumann boundary points. Default: zeros. Returns: PDESolution with grid, solution array, and mask. """ from scipy import sparse from scipy.sparse.linalg import spsolve params = normalize_params(parameters) if nx < 3 or ny < 3: raise SolverFailedError("Grid must have at least 3 points per dimension") x = np.linspace(x_min, x_max, nx) y = np.linspace(y_min, y_max, ny) hx = (x_max - x_min) / (nx - 1) if nx > 1 else 1.0 hy = (y_max - y_min) / (ny - 1) if ny > 1 else 1.0 # Build mask if mask is None: mask = _default_rectangular_mask(nx, ny) # Default BC arrays if bc_values is None: bc_values = np.zeros((ny, nx)) if bc_neumann_value is None: bc_neumann_value = np.zeros((ny, nx)) # Classify points interior_mask, boundary_mask, index_map = _classify_mask(mask) n_interior = len(index_map) if n_interior <= 0: u = np.full((ny, nx), np.nan) u[mask] = 0.0 if bc_values is not None: u[boundary_mask] = bc_values[boundary_mask] return PDESolution( grid=(x, y), u=u, success=True, message="No interior points", n_eval=0, mask=mask, ) # Finite difference weights inv_hx2 = 1.0 / (hx * hx) inv_hy2 = 1.0 / (hy * hy) inv_2hx = 1.0 / (2.0 * hx) inv_2hy = 1.0 / (2.0 * hy) inv_4hxhy = 1.0 / (4.0 * hx * hy) rows: list[int] = [] cols: list[int] = [] data: list[float] = [] b_vec = np.zeros(n_interior) def _is_neumann(ni: int, nj: int) -> bool: if bc_type is None: return False return bc_type[nj, ni] == BC_NEUMANN for (i, j), k in index_map.items(): try: a_c, bxy, c_c, d_c, e_c, g_c, r0 = _probe_coefficients( residual_func, x[i], y[j], params, ) except Exception as exc: logger.error("PDE coefficient probe failed at (%g, %g): %s", x[i], y[j], exc) raise SolverFailedError(f"Coefficient probe failed: {exc}") from exc # RHS: -r0 (since R = operator(f) + r0 = 0 => operator(f) = -r0) b_vec[k] = -r0 # Central point coefficient center_coeff = -2.0 * a_c * inv_hx2 - 2.0 * c_c * inv_hy2 + g_c rows.append(k) cols.append(k) data.append(center_coeff) def _add_neighbor(ni: int, nj: int, coeff: float) -> None: """Add matrix entry for neighbor or move to RHS for boundary.""" if (ni, nj) in index_map: # Interior neighbor → matrix entry rows.append(k) cols.append(index_map[(ni, nj)]) data.append(coeff) elif 0 <= ni < nx and 0 <= nj < ny and boundary_mask[nj, ni]: # Boundary neighbor if _is_neumann(ni, nj): # Neumann BC: ghost-point substitution # The outward normal direction from interior (i,j) to boundary # (ni,nj). We use: u_boundary = u_interior + h * g_N # where h is the step size and g_N is the prescribed derivative. di, dj = ni - i, nj - j if di != 0: h_step = hx * di # signed step else: h_step = hy * dj ghost_val = h_step * bc_neumann_value[nj, ni] # u[nj,ni] = u[j,i] + ghost_val # Substitute into stencil: coeff * u[nj,ni] = coeff * (u[j,i] + ghost_val) # Add coeff to center and move coeff*ghost_val to RHS rows.append(k) cols.append(k) data.append(coeff) # adds to center coefficient b_vec[k] -= coeff * ghost_val else: # Dirichlet BC: move known value to RHS b_vec[k] -= coeff * bc_values[nj, ni] # else: exterior point — contributes nothing (zero flux assumption) # f_xx stencil + f_x contribution _add_neighbor(i - 1, j, a_c * inv_hx2 - d_c * inv_2hx) _add_neighbor(i + 1, j, a_c * inv_hx2 + d_c * inv_2hx) # f_yy stencil + f_y contribution _add_neighbor(i, j - 1, c_c * inv_hy2 - e_c * inv_2hy) _add_neighbor(i, j + 1, c_c * inv_hy2 + e_c * inv_2hy) # f_xy stencil (cross derivative) if abs(bxy) > 1e-15: _add_neighbor(i + 1, j + 1, bxy * inv_4hxhy) _add_neighbor(i - 1, j + 1, -bxy * inv_4hxhy) _add_neighbor(i + 1, j - 1, -bxy * inv_4hxhy) _add_neighbor(i - 1, j - 1, bxy * inv_4hxhy) A = sparse.coo_matrix( (data, (rows, cols)), shape=(n_interior, n_interior), ).tocsr() try: u_flat = spsolve(A, b_vec) except Exception as exc: logger.error("PDE linear solver failed: %s", exc, exc_info=True) raise SolverFailedError(f"Linear solver failed: {exc}") from exc # Build solution: NaN outside domain, BC on boundary, solved values inside u = np.full((ny, nx), np.nan) for (i, j), k in index_map.items(): u[j, i] = u_flat[k] # Set boundary values for j in range(ny): for i in range(nx): if boundary_mask[j, i]: if _is_neumann(i, j): # For Neumann boundary points, approximate value from # nearest interior neighbor + h * g_N u[j, i] = _estimate_neumann_boundary_value( i, j, u, index_map, interior_mask, hx, hy, bc_neumann_value[j, i], ) else: u[j, i] = bc_values[j, i] logger.info("PDE 2D solved: %dx%d grid, %d interior points", nx, ny, n_interior) return PDESolution( grid=(x, y), u=u, success=True, message="OK", n_eval=0, mask=mask, )
def _estimate_neumann_boundary_value( bi: int, bj: int, u: np.ndarray, index_map: dict[tuple[int, int], int], interior_mask: np.ndarray, hx: float, hy: float, neumann_val: float, ) -> float: """Estimate the solution value at a Neumann boundary point. Uses the nearest interior neighbor's value plus h * g_N where g_N is the prescribed normal derivative. Args: bi: x-index of the boundary point. bj: y-index of the boundary point. u: Solution array (ny, nx). index_map: Mapping from (i,j) to flat index for interior points. interior_mask: Boolean array marking interior points. hx: Grid spacing in x direction. hy: Grid spacing in y direction. neumann_val: Prescribed normal derivative value. Returns: Estimated solution value at the Neumann boundary point. """ ny, nx = u.shape for di, dj in ((1, 0), (-1, 0), (0, 1), (0, -1)): ni, nj = bi + di, bj + dj if 0 <= ni < nx and 0 <= nj < ny and interior_mask[nj, ni]: val = u[nj, ni] if not np.isnan(val): # Step from interior to boundary if di != 0: return val + hx * abs(di) * neumann_val else: return val + hy * abs(dj) * neumann_val # Fallback: use 0 if no interior neighbor found return 0.0