# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
.. Fourier Analysis (see parent package :mod:`sigima.tools.signal`).
"""
# pylint: disable=invalid-name # Allows short reference names like x, y, ...
from __future__ import annotations
from typing import Literal
import numpy as np
import scipy.signal # type: ignore[import]
from sigima.tools.checks import check_1d_arrays, normalize_kernel
from sigima.tools.signal.dynamic import sampling_rate
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def zero_padding(
x: np.ndarray, y: np.ndarray, n_prepend: int = 0, n_append: int = 0
) -> tuple[np.ndarray, np.ndarray]:
"""Prepend and append zeros.
This function pads the input signal with zeros at the beginning and end.
Args:
x: X data.
y: Y data.
n_prepend: Number of zeros to prepend.
n_append: Number of zeros to append.
Returns:
Tuple (xnew, ynew): Padded x and y.
"""
if n_prepend < 0:
raise ValueError("Number of zeros to prepend must be non-negative.")
if n_append < 0:
raise ValueError("Number of zeros to append must be non-negative.")
dx = np.mean(np.diff(x))
xnew = np.linspace(
x[0] - n_prepend * dx,
x[-1] + n_append * dx,
y.size + n_prepend + n_append,
)
ynew = np.pad(y, (n_prepend, n_append), mode="constant")
return xnew, ynew
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def fft1d(
x: np.ndarray, y: np.ndarray, shift: bool = True
) -> tuple[np.ndarray, np.ndarray]:
"""Compute the Fast Fourier Transform (FFT) of a 1D real signal.
Args:
x: Time domain axis (evenly spaced).
y: Signal values.
shift: If True, shift zero frequency and its corresponding FFT component to the
center.
Returns:
Tuple (f, sp): Frequency axis and corresponding FFT values.
"""
dt = np.mean(np.diff(x))
f = np.fft.fftfreq(x.size, d=dt) # Frequency axis
sp = np.fft.fft(y) # Spectrum values
if shift:
f = np.fft.fftshift(f)
sp = np.fft.fftshift(sp)
return f, sp
[docs]
@check_1d_arrays(x_evenly_spaced=False, x_sorted=False, y_dtype=np.complexfloating)
def ifft1d(
f: np.ndarray, sp: np.ndarray, initial: float = 0.0
) -> tuple[np.ndarray, np.ndarray]:
"""Compute the inverse Fast Fourier Transform (FFT) of a 1D complex spectrum.
Args:
f: Frequency axis (evenly spaced).
sp: FFT values.
initial: Starting value for the time axis.
Returns:
Tuple (x, y): Time axis and real signal.
Raises:
ValueError: If frequency array is not evenly spaced or has fewer than 2 points.
"""
if f.size < 2:
raise ValueError("Frequency array must have at least two elements.")
if np.all(np.diff(f) >= 0.0):
# If frequencies are sorted, assume input is shifted.
# The spectrum needs to be unshifted.
sp = np.fft.ifftshift(sp)
else:
# Otherwise assume input is not shifted.
# The frequencies need to be shifted.
f = np.fft.fftshift(f)
diff_f = np.diff(f)
df = np.mean(diff_f)
if not np.allclose(diff_f, df):
raise ValueError("Frequency array must be evenly spaced.")
y = np.fft.ifft(sp)
dt = 1.0 / (f.size * df)
x = np.linspace(initial, initial + (y.size - 1) * dt, y.size)
return x, y.real
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def magnitude_spectrum(
x: np.ndarray, y: np.ndarray, decibel: bool = False
) -> tuple[np.ndarray, np.ndarray]:
"""Compute magnitude spectrum.
Args:
x: X data.
y: Y data.
decibel: Compute the magnitude spectrum root-power level in decibel (dB).
Returns:
Tuple (f, mag_spectrum): Frequency values and magnitude spectrum.
"""
f, spectrum = fft1d(x, y)
mag_spectrum = np.abs(spectrum)
if decibel:
mag_spectrum = 20 * np.log10(mag_spectrum)
return f, mag_spectrum
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def phase_spectrum(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Compute phase spectrum.
Args:
x: X data.
y: Y data.
Returns:
Tuple (f, phase): Frequency values and phase spectrum in degrees.
"""
f, spectrum = fft1d(x, y)
phase = np.rad2deg(np.angle(spectrum))
return f, phase
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def psd(
x: np.ndarray, y: np.ndarray, decibel: bool = False
) -> tuple[np.ndarray, np.ndarray]:
"""Estimate the Power Spectral Density (PSD) using Welch's method.
Args:
x: X data.
y: Y data.
decibel: Compute the power spectral density power level in decibel (dB).
Returns:
Tuple (f, welch_psd): Frequency values and PSD.
"""
f, welch_psd = scipy.signal.welch(y, fs=sampling_rate(x))
if decibel:
welch_psd = 10 * np.log10(welch_psd)
return f, welch_psd
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def convolve(
x: np.ndarray,
y: np.ndarray,
h: np.ndarray,
boundary: Literal["reflect", "symmetric", "edge", "wrap"] = "reflect",
normalize_kernel_flag: bool = True,
method: Literal["auto", "direct", "fft"] = "auto",
correct_group_delay: bool = True,
) -> np.ndarray:
"""Convolve a 1D signal with a kernel, avoiding border artifacts and x-shift.
The input signal is padded before convolution, then a 'valid' extraction is
used to return exactly len(y) samples. Non-zero padding (e.g. "reflect")
prevents the typical edge attenuation caused by implicit zero-padding.
If the kernel is asymmetric, an optional group-delay correction recenters the
output on the same x-grid (no shift), using sub-sample interpolation.
Args:
x: 1D monotonically increasing and uniformly spaced axis (same length as y).
y: 1D input signal.
h: 1D convolution kernel (impulse response).
boundary: Padding mode passed to ``np.pad`` ("reflect" recommended).
normalize_kernel_flag: If True, normalize kernel so that ``h.sum() == 1`` to
preserve DC level.
method: Convolution method for ``scipy.signal.convolve``.
correct_group_delay: If True, compensate the kernel center-of-mass shift
(group delay) to avoid any x-shift in the output.
Returns:
Convolved signal with the same length as ``y``, aligned on ``x``.
Raises:
ValueError: If inputs are not 1D, empty, or shapes are inconsistent.
Notes:
Precondition: ``x`` is strictly increasing with constant spacing. This is
required for standard discrete convolution to represent a physical LTI
filtering on the given grid.
"""
if h.size != y.size:
raise ValueError("X data and Y data of the filter must have the same size.")
# ---- Optional DC preservation
if normalize_kernel_flag:
h = normalize_kernel(h)
M = int(h.size)
if M == 1:
# With normalization, h == [1]; otherwise scale by h[0]
return y.copy() if normalize_kernel_flag else y * h[0]
# ---- Compute asymmetric pad widths so that 'valid' returns exactly len(y)
w_left = M // 2
w_right = (M - 1) - w_left
# ---- Pad the signal to mitigate border artifacts during convolution
y_pad = np.pad(y, (w_left, w_right), mode=boundary)
# ---- Linear convolution with 'valid' to get back exactly N samples
y_conv = scipy.signal.convolve(y_pad, h, mode="valid", method=method)
if correct_group_delay:
# Center-of-mass of the kernel in sample units relative to w_left.
# n runs from -w_left ... +w_right (integer sample offsets).
n = np.arange(M, dtype=float) - w_left
denom = h.sum() if h.sum() != 0.0 else 1.0
mu_samples = float(np.dot(n, h) / denom) # may be fractional
if np.isfinite(mu_samples) and mu_samples != 0.0:
# Sub-sample compensation on the *x-axis* to keep alignment.
# Positive mu_samples means the effective kernel center is to the right
# (additional delay); compensate by advancing the output.
dx = float(x[1] - x[0]) # uniform spacing guaranteed by your decorator
x_shifted = x + mu_samples * dx
# Interpolate with edge holding to maintain length and alignment
y_conv = np.interp(x, x_shifted, y_conv, left=y_conv[0], right=y_conv[-1])
return y_conv
def _psf_to_otf_1d(h: np.ndarray, L: int) -> np.ndarray:
"""Convert a centered 1D PSF h to an OTF (RFFT length L).
The PSF center (index floor((M-1)/2)) is shifted to index 0 before FFT so that
the convolution geometry matches 'same' with a centered kernel.
Args:
h: 1D convolution kernel (PSF).
L: Length of the output OTF (RFFT length, power of two recommended).
Returns:
OTF as a 1D complex array of length L//2 + 1 (RFFT output).
"""
M = h.size
w_left = M // 2
h0 = np.roll(h, -w_left) # center -> index 0
h_z = np.zeros(L, dtype=float)
h_z[:M] = h0
return np.fft.rfft(h_z)
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def deconvolve(
x: np.ndarray,
y: np.ndarray,
h: np.ndarray,
*,
boundary: Literal["reflect", "symmetric", "edge", "wrap"] = "reflect",
normalize_kernel_flag: bool = True,
# regularized inverse with derivative prior (recommended):
method: Literal["wiener", "fft"] = "wiener",
reg: float = 5e-2, # increase to reduce ringing (e.g. 5e-2, 1e-1)
gain_max: float | None = 10.0, # clamp max |gain| in frequency (None to disable)
dc_lock: bool = True, # force exact DC gain (preserve plateau)
auto_scale: bool = True, # auto-correct amplitude scaling after deconvolution
) -> np.ndarray:
"""Deconvolve a 1D signal with frequency-dependent regularization and DC lock.
Strategy:
1) Pad y with the same geometry as the ``convolve`` step (x-uniform grid).
2) Build OTF ``H(f)`` from the centered PSF ``h``.
3) Compute inverse filter:
- ``wiener`` (recommended): ``H*(f) / (|H|^2 + reg * |D|^2)``, with
``|D|^2 = (2 sin(ω/2))^2`` (1st-derivative prior).
- ``fft``: bare inverse ``1/H(f)`` (unstable; only for noise-free data).
- Optionally clamp ``|G(f)| ≤ gain_max`` and lock DC gain.
4) IFFT, then extract the central unpadded segment (``len == len(y)``).
5) Optionally auto-scale the result to correct amplitude bias from regularization.
Args:
x: Strictly increasing, uniformly spaced axis (same length as y).
y: Observed signal (result of ``y_true ⊛ h``, plus noise).
h: Centered convolution kernel (PSF).
boundary: Padding mode (should match your convolution).
normalize_kernel_flag: If True, normalize ``h`` to preserve DC.
method: ``"wiener"`` (regularized inverse) or ``"fft"`` (bare inverse).
reg: Regularization strength for the derivative prior.
gain_max: Optional clamp on ``|G(f)|`` to avoid wild amplification.
dc_lock: If True, enforce exact DC gain (preserve mean/plateau).
auto_scale: If True, auto-correct amplitude scaling after deconvolution.
Returns:
Deconvolved signal with the same length as y, x-aligned.
"""
if x.ndim != 1 or y.ndim != 1 or h.ndim != 1:
raise ValueError("`x`, `y`, and `h` must be 1D arrays.")
if y.size == 0 or h.size == 0 or x.size != y.size:
raise ValueError("Non-empty arrays required and `x` length must match `y`.")
if y.size != h.size:
raise ValueError("X data and Y data of the filter must have the same size.")
if np.all(h == 0.0):
raise ValueError("Filter is all zeros, cannot be used to deconvolve.")
y = np.asarray(y, dtype=float)
h = np.asarray(h, dtype=float)
# Check if kernel normalization is requested
if normalize_kernel_flag:
h = normalize_kernel(h)
M = int(h.size)
if M == 1:
return y.copy() # normalized h == [1]
# Padding identical to your convolve() geometry
w_left = M // 2
w_right = (M - 1) - w_left
y_pad = np.pad(y, (w_left, w_right), mode=boundary)
N = y.size
Npad = y_pad.size # N + (M - 1)
# FFT size for linear convolution equivalence
L_needed = Npad + M - 1
L = 1 << int(np.ceil(np.log2(L_needed)))
# Build spectra
y_z = np.zeros(L, dtype=float)
y_z[:Npad] = y_pad
Y = np.fft.rfft(y_z)
H = _psf_to_otf_1d(h, L)
if method == "wiener":
# Derivative prior: |D(ω)|^2 = (2 sin(ω/2))^2
k = np.arange(H.size, dtype=float)
omega = 2.0 * np.pi * k / L
D2 = (2.0 * np.sin(omega / 2.0)) ** 2
Hc = np.conjugate(H)
H2 = (H * Hc).real
denom = H2 + float(reg) * D2
# Lock exact DC gain (avoid plateau bias)
if dc_lock:
denom[0] = H2[0] # since D2[0] = 0, this already holds; keep explicit
G = Hc / denom
elif method == "fft":
eps = 1e-12
G = 1.0 / (H + eps)
else:
raise ValueError("Unknown method. Use 'wiener' or 'fft'.")
# Clamp frequency gain (safety net against spikes)
if gain_max is not None and gain_max > 0:
mag = np.abs(G)
too_big = mag > gain_max
if np.any(too_big):
G[too_big] *= gain_max / mag[too_big]
X = Y * G
y_true_pad = np.fft.irfft(X, n=L)[:Npad]
# Extract central segment (same slicing as convolve)
y_deconv = y_true_pad[w_left : w_left + N]
# Auto-scale to correct amplitude bias from regularization
if auto_scale and method == "wiener" and reg > 0:
# Use energy conservation principle for scaling correction
# The idea: compare input energy to output energy and adjust
# Calculate RMS (root mean square) of input and output
y_rms = np.sqrt(np.mean(y**2)) if len(y) > 0 else 0.0
y_deconv_rms = np.sqrt(np.mean(y_deconv**2)) if len(y_deconv) > 0 else 0.0
if y_rms > 1e-12 and y_deconv_rms > 1e-12:
# Calculate the energy-based scaling factor
energy_ratio = y_rms / y_deconv_rms
# Apply scaling if the ratio is reasonable
# (regularization typically reduces energy)
if 0.5 < energy_ratio < 5.0: # Conservative bounds
y_deconv *= energy_ratio
return y_deconv
[docs]
@check_1d_arrays(x_evenly_spaced=True)
def brickwall_filter(
x: np.ndarray,
y: np.ndarray,
mode: Literal["lowpass", "highpass", "bandpass", "bandstop"],
cut0: float,
cut1: float | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Apply an ideal frequency filter ("brick wall" filter) to a signal.
Args:
x: Time domain axis (evenly spaced).
y: Signal values (same length as x).
mode: Type of filter to apply.
cut0: First cutoff frequency.
cut1: Second cutoff frequency, required for band-pass and band-stop filters.
Returns:
Tuple (x, y_filtered), where y_filtered is the filtered signal.
Raises:
ValueError: If mode is unknown.
ValueError: If cut0 is not positive.
ValueError: If cut1 is missing for band-pass and band-stop filters.
ValueError: If cut0 > cut1 for band-pass and band-stop filters.
"""
if mode not in ("lowpass", "highpass", "bandpass", "bandstop"):
raise ValueError(f"Unknown filter mode: {mode!r}")
if cut0 <= 0.0:
raise ValueError("Cutoff frequency must be positive.")
if mode in ("bandpass", "bandstop"):
if cut1 is None:
raise ValueError(f"cut1 must be specified for mode '{mode}'")
if cut0 > cut1:
raise ValueError("cut0 must be less than or equal to cut1.")
freqs, ffty = fft1d(x, y, shift=False)
if mode == "lowpass":
frequency_mask = np.abs(freqs) <= cut0
elif mode == "highpass":
frequency_mask = np.abs(freqs) >= cut0
elif mode == "bandpass":
frequency_mask = (np.abs(freqs) >= cut0) & (np.abs(freqs) <= cut1)
else: # bandstop
frequency_mask = (np.abs(freqs) <= cut0) | (np.abs(freqs) >= cut1)
ffty_filtered = ffty * frequency_mask
_, y_filtered = ifft1d(freqs, ffty_filtered)
y_filtered = y_filtered.real
return x.copy(), y_filtered