Source code for sigima.proc.signal.processing

# -*- coding: utf-8 -*-
#
# Licensed under the terms of the BSD 3-Clause
# (see sigima/LICENSE for details)

"""
Signal processing operations
============================

This module provides signal processing operations:

- Zero padding
- Interpolation and resampling
- Convolution and deconvolution
- Signal manipulation functions

.. note::

    Most operations use functions from :mod:`sigima.tools.signal` for actual
    computations.
"""

from __future__ import annotations

import warnings

import guidata.dataset as gds
import numpy as np
import scipy.integrate as spt
import scipy.signal as sps
from guidata.dataset import FuncProp, GetAttrProp

from sigima.config import _
from sigima.config import options as sigima_options
from sigima.enums import Interpolation1DMethod, NormalizationMethod, WindowingMethod
from sigima.objects import ROI1DParam, SignalObj
from sigima.proc.base import ClipParam, NormalizeParam, dst_2_to_1
from sigima.proc.decorator import computation_function
from sigima.tools.signal import fourier, interpolation, scaling, windowing

from .base import dst_1_to_1, is_uncertainty_data_available, restore_data_outside_roi


[docs] class InterpolationParam(gds.DataSet, title=_("Interpolation")): """Interpolation parameters""" method = gds.ChoiceItem( _("Interpolation method"), [ (Interpolation1DMethod.LINEAR, "Linear"), (Interpolation1DMethod.SPLINE, "Spline"), (Interpolation1DMethod.QUADRATIC, "Quadratic"), (Interpolation1DMethod.CUBIC, "Cubic"), (Interpolation1DMethod.BARYCENTRIC, "Barycentric"), (Interpolation1DMethod.PCHIP, "PCHIP"), ], default=Interpolation1DMethod.LINEAR, ) fill_value = gds.FloatItem( _("Fill value"), default=None, help=_( "Value to use for points outside the interpolation domain " "(used only with linear, cubic and pchip methods)." ), check=False, )
[docs] @computation_function() def interpolate(src1: SignalObj, src2: SignalObj, p: InterpolationParam) -> SignalObj: """Interpolate data with :py:func:`sigima.tools.signal.interpolation.interpolate`. Args: src1: Source signal to interpolate. src2: Signal with new x-axis. p: Parameters. Returns: Result signal object. """ suffix = f"method={p.method}" if p.fill_value is not None and p.method in ( Interpolation1DMethod.LINEAR, Interpolation1DMethod.CUBIC, Interpolation1DMethod.PCHIP, ): suffix += f", fill_value={p.fill_value}" dst = dst_2_to_1(src1, src2, "interpolate", suffix) x1, y1 = src1.get_data() xnew, _y2 = src2.get_data() ynew = interpolation.interpolate(x1, y1, xnew, p.method, p.fill_value) dst.set_xydata(xnew, ynew) return dst
[docs] class Resampling1DParam(InterpolationParam): """Resample parameters for 1D signals""" xmin = gds.FloatItem(_("X<sub>min</sub>"), allow_none=True) xmax = gds.FloatItem(_("X<sub>max</sub>"), allow_none=True) _prop = GetAttrProp("dx_or_nbpts") _modes = (("dx", "ΔX"), ("nbpts", _("Number of points"))) mode = gds.ChoiceItem(_("Mode"), _modes, default="nbpts", radio=True).set_prop( "display", store=_prop ) dx = gds.FloatItem("ΔX", allow_none=True).set_prop( "display", active=FuncProp(_prop, lambda x: x == "dx") ) nbpts = gds.IntItem(_("Number of points"), allow_none=True).set_prop( "display", active=FuncProp(_prop, lambda x: x == "nbpts") )
[docs] def update_from_obj(self, obj: SignalObj) -> None: """Update parameters from a signal object.""" if self.xmin is None: self.xmin = obj.x[0] if self.xmax is None: self.xmax = obj.x[-1] if self.dx is None: self.dx = obj.x[1] - obj.x[0] if self.nbpts is None: self.nbpts = len(obj.x)
[docs] @computation_function() def resampling(src: SignalObj, p: Resampling1DParam) -> SignalObj: """Resample data with :py:func:`sigima.tools.signal.interpolation.interpolate` Args: src: source signal p: parameters Returns: Result signal object """ # Create new x-axis based on parameters if p.mode == "dx": assert p.dx is not None xnew = np.arange(p.xmin, p.xmax + p.dx / 2, p.dx) else: assert p.nbpts is not None xnew = np.linspace(p.xmin, p.xmax, p.nbpts) method: Interpolation1DMethod = p.method suffix = f"method={method.value}" if p.fill_value is not None and method in ( Interpolation1DMethod.LINEAR, Interpolation1DMethod.CUBIC, Interpolation1DMethod.PCHIP, ): suffix += f", fill_value={p.fill_value}" dst = dst_1_to_1(src, "resampling", suffix) x, y = src.get_data() ynew = interpolation.interpolate(x, y, xnew, method, p.fill_value) dst.set_xydata(xnew, ynew) return dst
[docs] def check_same_sample_rate(src1: SignalObj, src2: SignalObj) -> None: """Check if two signals have a constant step size *and* the same sample rate. Args: src1: First signal. src2: Second signal. Raises: ValueError: If the signals do not have a constant step size or the same sample rate. """ for sig in (src1, src2): if not np.allclose(np.diff(sig.x), sig.x[1] - sig.x[0]): raise ValueError(f"Signal {sig.title} must have a constant step size (dx).") dx1 = src1.x[1] - src1.x[0] dx2 = src2.x[1] - src2.x[0] if not np.isclose(dx1, dx2): raise ValueError(f"Signals must have the same sample rate (dx): {dx1} != {dx2}")
[docs] @computation_function() def deconvolution(src1: SignalObj, src2: SignalObj) -> SignalObj: """Compute deconvolution. The function computes the deconvolution of a signal using :py:func:`sigima_.algorithms.signal.fourier.deconvolve`. Args: src1: Source signal. src2: Filter signal. Returns: Result signal. Notes: The kernel normalization behavior can be configured globally using ``sigima.config.options.auto_normalize_kernel``. """ check_same_sample_rate(src1, src2) dst = dst_2_to_1(src1, src2, "⊛⁻¹", f"filter={src2.title}") x1, y1 = src1.get_data() _x2, y2 = src2.get_data() # Get kernel normalization option from configuration normalize_kernel = sigima_options.auto_normalize_kernel.get() result_y = fourier.deconvolve( x1, y1, y2, normalize_kernel_flag=normalize_kernel, reg=2.0, gain_max=None, auto_scale=True, ) dst.set_xydata(x1, result_y, None, None) restore_data_outside_roi(dst, src1) return dst
[docs] @computation_function() def normalize(src: SignalObj, p: NormalizeParam) -> SignalObj: """Normalize data with :py:func:`sigima.tools.signal.level.normalize` Args: src: source signal p: parameters Returns: Result signal object """ method: NormalizationMethod = p.method dst = dst_1_to_1(src, "normalize", f"ref={method.value}") x, y = src.get_data() normalized_y = scaling.normalize(y, method) dst.set_xydata(x, normalized_y) # Uncertainty propagation for normalization # σ(y/norm_factor) = σ(y) / norm_factor if is_uncertainty_data_available(src): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) # Calculate normalization factor if method == NormalizationMethod.MAXIMUM: norm_factor = np.nanmax(y) elif method == NormalizationMethod.AMPLITUDE: norm_factor = np.nanmax(y) - np.nanmin(y) elif method == NormalizationMethod.AREA: norm_factor = np.nansum(y) elif method == NormalizationMethod.ENERGY: norm_factor = np.sqrt(np.nansum(np.abs(y) ** 2)) elif method == NormalizationMethod.RMS: norm_factor = np.sqrt(np.nanmean(np.abs(y) ** 2)) else: raise RuntimeError(f"Unsupported normalization method: {method}") if norm_factor != 0: dst.dy = src.dy / np.abs(norm_factor) else: dst.dy[:] = np.nan restore_data_outside_roi(dst, src) return dst
[docs] @computation_function() def derivative(src: SignalObj) -> SignalObj: """Compute derivative with :py:func:`numpy.gradient` Args: src: source signal Returns: Result signal object """ dst = dst_1_to_1(src, "derivative") x, y = src.get_data() dst.set_xydata(x, np.gradient(y, x)) # Uncertainty propagation for numerical derivative # For gradient using finite differences: σ(dy/dx) ≈ σ(y) / Δx if is_uncertainty_data_available(src): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) # Use the same gradient approach as numpy.gradient for uncertainty dst.dy = np.gradient(src.dy, x) dst.dy[np.isinf(dst.dy) | np.isnan(dst.dy)] = np.nan restore_data_outside_roi(dst, src) return dst
[docs] @computation_function() def integral(src: SignalObj) -> SignalObj: """Compute integral with :py:func:`scipy.integrate.cumulative_trapezoid` Args: src: source signal Returns: Result signal object """ dst = dst_1_to_1(src, "integral") x, y = src.get_data() dst.set_xydata(x, spt.cumulative_trapezoid(y, x, initial=0.0)) # Uncertainty propagation for numerical integration # For cumulative trapezoidal integration, uncertainties accumulate if is_uncertainty_data_available(src): # Propagate uncertainties through cumulative trapezoidal rule # σ(∫y dx) ≈ √(Σ(σ(y_i) * Δx_i)²) for independent measurements dx = np.diff(x) dy_squared = src.dy[:-1] ** 2 + src.dy[1:] ** 2 # Trapezoidal rule uncertainty # Propagated variance for trapezoidal integration dst.dy = np.zeros_like(dst.y) # Initialize uncertainty array dst.dy[0] = 0.0 # Initial value has no uncertainty dst.dy[1:] = np.sqrt(np.cumsum(dy_squared * (dx**2) / 4)) restore_data_outside_roi(dst, src) return dst
[docs] class XYCalibrateParam(gds.DataSet, title=_("Calibration")): """Signal calibration parameters""" axes = (("x", _("X-axis")), ("y", _("Y-axis"))) axis = gds.ChoiceItem(_("Calibrate"), axes, default="y") a = gds.FloatItem("a", default=1.0) b = gds.FloatItem("b", default=0.0)
[docs] @computation_function() def calibration(src: SignalObj, p: XYCalibrateParam) -> SignalObj: """Compute linear calibration Args: src: source signal p: parameters Returns: Result signal object """ dst = dst_1_to_1(src, "calibration", f"{p.axis}={p.a}*{p.axis}+{p.b}") x, y = src.get_data() if p.axis == "x": dst.set_xydata(p.a * x + p.b, y, src.dx, src.dy) # For X-axis calibration: uncertainties in x are scaled, y unchanged if is_uncertainty_data_available(src): dst.dx = np.abs(p.a) * src.dx if src.dx is not None else None # Y uncertainties remain the same else: dst.set_xydata(x, p.a * y + p.b, src.dx, src.dy) # For Y-axis calibration: σ(a*y + b) = |a| * σ(y) if is_uncertainty_data_available(src): if dst.dy is not None: dst.dy *= np.abs(p.a) restore_data_outside_roi(dst, src) return dst
[docs] @computation_function() def clip(src: SignalObj, p: ClipParam) -> SignalObj: """Compute maximum data clipping with :py:func:`numpy.clip` Args: src: source signal p: parameters Returns: Result signal object """ dst = dst_1_to_1(src, "clip", f"[{p.lower}, {p.upper}]") x, y = src.get_data() # Compute result result_y = np.clip(y, p.lower, p.upper) dst.set_xydata(x, result_y, src.dx, src.dy) # Uncertainty propagation: σ(clip(y)) = σ(y) where not clipped, 0 where clipped if is_uncertainty_data_available(src): dst.dy = src.dy.copy() if p.lower is not None: dst.dy[y <= p.lower] = 0 if p.upper is not None: dst.dy[y >= p.upper] = 0 restore_data_outside_roi(dst, src) return dst
[docs] @computation_function() def offset_correction(src: SignalObj, p: ROI1DParam) -> SignalObj: """Correct offset: subtract the mean value of the signal in the specified range (baseline correction) Args: src: source signal p: parameters Returns: Result signal object """ dst = dst_1_to_1(src, "offset_correction", f"{p.xmin:.3g}≤x≤{p.xmax:.3g}") _roi_x, roi_y = p.get_data(src) dst.y -= np.mean(roi_y) restore_data_outside_roi(dst, src) return dst
[docs] class DetrendingParam(gds.DataSet, title=_("Detrending")): """Detrending parameters""" methods = (("linear", _("Linear")), ("constant", _("Constant"))) method = gds.ChoiceItem(_("Detrending method"), methods, default="linear")
[docs] @computation_function() def detrending(src: SignalObj, p: DetrendingParam) -> SignalObj: """Detrend data with :py:func:`scipy.signal.detrend` Args: src: source signal p: parameters Returns: Result signal object """ dst = dst_1_to_1(src, "detrending", f"method={p.method}") x, y = src.get_data() dst.set_xydata(x, sps.detrend(y, type=p.method)) restore_data_outside_roi(dst, src) return dst
[docs] class WindowingParam(gds.DataSet, title=_("Windowing")): """Windowing parameters.""" _meth_prop = gds.GetAttrProp("method") method = gds.ChoiceItem( _("Method"), WindowingMethod, default=WindowingMethod.HAMMING ).set_prop("display", store=_meth_prop) alpha = gds.FloatItem( "Alpha", default=0.5, help=_("Shape parameter of the Tukey windowing function"), ).set_prop( "display", active=gds.FuncProp(_meth_prop, lambda x: x == WindowingMethod.TUKEY) ) beta = gds.FloatItem( "Beta", default=14.0, help=_("Shape parameter of the Kaiser windowing function"), ).set_prop( "display", active=gds.FuncProp(_meth_prop, lambda x: x == WindowingMethod.KAISER), ) sigma = gds.FloatItem( "Sigma", default=0.5, help=_("Shape parameter of the Gaussian windowing function"), ).set_prop( "display", active=gds.FuncProp(_meth_prop, lambda x: x == WindowingMethod.GAUSSIAN), )
[docs] @computation_function() def apply_window(src: SignalObj, p: WindowingParam) -> SignalObj: """Compute windowing with :py:func:`sigima.tools.signal.windowing.apply_window`. Args: src: Source signal. p: Parameters for windowing. Returns: Result signal object. """ method: WindowingMethod = p.method suffix = f"method={method.value}" if method == WindowingMethod.GAUSSIAN: suffix += f", sigma={p.sigma:.3f}" elif method == WindowingMethod.KAISER: suffix += f", beta={p.beta:.3f}" elif method == WindowingMethod.TUKEY: suffix += f", alpha={p.alpha:.3f}" dst = dst_1_to_1(src, "apply_window", suffix) assert p.alpha is not None dst.y = windowing.apply_window(dst.y, method, p.alpha) restore_data_outside_roi(dst, src) return dst
[docs] @computation_function() def reverse_x(src: SignalObj) -> SignalObj: """Reverse x-axis Args: src: source signal Returns: Result signal object """ dst = dst_1_to_1(src, "reverse_x") dst.y = dst.y[::-1] return dst
[docs] @computation_function() def convolution(src1: SignalObj, src2: SignalObj) -> SignalObj: """Compute convolution of two signals with :py:func:`scipy.signal.convolve`. Args: src1: Source signal 1. src2: Source signal 2. Returns: Result signal. Notes: The behavior of kernel normalization is controlled by the global configuration option ``sigima.config.options.auto_normalize_kernel``. """ check_same_sample_rate(src1, src2) dst = dst_2_to_1(src1, src2, "⊛") x1, y1 = src1.get_data() _x2, y2 = src2.get_data() # Get configuration option for kernel normalization normalize_kernel = sigima_options.auto_normalize_kernel.get() ynew = fourier.convolve( x1, y1, y2, normalize_kernel_flag=normalize_kernel, ) dst.set_xydata(x1, ynew, None, None) restore_data_outside_roi(dst, src1) return dst
[docs] @computation_function() def xy_mode(src1: SignalObj, src2: SignalObj) -> SignalObj: """Simulate the X-Y mode of an oscilloscope. Use the first signal as the X-axis and the second signal as the Y-axis. Args: src1: First input signal (X-axis). src2: Second input signal (Y-axis). Returns: A signal object representing the X-Y mode. """ dst = dst_2_to_1(src1, src2, "", "X-Y Mode") p = Resampling1DParam() p.xmin = max(src1.x[0], src2.x[0]) p.xmax = min(src1.x[-1], src2.x[-1]) assert p.xmin < p.xmax, "X-Y mode: No overlap between signals." p.mode = "nbpts" p.nbpts = min(src1.x.size, src2.x.size) _, y1 = resampling(src1, p).get_data() _, y2 = resampling(src2, p).get_data() dst.set_xydata(y1, y2) dst.title = "{1} = f({0})" restore_data_outside_roi(dst, src1) return dst
[docs] @computation_function() def replace_x_by_other_y(src1: SignalObj, src2: SignalObj) -> SignalObj: """Create a new signal using Y from src1 and Y from src2 as X coordinates. This is useful for calibration scenarios where one signal contains calibration data (e.g., wavelengths) in its Y values, and you want to plot another signal's Y values against these calibration points. The two signals must have the same number of points. Args: src1: First signal (provides Y data for output). src2: Second signal (provides Y data to be used as X coordinates for output). Returns: A new signal with X from src2.y and Y from src1.y. Raises: ValueError: If signals don't have the same number of points. """ if src1.y.size != src2.y.size: raise ValueError( f"Signals must have the same number of points: " f"{src1.y.size} != {src2.y.size}" ) dst = dst_2_to_1(src1, src2, "replace_x_by_other_y") dst.set_xydata(src2.y, src1.y) dst.xlabel = src2.ylabel if src2.ylabel else "X" dst.xunit = src2.yunit if src2.yunit else "" # Y label and unit remain from src1 restore_data_outside_roi(dst, src1) return dst