Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions SU2_PY/SU2/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@
from .config import Config
from .state import State_Factory as State
from .historyMap import history_header_map as historyOutFields

try:
from .fwh import FWHData
except ImportError: # numpy not available in all environments
pass
206 changes: 206 additions & 0 deletions SU2_PY/SU2/io/fwh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# \file fwh.py
# \brief Reader for SU2 FWH binary surface data files (fwh_bin.dat).
#
# The binary format written by the SU2 FWH surface writer:
# Header : 3 × uint32 (little-endian)
# [0] start_time_us — simulation time at first stored timestep (microseconds)
# [1] n_timesteps — number of timesteps in this file
# [2] n_points — number of surface points
# Data : n_timesteps × n_points × float32 (little-endian), row-major
# pressure[t, i] — static pressure at surface point i, timestep t (Pa)
#
# Usage:
# from SU2.io.fwh import FWHData
# fwh = FWHData.from_file("fwh_bin.dat", time_step=0.0015)
# print(fwh.pressure.shape) # (n_timesteps, n_points)
# print(fwh.time) # physical time array in seconds
#
# \author Riddhi (GSoC 2026 candidate)
# \version 8.4.0 "Harrier"

import struct
import numpy as np
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class FWHData:
"""Container for FWH binary surface pressure data read from fwh_bin.dat.

Attributes
----------
pressure : np.ndarray, shape (n_timesteps, n_points), float32
Static pressure at each FWH surface point for each timestep (Pa).
time : np.ndarray, shape (n_timesteps,), float64
Physical simulation time corresponding to each timestep (seconds).
n_timesteps : int
Number of timesteps stored in the file.
n_points : int
Number of surface quadrature points on the FWH surface.
start_time_us : int
Raw start-time tag from file header (microseconds, uint32).
source_file : str
Path to the file this data was read from.
"""

pressure: np.ndarray
time: np.ndarray
n_timesteps: int
n_points: int
start_time_us: int
source_file: str = ""

# ------------------------------------------------------------------
# Construction
# ------------------------------------------------------------------

@classmethod
def from_file(cls, path: str, time_step: float = 0.0) -> "FWHData":
"""Read a SU2 FWH binary file.

Parameters
----------
path : str or Path
Path to fwh_bin.dat (or equivalent).
time_step : float, optional
CFD time step in seconds (TIME_STEP in the .cfg file).
Used to build the physical time array. If 0.0, the time
array is integer timestep indices cast to float.

Returns
-------
FWHData
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"FWH binary file not found: {path}")

with open(path, "rb") as fh:
raw = fh.read()

# --- header ---------------------------------------------------
if len(raw) < 12:
raise ValueError(
f"File too small to contain a valid FWH header: {len(raw)} bytes"
)

start_time_us, n_t, n_pts = struct.unpack_from("<3I", raw, 0)
expected_data = n_t * n_pts * 4 # float32
actual_data = len(raw) - 12

if actual_data != expected_data:
raise ValueError(
f"FWH binary size mismatch: header says {n_t}×{n_pts} "
f"({expected_data} bytes) but file has {actual_data} data bytes. "
f"File may be truncated or use a different layout."
)

# --- data block -----------------------------------------------
pressure = np.frombuffer(raw, dtype="<f4", offset=12).reshape(n_t, n_pts).copy()

# --- time axis ------------------------------------------------
if time_step > 0.0:
t0 = start_time_us * 1e-6 # convert μs → s
time = t0 + np.arange(n_t) * time_step
else:
time = np.arange(n_t, dtype=np.float64)

return cls(
pressure=pressure,
time=time,
n_timesteps=int(n_t),
n_points=int(n_pts),
start_time_us=int(start_time_us),
source_file=str(path),
)

# ------------------------------------------------------------------
# Derived quantities for FWH post-processing
# ------------------------------------------------------------------

def pressure_perturbation(self, p_ref: Optional[float] = None) -> np.ndarray:
"""Return p' = p - p_mean (or p - p_ref if supplied).

Parameters
----------
p_ref : float, optional
Reference (free-stream) pressure in Pa. If None, the
time-mean pressure at each point is used.

Returns
-------
np.ndarray, shape (n_timesteps, n_points)
"""
if p_ref is not None:
return self.pressure - p_ref
return self.pressure - self.pressure.mean(axis=0, keepdims=True)

def time_mean_pressure(self) -> np.ndarray:
"""Time-averaged pressure at each surface point (Pa).

Returns
-------
np.ndarray, shape (n_points,)
"""
return self.pressure.mean(axis=0)

def pressure_rms(self) -> np.ndarray:
"""RMS of pressure fluctuation p' at each surface point (Pa).

Returns
-------
np.ndarray, shape (n_points,)
"""
return self.pressure_perturbation().std(axis=0)

# ------------------------------------------------------------------
# I/O helpers
# ------------------------------------------------------------------

def to_dict(self) -> dict:
"""Return a plain dict suitable for JSON serialisation or DataFrame construction."""
return {
"source_file": self.source_file,
"n_timesteps": self.n_timesteps,
"n_points": self.n_points,
"start_time_us": self.start_time_us,
"time": self.time.tolist(),
"pressure_mean": self.time_mean_pressure().tolist(),
"pressure_rms": self.pressure_rms().tolist(),
}

def to_dataframe(self):
"""Return a pandas DataFrame with columns [time, point_0, point_1, ...].

Requires pandas. Each column ``point_i`` contains the pressure
time-series at surface point i.

Returns
-------
pandas.DataFrame
"""
try:
import pandas as pd
except ImportError as exc:
raise ImportError(
"pandas is required for FWHData.to_dataframe(). "
"Install it with: pip install pandas"
) from exc

cols = {"time": self.time}
cols.update({f"point_{i}": self.pressure[:, i] for i in range(self.n_points)})
return pd.DataFrame(cols)

# ------------------------------------------------------------------
# Dunder
# ------------------------------------------------------------------

def __repr__(self) -> str:
return (
f"FWHData(n_timesteps={self.n_timesteps}, n_points={self.n_points}, "
f"t=[{self.time[0]:.4f}, {self.time[-1]:.4f}] s, "
f"p=[{self.pressure.min():.1f}, {self.pressure.max():.1f}] Pa)"
)
Empty file added SU2_PY/tests/__init__.py
Empty file.
132 changes: 132 additions & 0 deletions SU2_PY/tests/test_fwh_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""Tests for SU2.io.fwh — FWH binary surface data reader.

Run with:
cd /home/riddhi/SU2
python3 -m pytest SU2_PY/tests/test_fwh_reader.py -v
"""

import struct
import sys
from pathlib import Path

import numpy as np
import pytest

sys.path.insert(0, str(Path(__file__).parent.parent))
from SU2.io.fwh import FWHData

REPO_ROOT = Path(__file__).parent.parent.parent
REFERENCE_FILE = REPO_ROOT / "TestCases/unsteady/square_cylinder/fwh_bin.dat"


def make_synthetic_fwh(path, n_t=10, n_pts=5, start_us=1000):
rng = np.random.default_rng(42)
pressure = (101325.0 + rng.standard_normal((n_t, n_pts)) * 100).astype("<f4")
with open(path, "wb") as fh:
fh.write(struct.pack("<3I", start_us, n_t, n_pts))
fh.write(pressure.tobytes())
return pressure


class TestFWHDataSynthetic:
def test_roundtrip_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
assert fwh.pressure.shape == (10, 5)
assert fwh.n_timesteps == 10
assert fwh.n_points == 5

def test_roundtrip_values(self, tmp_path):
p = make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
np.testing.assert_allclose(fwh.pressure, p, rtol=1e-6)

def test_time_axis_with_dt(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5, start_us=500000)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
assert fwh.time[0] == pytest.approx(0.5, rel=1e-6)
assert fwh.time[-1] == pytest.approx(0.5 + 9 * 0.0015, rel=1e-6)
assert len(fwh.time) == 10

def test_time_axis_no_dt(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=8, n_pts=3)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
np.testing.assert_array_equal(fwh.time, np.arange(8, dtype=float))

def test_pressure_perturbation_zero_mean(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
p_prime = fwh.pressure_perturbation()
np.testing.assert_allclose(
p_prime.mean(axis=0), 0.0, atol=0.01
) # float32 quantisation

def test_pressure_perturbation_with_ref(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
p_prime = fwh.pressure_perturbation(p_ref=101325.0)
np.testing.assert_allclose(p_prime, fwh.pressure - 101325.0, rtol=1e-6)

def test_file_not_found(self):
with pytest.raises(FileNotFoundError, match="not found"):
FWHData.from_file("/nonexistent/path/fwh_bin.dat")

def test_truncated_file_raises(self, tmp_path):
p = tmp_path / "fwh_bin.dat"
p.write_bytes(struct.pack("<3I", 0, 10, 5) + b"\x00" * 100)
with pytest.raises(ValueError, match="size mismatch"):
FWHData.from_file(p)

def test_repr(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat")
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
r = repr(fwh)
assert "FWHData" in r
assert "n_timesteps=10" in r

def test_to_dict_keys(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat")
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
d = fwh.to_dict()
for key in ("n_timesteps", "n_points", "time", "pressure_mean", "pressure_rms"):
assert key in d

def test_time_mean_pressure_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
assert fwh.time_mean_pressure().shape == (5,)

def test_pressure_rms_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
assert fwh.pressure_rms().shape == (5,)


@pytest.mark.skipif(
not REFERENCE_FILE.exists(),
reason="Reference fwh_bin.dat not found — run from repo root",
)
class TestFWHDataReference:
@pytest.fixture(scope="class")
def fwh(self):
return FWHData.from_file(REFERENCE_FILE, time_step=0.0015)

def test_shape(self, fwh):
assert fwh.n_timesteps == 50
assert fwh.n_points == 200
assert fwh.pressure.shape == (50, 200)

def test_pressure_range(self, fwh):
assert fwh.pressure.min() > 800.0
assert fwh.pressure.max() < 1200.0

def test_unsteady_variation(self, fwh):
assert fwh.pressure.std(axis=0).max() > 10.0

def test_time_step_spacing(self, fwh):
diffs = np.diff(fwh.time)
np.testing.assert_allclose(diffs, 0.0015, rtol=1e-6)

def test_pressure_rms_max(self, fwh):
assert fwh.pressure_rms().max() == pytest.approx(74.22, abs=0.5)
Loading