Source code for torchsig.signals.builder

"""Signal and Composite Signal Builders

Examples
    Signal Builder
        >>> from torchsig.signals import SignalBuilder
        >>> sb = SignalBuilder()
        >>> sb.data = np.array([1.0, 2.0])
        >>> sb.sample_rate = 1.5
        >>> ...
        >>> new_signal = sb.build()
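    Composite Signal Builder (currently disabled: `CompositeSignalBuilder` only exists as commented-out code at the end of this module)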
        >>> from torchsig.signals import CompositeSignalBuilder, SignalBuilder
        >>> builder1 = SignalBuilder()
        >>> builder1.data = [1.0, 2.0]
        >>> ...
        >>> csb = CompositeSignalBuilder()
        >>> csb.builders.append(builder1)
        >>> csb.sample_rate = 2.0
        >>> ...
        >>> new_composite_signal = csb.build()
"""

# TorchSig
from torchsig.signals.signal_types import Signal, SignalMetadata
from torchsig.datasets.dataset_metadata import DatasetMetadata
from torchsig.utils.random import Seedable
from torchsig.utils.dsp import compute_spectrogram

# Third Party
import numpy as np

# Built-In
from abc import ABC, abstractmethod
from copy import copy
import os

# Directory containing this module file; realpath() resolves symlinks before
# dirname() strips the file name.
DIR_PATH = os.path.dirname(os.path.realpath(__file__))



class Builder(ABC):
    """Abstract builder class for signals.

    Attributes:
        name (str): Builder name. Defaults to "Builder".
    """

    def __init__(self, name: str = "Builder"):
        """Initialize the builder with an empty signal slot.

        Args:
            name (str, optional): Builder name. Defaults to "Builder".
        """
        self.name = name
        # reset() is deliberately not called here: SignalBuilder.reset() reads
        # attributes that are only assigned after this __init__ returns.
        # Start from an explicit empty state instead, so `_signal` always
        # exists even before the first reset()/build().
        self._signal = None

    @abstractmethod
    def build(self) -> Signal:
        """Build and return a Signal object. To be implemented by subclasses.

        Returns:
            Signal: signal being built.
        """

    @abstractmethod
    def reset(self) -> None:
        """Reset the builder. To be implemented by subclasses.

        The base implementation only clears the signal under construction.
        """
        self._signal = None

    def __repr__(self):
        return f"{self.__class__.__name__}(name={self.name})"
class SignalBuilder(Builder, Seedable):
    """Signal Builder. Creates a Signal.

    Attributes:
        dataset_metadata (DatasetMetadata): Dataset metadata for signal.
        class_name (str): Name of the specific waveform to build, ex: 2fsk, qpsk, ofdm-1024
        name (str): Signal builder name, derived from `class_name`.
        supported_classes (List[str]): List of signal class names that the
            builder supports. Set to `[]` on this class, so `__init__`
            rejects every `class_name` unless a subclass overrides the list.
    """

    supported_classes = []

    def __init__(self, dataset_metadata: DatasetMetadata, class_name: str, **kwargs):
        """Initializes Signal Builder.

        Args:
            dataset_metadata (DatasetMetadata): Dataset metadata.
            class_name (str): Class name.
            **kwargs: Forwarded unchanged to `Seedable.__init__`.

        Raises:
            ValueError: Signal builder does not support class_name signal.
        """
        self.class_name = class_name
        Builder.__init__(self, name=f" {self.class_name} Signal Builder")
        Seedable.__init__(self, **kwargs)
        # retains dataset metadata info
        self.dataset_metadata = dataset_metadata

        if self.class_name not in self.supported_classes:
            raise ValueError(
                f"{self.class_name} not supported by {self.__class__.__name__}. "
                f"List of supported waveforms: {self.supported_classes}"
            )

    def __repr__(self):
        return f"{self.__class__.__name__}(class_name={self.class_name})"

    def _update_data(self) -> None:
        """Creates the IQ samples and sets them to `self._signal.data`.

        Creates the IQ samples for the waveform modulated with the center
        frequency, SNR, and other fields as described by
        `self._signal.metadata`. `_update_data()` builds the waveform from
        metadata defined by `_update_metadata()`.

        Raises:
            NotImplementedError: Inherited classes must implement this method.
        """
        raise NotImplementedError("_update_data() not implemented.")

    def _update_metadata(self) -> None:
        """Updates `self._signal.metadata`.

        Signal metadata such as center frequency, SNR and others are randomly
        created and set to default values as defined by dataset metadata
        inside `reset()`. This `_update_metadata()` function is only used to
        override those default values for signal-specific cases. The metadata
        will inform the creation of the IQ samples within the
        `_update_data()` function.

        Raises:
            NotImplementedError: Inherited classes must implement this method.
        """
        raise NotImplementedError("_update_metadata() not implemented.")

    def build(self) -> Signal:
        """Builds and returns Signal.

        In order, calls:
            - `reset()`: fresh default metadata drawn from the dataset
              metadata, and zeroed IQ data.
            - `_update_metadata()`: signal-specific metadata overrides.
            - `_update_data()`: creates the IQ samples.
            - `_correct_bandwith_and_snr()` (sic, the method name is
              misspelled): rescales the IQ samples to the target SNR and
              refits the bandwidth metadata to the measured spectrum.
            - `_signal.verify()`: checks the signal and its metadata.

        The IQ data is cast to complex64 after verification.

        Returns:
            Signal: Signal being built.
        """
        # start every build from fresh defaults
        self.reset()

        # signal-specific overrides of the default metadata
        self._update_metadata()

        # generate IQ with appropriate modulator based on signal metadata
        self._update_data()

        # set the SNR and re-estimate bandwidth in order to better fit the
        # bounding box in the frequency domain
        self._correct_bandwith_and_snr()

        # checks that signal and metadata set properly
        self._signal.verify()

        # signal object to be returned
        new_signal = self._signal

        # ensures IQ data is in complex64
        new_signal.data = new_signal.data.astype(np.complex64)

        return new_signal

    def reset(self) -> None:
        """Resets `_signal` according to defaults defined by dataset metadata.

        Signal metadata is generated according to the default values as
        defined by the dataset metadata, and the signal data is set to zeros.
        These metadata values can be overridden in `_update_metadata()` when
        a particular modulator or special signal requires it.

        Random values are drawn in a fixed order: duration, start, bandwidth,
        SNR. The duration and start draws are skipped when their range holds
        a single value.

        This method is called at the start of `build()`.
        """
        meta = self.dataset_metadata
        rng = self.random_generator

        # is duration parameter to be randomized?
        duration_min = meta.signal_duration_in_samples_min
        duration_max = meta.signal_duration_in_samples_max
        if duration_min == duration_max:
            # the min and max fields are the same, so just use one of them
            duration = duration_min
        else:
            # Generator.integers() excludes `high` by default, which made the
            # configured maximum unreachable; endpoint=True makes the range
            # inclusive of both bounds.
            duration = rng.integers(
                low=duration_min, high=duration_max, endpoint=True, dtype=int
            )

        # is start parameter to be randomized?
        num_iq_samples = meta.num_iq_samples_dataset
        if duration == num_iq_samples:
            # duration fills the whole dataset length, so start must be zero
            start = 0
        else:
            # start ranges from 0 to the rightmost sample at which the
            # duration still fits; endpoint=True keeps that last position
            # reachable.
            start = rng.integers(
                low=0, high=num_iq_samples - duration, endpoint=True, dtype=int
            )

        # randomly set bandwidth between a minimum and max
        bw = rng.uniform(meta.signal_bandwidth_min, meta.signal_bandwidth_max)

        # center frequency always zero, will be randomized within the
        # Narrowband() or Wideband() datasets themselves due to the need to
        # apply impairments at complex baseband first before upconversion to
        # the IF
        center_freq = 0

        # randomly select SNR, rounded to one decimal place
        snr_db = np.round(rng.uniform(meta.snr_db_min, meta.snr_db_max), 1)

        # define SignalMetadata
        default_metadata = SignalMetadata(
            dataset_metadata=meta,
            start_in_samples=start,
            duration_in_samples=duration,
            bandwidth=bw,
            center_freq=center_freq,
            snr_db=snr_db,
            class_name=self.class_name,
            class_index=meta.class_list.index(self.class_name),
        )

        # set signal data to be zeros; the proper IQ samples will be
        # recreated as part of the _update_data() call
        default_data = np.zeros(num_iq_samples, dtype=np.complex64)

        # reset _signal
        self._signal = Signal(data=default_data, metadata=default_metadata)

    def _correct_bandwith_and_snr(self) -> None:
        """Corrects the SNR of the time-series and the bandwidth metadata.

        The SNR is estimated as the peak of the time-averaged spectrogram
        minus the dataset noise power. The IQ samples are then scaled so that
        this estimate matches `self._signal.metadata.snr_db`.

        The bandwidth is re-estimated from the max-hold spectrum: the
        outermost FFT bins that exceed the noise floor by 3 dB set the
        occupied band. That band is widened by half an FFT bin on each side
        and clamped to the dataset frequency limits. The result overwrites
        `self._signal.metadata.bandwidth`, which produces a more accurate
        bounding box. If no bin exceeds the threshold the bandwidth metadata
        is left unchanged.

        The bounding box is adjusted rather than the signal itself because,
        for a tone, the bandwidth is solely derived from the number of
        samples, and resampling would change its time duration.
        """
        meta = self.dataset_metadata

        # compute spectral estimate of signal. use a large stride to process
        # only a subset of the data to reduce computation
        signal_spectrogram_db = compute_spectrogram(
            self._signal.data,
            meta.fft_size,
            meta.fft_stride,
        )

        # average over time, used in PSD estimate for SNR calculation
        signal_avg_fft_db = np.mean(signal_spectrogram_db, axis=1)
        # estimate the frequency response maximum value
        max_value_db = np.max(signal_avg_fft_db)
        # estimate SNR
        snr_estimate_db = max_value_db - meta.noise_power_db
        # calculate the appropriate correction to set SNR
        correction_db = self._signal.metadata.snr_db - snr_estimate_db
        # convert correction value to linear (power ratio)
        correction = 10 ** (correction_db / 10)
        # apply correction to the signal; sqrt converts the power ratio into
        # an amplitude scale factor
        self._signal.data *= np.sqrt(correction)

        # apply the same correction to the spectrogram so the threshold test
        # below sees the rescaled signal
        signal_spectrogram_db += correction_db
        # compute max hold for bandwidth estimation
        signal_max_fft_db = np.max(signal_spectrogram_db, axis=1)

        # find the bins where the signal is above the noise floor by the
        # amount dictated by the relative threshold
        noise_floor_db = meta.noise_power_db
        relative_threshold_db = 3
        bandwidth_estimation_threshold_db = noise_floor_db + relative_threshold_db
        exceedance_indices = np.where(
            signal_max_fft_db > bandwidth_estimation_threshold_db
        )[0]

        if len(exceedance_indices) == 0:
            # no exceedances, signal power is too low to be detected, current
            # bandwidth is fine
            return

        # first and last exceedance mark the band edges; with a single
        # exceedance both edges are the same bin
        lower_edge_index = exceedance_indices[0]
        upper_edge_index = exceedance_indices[-1]

        # create frequency vector for the FFT
        f = (
            np.linspace(-0.5, 0.5 - (1 / meta.fft_size), meta.fft_size)
            * meta.sample_rate
        )

        # widen bandwidth by a small proportion on each edge, without running
        # past the dataset frequency boundaries
        widen_bandwidth_value = meta.fft_frequency_resolution / 2
        lower_freq = max(f[lower_edge_index] - widen_bandwidth_value, meta.frequency_min)
        upper_freq = min(f[upper_edge_index] + widen_bandwidth_value, meta.frequency_max)

        # the signal cannot be resampled to fit the box, so change the
        # bounding box instead by updating the bandwidth metadata field
        self._signal.metadata.bandwidth = upper_freq - lower_freq
# def __repr__(self): # return f"{self.__class__.__name__}(name={self.name})\n\t_signal={self._signal}" ### # class CompositeSignalBuilder(SignalBuilder): # """CompositeSignal Builder. Creates a complex signal with multiple Signals inside it. # Attributes: # dataset_metadata (DatasetMetadata): Dataset metadata for signal. # name (str): Composite signal builder name. Defaults to "Composite Signal Builder" # builders (List[SignalBuilder]): List of SignalBuilders. Defaults to []. # """ # def __init__(self, dataset_metadata: DatasetMetadata, seed:int = None): # super().__init__(dataset_metadata = dataset_metadata, class_name = "Composite", seed=seed) # self.builders = None # def build(self) -> CompositeSignal: # """Builds and returns CompositeSignal. # Returns: # CompositeSignal: CompositeSignal being built. # """ # for b in self.builders: # self._signal.signals.append(b.build()) # self._signal.verify() # return self._signal # def reset(self) -> None: # """Resets CompositeSignalBuilder # """ # self.builders = [] # self._signal = CompositeSignal(self.dataset_metadata) # def add_builder(self, b: SignalBuilder) -> None: # self.builders.append(b) # def __len__(self): # return len(self.builders) # def __repr__(self): # return f"{self.__class__.__name__}(name={self.name})\n\tbuilders={self.builders}"