Source code for complex_problems.pipe_flow.solver

"""Steady and transient 1D pipe-flow solvers."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

import numpy as np

from complex_problems.pipe_flow.model import (
    area_from_diameter,
    build_pipe_grid,
    diameter_profile,
    friction_factor,
    reynolds_number,
)
from utils import get_logger

logger = get_logger(__name__)

_MODEL_TYPES = {"steady", "transient"}


[docs] @dataclass class PipeFlowResult: """Result bundle for pipe-flow simulations.""" model_type: str x: np.ndarray t: np.ndarray diameter: np.ndarray area: np.ndarray pressure: np.ndarray velocity: np.ndarray reynolds: np.ndarray friction: np.ndarray flow_rate_mean: np.ndarray flow_rate_std: np.ndarray metadata: dict[str, Any] = field(default_factory=dict) magnitudes: dict[str, float] = field(default_factory=dict)
def _integrate_pressure_from_dpdx(x: np.ndarray, p0: float, dpdx: np.ndarray) -> np.ndarray: p = np.zeros_like(x, dtype=float) p[0] = p0 for i in range(1, len(x)): dx = x[i] - x[i - 1] p[i] = p[i - 1] - 0.5 * (dpdx[i - 1] + dpdx[i]) * dx return p def _solve_steady( *, x: np.ndarray, diameter: np.ndarray, area: np.ndarray, rho: float, mu: float, roughness: float, p_in: float, p_out: float, friction_model: str, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, float]: target_dp = float(p_in - p_out) if abs(target_dp) < 1e-14: u = np.zeros_like(x) re = reynolds_number(rho, mu, u, diameter) f = friction_factor(re, roughness=roughness, diameter=diameter, model=friction_model) p = np.full_like(x, fill_value=p_in, dtype=float) return p, u, re, f, 0.0 sign = 1.0 if target_dp > 0 else -1.0 target_abs = abs(target_dp) def residual(q_abs: float) -> float: u_abs = q_abs / area re = reynolds_number(rho, mu, u_abs, diameter) f = friction_factor(re, roughness=roughness, diameter=diameter, model=friction_model) dpdx = f * rho * u_abs * u_abs / (2.0 * diameter) dp = float(np.trapezoid(dpdx, x)) return dp - target_abs q_hi = max(1e-10, 0.01 * float(np.mean(area))) while residual(q_hi) < 0.0 and q_hi < 1e4: q_hi *= 2.0 q_lo = 0.0 for _ in range(80): q_mid = 0.5 * (q_lo + q_hi) if residual(q_mid) > 0.0: q_hi = q_mid else: q_lo = q_mid q_abs = 0.5 * (q_lo + q_hi) q = sign * q_abs u = q / area re = reynolds_number(rho, mu, u, diameter) f = friction_factor(re, roughness=roughness, diameter=diameter, model=friction_model) dpdx_signed = f * rho * u * np.abs(u) / (2.0 * diameter) p = _integrate_pressure_from_dpdx(x, p_in, dpdx_signed) return p, u, re, f, float(q) def _spatial_derivative(a: np.ndarray, dx: float) -> np.ndarray: out = np.zeros_like(a) out[1:-1] = (a[2:] - a[:-2]) / (2.0 * dx) out[0] = (a[1] - a[0]) / dx out[-1] = (a[-1] - a[-2]) / dx return out def _solve_transient( *, x: np.ndarray, dx: float, diameter: np.ndarray, area: np.ndarray, rho: float, mu: float, roughness: float, friction_model: str, p_base: float, p_out: float, p_amp: float, p_freq_hz: float, wave_speed: float, damping: float, t_max: float, dt: float, sample_every: int, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: cfl = wave_speed * dt / dx if cfl > 0.95: raise ValueError( f"Unstable transient configuration: c*dt/dx={cfl:.3f} > 0.95. " "Reduce dt or increase nx spacing." ) n_steps = int(np.ceil(t_max / dt)) sample_indices = list(range(0, n_steps + 1, sample_every)) if sample_indices[-1] != n_steps: sample_indices.append(n_steps) n_samples = len(sample_indices) t_hist = np.zeros(n_samples, dtype=float) p = np.linspace(p_base, p_out, len(x), dtype=float) u = np.zeros_like(x, dtype=float) p_hist = np.zeros((n_samples, len(x)), dtype=float) u_hist = np.zeros((n_samples, len(x)), dtype=float) re_hist = np.zeros((n_samples, len(x)), dtype=float) f_hist = np.zeros((n_samples, len(x)), dtype=float) q_mean_hist = np.zeros(n_samples, dtype=float) q_std_hist = np.zeros(n_samples, dtype=float) def p_inlet(t: float) -> float: return p_base + p_amp * np.sin(2.0 * np.pi * p_freq_hz * t) def apply_bc(u_arr: np.ndarray, p_arr: np.ndarray, t_now: float) -> None: p_arr[0] = p_inlet(t_now) p_arr[-1] = p_out u_arr[0] = u_arr[1] u_arr[-1] = u_arr[-2] def rhs(u_arr: np.ndarray, p_arr: np.ndarray, t_now: float) -> tuple[np.ndarray, np.ndarray]: u_loc = u_arr.copy() p_loc = p_arr.copy() apply_bc(u_loc, p_loc, t_now) dpdx = _spatial_derivative(p_loc, dx) dudx = _spatial_derivative(u_loc, dx) re = reynolds_number(rho, mu, u_loc, diameter) ff = friction_factor(re, roughness=roughness, diameter=diameter, model=friction_model) wall = ff * u_loc * np.abs(u_loc) / (2.0 * np.maximum(diameter, 1e-9)) du_dt = -(1.0 / rho) * dpdx - damping * u_loc - wall dp_dt = -(rho * wave_speed * wave_speed) * dudx du_dt[0] = 0.0 du_dt[-1] = 0.0 dp_dt[0] = 0.0 dp_dt[-1] = 0.0 return du_dt, dp_dt def sample(idx: int, t_now: float) -> None: re = reynolds_number(rho, mu, u, diameter) ff = friction_factor(re, roughness=roughness, diameter=diameter, model=friction_model) q_profile = u * area t_hist[idx] = t_now p_hist[idx] = p u_hist[idx] = u re_hist[idx] = re f_hist[idx] = ff q_mean_hist[idx] = float(np.mean(q_profile)) q_std_hist[idx] = float(np.std(q_profile)) sample_pos = 0 apply_bc(u, p, 0.0) sample(sample_pos, 0.0) sample_pos += 1 for step in range(1, n_steps + 1): t_now = (step - 1) * dt k1_u, k1_p = rhs(u, p, t_now) k2_u, k2_p = rhs(u + 0.5 * dt * k1_u, p + 0.5 * dt * k1_p, t_now + 0.5 * dt) k3_u, k3_p = rhs(u + 0.5 * dt * k2_u, p + 0.5 * dt * k2_p, t_now + 0.5 * dt) k4_u, k4_p = rhs(u + dt * k3_u, p + dt * k3_p, t_now + dt) u = u + (dt / 6.0) * (k1_u + 2.0 * k2_u + 2.0 * k3_u + k4_u) p = p + (dt / 6.0) * (k1_p + 2.0 * k2_p + 2.0 * k3_p + k4_p) apply_bc(u, p, step * dt) if step in sample_indices: sample(sample_pos, step * dt) sample_pos += 1 return t_hist, p_hist, u_hist, re_hist, f_hist, q_mean_hist, q_std_hist
[docs] def solve_pipe_flow( *, model_type: str = "steady", length: float = 20.0, nx: int = 256, profile: str = "constant", d_in: float = 0.08, d_out: float = 0.05, d0: float = 0.06, profile_amplitude: float = 0.20, profile_waves: float = 2.0, custom_diameter_fn=None, rho: float = 1000.0, mu: float = 1.0e-3, roughness: float = 1.0e-5, friction_model: str = "auto", # Steady settings p_in: float = 2.0e5, p_out: float = 1.9e5, # Transient settings p_base: float = 2.0e5, p_amp: float = 2.0e3, p_freq_hz: float = 2.0, wave_speed: float = 200.0, damping: float = 0.2, t_max: float = 1.0, dt: float = 5.0e-4, sample_every: int = 10, ) -> PipeFlowResult: """Solve steady or transient 1D pipe flow.""" mt = model_type.strip().lower() if mt not in _MODEL_TYPES: raise ValueError(f"model_type must be one of {sorted(_MODEL_TYPES)}") if rho <= 0 or mu <= 0: raise ValueError("rho and mu must be positive.") x, dx = build_pipe_grid(length, nx) diameter = diameter_profile( x, profile=profile, d_in=d_in, d_out=d_out, d0=d0, amplitude=profile_amplitude, n_waves=profile_waves, custom_fn=custom_diameter_fn, ) area = area_from_diameter(diameter) if mt == "steady": p, u, re, ff, q = _solve_steady( x=x, diameter=diameter, area=area, rho=rho, mu=mu, roughness=roughness, p_in=p_in, p_out=p_out, friction_model=friction_model, ) t = np.array([0.0], dtype=float) pressure = p[np.newaxis, :] velocity = u[np.newaxis, :] reynolds = re[np.newaxis, :] friction = ff[np.newaxis, :] q_mean = np.array([q], dtype=float) q_std = np.array([float(np.std(u * area))], dtype=float) magnitudes = { "flow_rate_m3s": float(q), "mean_velocity_ms": float(np.mean(u)), "re_max": float(np.max(re)), "dp_target_pa": float(p_in - p_out), "dp_solved_pa": float(p[0] - p[-1]), } metadata = { "model_type": mt, "length": float(length), "nx": int(nx), "profile": profile, "friction_model": friction_model, "rho": float(rho), "mu": float(mu), "roughness": float(roughness), "p_in": float(p_in), "p_out": float(p_out), } else: t, pressure, velocity, reynolds, friction, q_mean, q_std = _solve_transient( x=x, dx=dx, diameter=diameter, area=area, rho=rho, mu=mu, roughness=roughness, friction_model=friction_model, p_base=p_base, p_out=p_out, p_amp=p_amp, p_freq_hz=p_freq_hz, wave_speed=wave_speed, damping=damping, t_max=t_max, dt=dt, sample_every=sample_every, ) cfl = wave_speed * dt / dx magnitudes = { "max_pressure_pa": float(np.max(pressure)), "max_velocity_ms": float(np.max(np.abs(velocity))), "re_max": float(np.max(reynolds)), "cfl": float(cfl), "mean_q_std_m3s": float(np.mean(q_std)), } metadata = { "model_type": mt, "length": float(length), "nx": int(nx), "profile": profile, "friction_model": friction_model, "rho": float(rho), "mu": float(mu), "roughness": float(roughness), "p_base": float(p_base), "p_out": float(p_out), "p_amp": float(p_amp), "p_freq_hz": float(p_freq_hz), "wave_speed": float(wave_speed), "damping": float(damping), "t_max": float(t_max), "dt": float(dt), "sample_every": int(sample_every), } logger.info( "Solved pipe_flow: model=%s nx=%d profile=%s", mt, nx, profile, ) return PipeFlowResult( model_type=mt, x=x, t=t, diameter=diameter, area=area, pressure=pressure, velocity=velocity, reynolds=reynolds, friction=friction, flow_rate_mean=q_mean, flow_rate_std=q_std, metadata=metadata, magnitudes=magnitudes, )