Source code for sigima.objects.signal.object

# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.

"""Signal object class
===================

This module provides the main SignalObj class for handling 1D signal data.

The module includes:

- `SignalObj`: Main class for signal data management and operations

The SignalObj class supports:
- Signal data storage with x, y coordinates
- Error bars (dx, dy) for uncertainty quantification
- Metadata and annotations
- ROI (Region of Interest) operations
- Axis labels and units management
- Copy operations with type conversion
"""

# pylint: disable=invalid-name  # Allows short reference names like x, y, ...
# pylint: disable=duplicate-code

from __future__ import annotations

from typing import Type

import guidata.dataset as gds
import numpy as np
import pandas as pd

from sigima.config import _
from sigima.objects import base
from sigima.objects.signal.constants import (
    DATETIME_X_FORMAT_KEY,
    DATETIME_X_KEY,
    DEFAULT_DATETIME_FORMAT,
    VALID_TIME_UNITS,
)
from sigima.objects.signal.roi import SignalROI
from sigima.tools.datatypes import datetime64_to_seconds


def validate_and_convert_dtype(x: np.ndarray) -> np.ndarray:
    """Check if data type is valid, convert integer to float64 if needed.

    Args:
        x: Input array

    Returns:
        Original array if data type is valid, array converted to float64 if integer

    Raises:
        ValueError: If data type is not valid
    """
    if np.issubdtype(x.dtype, np.integer):
        return x.astype(np.float64)
    if x.dtype not in SignalObj.VALID_DTYPES:
        raise ValueError(
            f"Invalid data type: {x.dtype}. "
            f"Valid types: {', '.join(str(dt) for dt in SignalObj.VALID_DTYPES)}"
        )
    return x


[docs] class SignalObj(gds.DataSet, base.BaseObj[SignalROI]): """Signal object""" PREFIX = "s" VALID_DTYPES = (np.float32, np.float64, np.complex128) _tabs = gds.BeginTabGroup("all") _datag = gds.BeginGroup(_("Data and metadata")) title = gds.StringItem(_("Signal title"), default=_("Untitled")) xydata = gds.FloatArrayItem(_("Data"), transpose=True, minmax="rows") metadata = gds.DictItem(_("Metadata"), default={}) # type: ignore[assignment] annotations = gds.StringItem(_("Annotations"), default="").set_prop( "display", hide=True, ) # Annotations (JSON). Use get/set_annotations() API # type: ignore[assignment] _e_datag = gds.EndGroup(_("Data and metadata")) _unitsg = gds.BeginGroup(_("Titles / Units")) title = gds.StringItem(_("Signal title"), default=_("Untitled")) _tabs_u = gds.BeginTabGroup("units") _unitsx = gds.BeginGroup(_("X-axis")) xlabel = gds.StringItem(_("Title"), default="") xunit = gds.StringItem(_("Unit"), default="") _e_unitsx = gds.EndGroup(_("X-axis")) _unitsy = gds.BeginGroup(_("Y-axis")) ylabel = gds.StringItem(_("Title"), default="") yunit = gds.StringItem(_("Unit"), default="") _e_unitsy = gds.EndGroup(_("Y-axis")) _e_tabs_u = gds.EndTabGroup("units") _e_unitsg = gds.EndGroup(_("Titles / Units")) _scalesg = gds.BeginGroup(_("Scales")) _prop_autoscale = gds.GetAttrProp("autoscale") autoscale = gds.BoolItem(_("Auto scale"), default=True).set_prop( "display", store=_prop_autoscale ) _tabs_b = gds.BeginTabGroup("bounds") _boundsx = gds.BeginGroup(_("X-axis")) xscalelog = gds.BoolItem(_("Logarithmic scale"), default=False) xscalemin = gds.FloatItem(_("Lower bound"), check=False).set_prop( "display", active=gds.NotProp(_prop_autoscale) ) xscalemax = gds.FloatItem(_("Upper bound"), check=False).set_prop( "display", active=gds.NotProp(_prop_autoscale) ) _e_boundsx = gds.EndGroup(_("X-axis")) _boundsy = gds.BeginGroup(_("Y-axis")) yscalelog = gds.BoolItem(_("Logarithmic scale"), default=False) yscalemin = gds.FloatItem(_("Lower bound"), check=False).set_prop( "display", active=gds.NotProp(_prop_autoscale) ) yscalemax = gds.FloatItem(_("Upper bound"), check=False).set_prop( "display", active=gds.NotProp(_prop_autoscale) ) _e_boundsy = gds.EndGroup(_("Y-axis")) _e_tabs_b = gds.EndTabGroup("bounds") _e_scalesg = gds.EndGroup(_("Scales")) _e_tabs = gds.EndTabGroup("all") def __init__(self, title=None, comment=None, icon=""): """Constructor Args: title: title comment: comment icon: icon """ gds.DataSet.__init__(self, title, comment, icon) base.BaseObj.__init__(self) @staticmethod def get_roi_class() -> Type[SignalROI]: """Return ROI class""" return SignalROI def copy( self, title: str | None = None, dtype: np.dtype | None = None, all_metadata: bool = False, ) -> SignalObj: """Copy object. Args: title: title dtype: data type all_metadata: if True, copy all metadata, otherwise only basic metadata Returns: Copied object """ title = self.title if title is None else title obj = SignalObj(title=title) obj.title = title obj.xlabel = self.xlabel obj.ylabel = self.ylabel obj.xunit = self.xunit obj.yunit = self.yunit if dtype not in (None, float, complex, np.complex128): raise RuntimeError("Signal data only supports float64/complex128 dtype") obj.metadata = base.deepcopy_metadata(self.metadata, all_metadata=all_metadata) obj.annotations = self.annotations obj.xydata = np.array(self.xydata, copy=True, dtype=dtype) obj.autoscale = self.autoscale obj.xscalelog = self.xscalelog obj.xscalemin = self.xscalemin obj.xscalemax = self.xscalemax obj.yscalelog = self.yscalelog obj.yscalemin = self.yscalemin obj.yscalemax = self.yscalemax return obj def set_data_type(self, dtype: np.dtype) -> None: # pylint: disable=unused-argument """Change data type. Args: Data type """ raise RuntimeError("Setting data type is not support for signals") def set_xydata( self, x: np.ndarray | list | None, y: np.ndarray | list | None, dx: np.ndarray | list | None = None, dy: np.ndarray | list | None = None, ) -> None: """Set xy data Args: x: x data y: y data dx: dx data (optional: error bars). Use None to reset dx data to None, or provide array to set new dx data. dy: dy data (optional: error bars). Use None to reset dy data to None, or provide array to set new dy data. """ if x is None and y is None: # Using empty arrays (this allows initialization of the object without data) x = np.array([], dtype=np.float64) y = np.array([], dtype=np.float64) if x is None and y is not None: # If x is None, we create a default x array based on the length of y assert isinstance(y, (list, np.ndarray)) x = np.arange(len(y), dtype=np.float64) if x is not None: x = np.array(x) if y is not None: y = np.array(y) if dx is not None: dx = np.array(dx) if dy is not None: dy = np.array(dy) if dx is None and dy is None: xydata = np.vstack([x, y]) else: if dx is None: dx = np.full_like(x, np.nan) if dy is None: dy = np.full_like(y, np.nan) assert x is not None and y is not None xydata = np.vstack((x, y, dx, dy)) self.xydata = validate_and_convert_dtype(xydata) def __get_x(self) -> np.ndarray | None: """Get x data""" if self.xydata is not None: x: np.ndarray = self.xydata[0] # We have to ensure that x is a floating point array, because if y is # complex, the whole xydata array will be complex, and we need to avoid # any unintended type promotion. return x.real.astype(float) return None def __set_x(self, data: np.ndarray | list[float]) -> None: """Set x data""" assert isinstance(self.xydata, np.ndarray) assert isinstance(data, (list, np.ndarray)) data = np.array(data, dtype=float) assert data.shape[0] == self.xydata.shape[1], ( "X data size must match Y data size" ) if not np.all(np.diff(data) >= 0.0): raise ValueError("X data must be monotonic (sorted in ascending order)") self.xydata[0] = validate_and_convert_dtype(data) def __get_y(self) -> np.ndarray | None: """Get y data""" if self.xydata is not None: return self.xydata[1] return None def __set_y(self, data: np.ndarray | list[float]) -> None: """Set y data""" assert isinstance(self.xydata, np.ndarray) assert isinstance(data, (list, np.ndarray)) data = np.array(data) assert data.shape[0] == self.xydata.shape[1], ( "Y data size must match X data size" ) assert np.issubdtype(data.dtype, np.inexact), "Y data must be float or complex" self.xydata[1] = validate_and_convert_dtype(data) def __get_dx(self) -> np.ndarray | None: """Get dx data""" if self.xydata is not None and len(self.xydata) == 4: dx: np.ndarray = self.xydata[2] if np.all(np.isnan(dx)): return None return dx.real.astype(float) return None def __set_dx(self, data: np.ndarray | list[float] | None) -> None: """Set dx data""" if data is None: data = np.full_like(self.x, np.nan) assert isinstance(data, (list, np.ndarray)) data = np.array(data) if self.xydata is None: raise ValueError("Signal data not initialized") assert data.shape[0] == self.xydata.shape[1], ( "dx data size must match X data size" ) if len(self.xydata) == 2: # Initialize uncertainty rows with NaN so that the uncertainty for # the other dimension is reported as missing (None) rather than as # an array of phantom zero values. self.xydata = np.vstack( (self.xydata, np.full((2, self.xydata.shape[1]), np.nan)) ) self.xydata[2] = validate_and_convert_dtype(data) def __get_dy(self) -> np.ndarray | None: """Get dy data""" if self.xydata is not None and len(self.xydata) == 4: dy: np.ndarray = self.xydata[3] if np.all(np.isnan(dy)): return None return dy return None def __set_dy(self, data: np.ndarray | list[float] | None) -> None: """Set dy data""" if data is None: data = np.full_like(self.x, np.nan) assert isinstance(data, (list, np.ndarray)) data = np.array(data) if self.xydata is None: raise ValueError("Signal data not initialized") assert data.shape[0] == self.xydata.shape[1], ( "dy data size must match X data size" ) if len(self.xydata) == 2: # Initialize uncertainty rows with NaN so that the uncertainty for # the other dimension is reported as missing (None) rather than as # an array of phantom zero values. self.xydata = np.vstack( (self.xydata, np.full((2, self.xydata.shape[1]), np.nan)) ) self.xydata[3] = validate_and_convert_dtype(data) x = property(__get_x, __set_x) y = data = property(__get_y, __set_y) dx = property(__get_dx, __set_dx) dy = property(__get_dy, __set_dy) def _repr_html_(self) -> str: """Return HTML representation for Jupyter notebook display. This method is automatically called by Jupyter when displaying the object as a cell output, providing a rich HTML rendering of the signal object. Returns: HTML representation of the signal with summary statistics. """ n_points = len(self.x) if self.x is not None else 0 x_min = f"{self.x.min():.4g}" if n_points > 0 else "N/A" x_max = f"{self.x.max():.4g}" if n_points > 0 else "N/A" y_min = f"{self.y.min():.4g}" if n_points > 0 else "N/A" y_max = f"{self.y.max():.4g}" if n_points > 0 else "N/A" dtype_str = str(self.y.dtype) if self.y is not None else "N/A" # Build axis labels with optional title x_label = "X" if self.xlabel: x_label = f"X ({self.xlabel})" y_label = "Y" if self.ylabel: y_label = f"Y ({self.ylabel})" html = f'<u><b style="color: #5294e2">SignalObj: {self.title}</b></u>:' html += '<table border="0">' html += f"<tr><td style='text-align:right'>Points:</td><td>{n_points}</td></tr>" html += ( f"<tr><td style='text-align:right'>{x_label} range:</td>" f"<td>[{x_min}, {x_max}]" ) if self.xunit: html += f" {self.xunit}" html += "</td></tr>" html += ( f"<tr><td style='text-align:right'>{y_label} range:</td>" f"<td>[{y_min}, {y_max}]" ) if self.yunit: html += f" {self.yunit}" html += "</td></tr>" html += ( f"<tr><td style='text-align:right'>Data type:</td><td>{dtype_str}</td></tr>" ) if self.roi is not None: html += ( f"<tr><td style='text-align:right'>ROIs:</td>" f"<td>{len(self.roi)}</td></tr>" ) html += "</table>" return html def get_data(self, roi_index: int | None = None) -> tuple[np.ndarray, np.ndarray]: """ Return original data (if ROI is not defined or `roi_index` is None), or ROI data (if both ROI and `roi_index` are defined). Args: roi_index: ROI index Returns: Data """ if self.roi is None or roi_index is None: assert isinstance(self.xydata, np.ndarray) return self.x, self.y single_roi = self.roi.get_single_roi(roi_index) return single_roi.get_data(self) def physical_to_indices(self, coords: list[float]) -> list[int]: """Convert coordinates from physical (real world) to indices (pixel) Args: coords: coordinates Returns: Indices """ assert isinstance(self.x, np.ndarray) return [int(np.abs(self.x - x).argmin()) for x in coords] def indices_to_physical(self, indices: list[int]) -> list[float]: """Convert coordinates from indices to physical (real world) Args: indices: indices Returns: Coordinates """ # We take the real part of the x data to avoid `ComplexWarning` warnings # when creating and manipulating the `XRangeSelection` shape (`plotpy`) return self.x.real[indices].tolist() def is_x_datetime(self) -> bool: """Check if x data represents datetime values. Returns: True if x data represents datetime values, False otherwise """ return self.metadata.get(DATETIME_X_KEY, False) def set_x_from_datetime( self, dt_array: np.ndarray | list, unit: str = "s", format_str: str | None = None, ) -> None: """Set x values from datetime objects or strings. This method converts datetime data to float timestamps (Unix time: seconds since 1970-01-01) for efficient storage and computation. The datetime context is preserved through metadata. Note: X values are always stored as Unix timestamps (seconds since 1970-01-01) regardless of the 'unit' parameter. The 'unit' parameter is stored in metadata and used only for axis labeling when plotting. Args: dt_array: Array of datetime objects, datetime strings, or numpy datetime64 unit: Time unit label for display. Options: 's' (seconds), 'ms' (milliseconds), 'us' (microseconds), 'ns' (nanoseconds), 'min' (minutes), 'h' (hours). Default is 's'. This parameter only affects the axis label, not the stored data. format_str: Format string for datetime display. If None, uses default. Raises: ValueError: If unit is not valid Example: >>> from datetime import datetime >>> signal = SignalObj() >>> timestamps = [datetime(2025, 1, 1, 10, 0, 0), ... datetime(2025, 1, 1, 10, 0, 1)] >>> signal.set_x_from_datetime(timestamps, unit='s') >>> signal.is_x_datetime() True >>> # X data is stored as Unix timestamps (seconds since 1970) >>> signal.x[0] > 1.7e9 # Year 2025 True """ if unit not in VALID_TIME_UNITS: raise ValueError( f"Invalid unit: {unit}. Must be one of: {', '.join(VALID_TIME_UNITS)}" ) # Convert to pandas datetime (handles strings, datetime objects, etc.) dt_series = pd.to_datetime(dt_array) # Convert to float timestamp in seconds (Unix timestamps since 1970-01-01). # Note: We always store as Unix timestamps regardless of the 'unit' parameter, # which is only for display purposes. x_float = datetime64_to_seconds(dt_series.values).astype(np.float64) # Check if signal already has data with matching size if self.xydata is not None and self.xydata.shape[1] == len(x_float): # Signal already has matching data, just update x self.x = x_float else: # Initialize or reinitialize signal with x data (y will be zeros) y_placeholder = np.zeros_like(x_float) self.set_xydata(x_float, y_placeholder) # Store metadata self.metadata[DATETIME_X_KEY] = True self.metadata[DATETIME_X_FORMAT_KEY] = ( format_str if format_str is not None else DEFAULT_DATETIME_FORMAT ) # Store unit in xunit attribute (more intuitive than metadata) self.xunit = unit def get_x_as_datetime(self) -> np.ndarray: """Get x values as datetime objects if x is datetime data. Returns x data as numpy datetime64 array if the signal contains datetime data, otherwise returns the regular x data as floats. Returns: Array of datetime64 objects if x is datetime data, otherwise regular x array Example: >>> signal.set_x_from_datetime([datetime(2025, 1, 1, 10, 0, 0)]) >>> dt_values = signal.get_x_as_datetime() >>> isinstance(dt_values[0], np.datetime64) True """ if not self.is_x_datetime(): return self.x # X values are always stored as Unix timestamps (seconds since 1970-01-01) # regardless of the 'unit' parameter x_float = self.x # Convert seconds to datetime using pandas return pd.to_datetime(x_float, unit="s").to_numpy()