Source code for torchsig.signals.builders.constellation

"""Constellation Signal Builder and Modulator
"""
# TorchSig
from torchsig.signals.builder import SignalBuilder
from torchsig.datasets.dataset_metadata import DatasetMetadata
from torchsig.utils.dsp import (
    estimate_filter_length,
    srrc_taps, 
    multistage_polyphase_resampler,
    pad_head_tail_to_length,
    slice_head_tail_to_length,
    slice_tail_to_length,
    torchsig_complex_data_type
)
from torchsig.signals.signal_lists import TorchSigSignalLists
from torchsig.signals.builders.constellation_maps import all_symbol_maps

# Third Party
import numpy as np
import scipy.signal as sp

# Built-In
from copy import copy


# Modulator

[docs] def constellation_modulator_baseband ( class_name:str, pulse_shape_name:str, max_num_samples:int, oversampling_rate_nominal:int, alpha_rolloff:float=None, rng=np.random.default_rng() ) -> np.ndarray: """Modulates constellation based signals (QAM/PSK/ASK/OOK) at complex baseband. Args: class_name (str): Name of the signal to modulate, ex: 'qpsk'. pulse_shape_name (str): Pulse shaping filter selection, 'rectangular' or 'srrc' (square-root raised cosine). max_num_samples (int): Maximum number of samples to be produced. The length of the output signal must be less than or equal to this number. oversampling_rate_nominal (int): The amount of oversampling, which is equal to the ratio of the ratio of the sampling rate and bandwidth. alpha_rolloff (float, optional): The alpha-rolloff value for the SRRC filter. If pulse_shape_name == 'recantangular' then this value is ignored. If pulse_shape_name == 'srrc' then this value must be defined, and within the range 0 < alpha_rolloff < 1. Defaults to None. rng (optional): Seedable random number generator for reproducibility. Raises: ValueError: Raises ValueError if pulse_shape_name is neither 'rectangular' or 'srrc'. ValueError: Raises ValueError if alpha_rolloff is not defined when selecting 'srrc'. Returns: np.ndarray: IQ samples of the constellation-modulated complex baseband signal. """ # also the samples per symbol samples_per_symbol = oversampling_rate_nominal # get symbol map symbol_map = all_symbol_maps[class_name] # ensure symbol map is avg unit power symbol_map = symbol_map / np.sqrt(np.mean(np.abs(symbol_map)**2)) # pulse shape if pulse_shape_name == 'rectangular': pulse_shape = np.ones(samples_per_symbol) pulse_shape_filter_span = 0 elif pulse_shape_name == 'srrc': if alpha_rolloff is None: raise ValueError('must define an alpha rolloff for SRRC filter') # design the pulse shaping filter attenuation_db = 120 pulse_shape_filter_length = estimate_filter_length(alpha_rolloff,attenuation_db,1) pulse_shape_filter_span = int(np.ceil((pulse_shape_filter_length - 1) / (2*samples_per_symbol))) # convert filter length into the span pulse_shape = srrc_taps(samples_per_symbol, pulse_shape_filter_span, alpha_rolloff) else: raise ValueError('pulse shape ' + str(pulse_shape_name) + ' not supported') # number of symbols to subtract off from generation in order to produce # a signal that is less than the desired length to avoid slicing a symbol # # filter span = (number of symbols in pulse shape - 1)/2, therefore the span # for a rectangular pulse shape is zero subtract_off_symbols = 2*pulse_shape_filter_span # number of symbols to create. use floor() and subtract off based # on filter span to ensure that a smaller number of samples is created # and does not equal or exceed the max_num_samples. the idea is to avoid # slicing a symbol, rather all samples from the transition periods are # to be retained num_symbols = int(np.floor(max_num_samples/samples_per_symbol))-subtract_off_symbols # enforce that the minimum number cannot be less than 1 num_symbols = 1 if num_symbols < 1 else num_symbols # create symbols. because OOK has symbols which are zeros this needs to run # a loop until 1 or more symbols are non-zero symbols = np.zeros(1) while (np.sum(np.abs(symbols)) == 0): # index into the symbol map map_index = rng.integers(low=0,high=len(symbol_map),size=num_symbols) # randomly generate symbols symbols = symbol_map[map_index] # interplate using pulse shaping filter constellation_signal_baseband = sp.upfirdn(pulse_shape,symbols,up=samples_per_symbol,down=1) # zero-pad if signal is too short if len(constellation_signal_baseband) < max_num_samples: constellation_signal_baseband = pad_head_tail_to_length ( constellation_signal_baseband, max_num_samples ) # slice if signal is too long elif len(constellation_signal_baseband) > max_num_samples: constellation_signal_baseband = slice_tail_to_length ( constellation_signal_baseband, max_num_samples ) # else: signal correct length, do nothing return constellation_signal_baseband
[docs] def constellation_modulator ( class_name:str, pulse_shape_name:str, bandwidth:float, sample_rate:float, num_samples:int, alpha_rolloff:float=None, rng=np.random.default_rng() ) -> np.ndarray: """Modulator for constellation-based signals (QAM/PSK/ASK/OOK). Args: class_name (str): The modulation to create, ex: 'qpsk'. pulse_shape_name (str): Pulse shaping filter selection, 'rectangular' or 'srrc' (square-root raised cosine). bandwidth (float): The desired 3 dB bandwidth of the signal. Must be in the same units as `sample_rate` and within the bounds 0 < `bandwidth` < `sample_rate`. sample_rate (float): The sampling rate for the IQ signal. The sample rate can use a normalized value of 1, or it can use a practical sample rate such as 10 MHz. However, it must use the same units as the bandwidth parameter. num_samples (int): The number of IQ samples to produce. alpha_rolloff (float, optional): The alpha-rolloff value for the SRRC filter. This is a pass through to constellation_baseband_modulator(). If pulse_shape_name == 'recantangular' then this value is ignored. If pulse_shape_name == 'srrc' then this value must be defined, and within the range 0 < alpha_rolloff < 1. Defaults to None. rng (optional): Seedable random number generator for reproducibility. Raises: ValueError: Raises ValueError if the number of samples produced is incorrect. Returns: np.ndarray: Returns the constellation-modulated IQ samples at the appropriate center frequency and bandwidth. """ # calculate final oversampling rate oversampling_rate = sample_rate/bandwidth # modulate at a nominal oversampling rate. a resampling will be applied # after the baseband modulation to bring it to the appropriate bandwidth. oversampling_rate_baseband = 4 # calculate the resampling rate needed to convert the baseband signal into proper bandwidth resample_rate_ideal = oversampling_rate/oversampling_rate_baseband # determine how many samples baseband modulator needs to implement. # use floor() to ensure that generated sequence is slightly less in # order to avoid slicing any portion of the burst, and instead # zero-pad with a small number of samples at the end to bring up to # appropriate length num_samples_baseband_init = int(np.floor(num_samples/resample_rate_ideal)) if num_samples_baseband_init <= 0: num_samples_baseband = oversampling_rate_baseband else: num_samples_baseband = num_samples_baseband_init # modulate at baseband constellation_signal_baseband = constellation_modulator_baseband ( class_name, pulse_shape_name, num_samples_baseband, oversampling_rate_baseband, alpha_rolloff, rng ) # apply resampling constellation_mod_correct_bw = multistage_polyphase_resampler ( constellation_signal_baseband, resample_rate_ideal ) # either slice or pad the signal to the proper length if len(constellation_mod_correct_bw) > num_samples: constellation_mod_signal = slice_head_tail_to_length ( constellation_mod_correct_bw, num_samples ) else: constellation_mod_signal = pad_head_tail_to_length ( constellation_mod_correct_bw, num_samples ) if len(constellation_mod_signal) != num_samples: raise ValueError('constellation mod producing incorrect number of samples: ' + str(len(constellation_mod_signal)) + ' but requested: ' + str(num_samples)) # convert to appropriate type constellation_mod_signal = constellation_mod_signal.astype(torchsig_complex_data_type) return constellation_mod_signal
# Builder
[docs] class ConstellationSignalBuilder(SignalBuilder): """Implements the Constellation family signal generator. Implements SignalBuilder() for the linearly modulated constellation-based families: QAM, PSK, PAM, ASK, OOK. Attributes: dataset_metadata (DatasetMetadata): Parameters describing the dataset required for signal generation. supported_classes (List[str]): List of supported signal classes. Set to `TorchSigSignalLists.constellation_signals`. """ supported_classes = TorchSigSignalLists.constellation_signals
[docs] def __init__(self, dataset_metadata: DatasetMetadata, class_name: str, **kwargs): """Initializes Constellation Signal Builder. Args: dataset_metadata (DatasetMetadata): Dataset metadata. class_name (str, optional): Class name. """ super().__init__(dataset_metadata=dataset_metadata, class_name=class_name, **kwargs)
def _update_data(self) -> None: """Creates the IQ samples for the constellation waveform based on the signal metadata fields. """ # wideband params sample_rate = self.dataset_metadata.sample_rate # signal params class_name = self._signal.metadata.class_name bandwidth = self._signal.metadata.bandwidth num_iq_samples_signal = self._signal.metadata.duration_in_samples # randomize pulse shape selection if self.random_generator.integers(0,2) == 0: pulse_shape_name = 'srrc' # randomize alpha_rolloff alpha_rolloff = self.random_generator.uniform(0.1,0.5) else: pulse_shape_name = 'rectangular' alpha_rolloff = None # modulate waveform to complex baseband self._signal.data = constellation_modulator( class_name, pulse_shape_name, bandwidth, sample_rate, num_iq_samples_signal, alpha_rolloff, self.random_generator ) def _update_metadata(self) -> None: """Performs a signals-specific update of signal metadata. The signal duration for a constellation waveform must be at least 1 symbol, which is greater than the default in dataset metadata which is 1 sample. Therefore the duration needs to be recalculated based on the other signal metadata fields, since a symbol period is dependent on the oversampling rate (and therefore bandwidth). """ # the duration self._signal.metadata.duration_in_samples cannot be used for Constellation # waveform. the base class Builder() uses a minimum duration of 1 sample which is too small # for constellation and other modulated waveforms. instead, it has to be calculated to be at # least 1 symbol long which is based on the bandwidth and oversampling rate minimum_duration_in_symbols = 1 oversampling_rate = int(np.ceil(self.dataset_metadata.sample_rate/self._signal.metadata.bandwidth)) minimum_duration_for_one_symbol = np.clip(oversampling_rate*minimum_duration_in_symbols, a_min=None, a_max=self.dataset_metadata.num_iq_samples_dataset) # choose the larger of the two minimums minimum_duration_in_samples = np.max((minimum_duration_for_one_symbol,self.dataset_metadata.signal_duration_in_samples_min)) # is duration parameter to be randomized? if minimum_duration_in_samples == self.dataset_metadata.signal_duration_in_samples_max: # the min and max fields are the same, so just use one of the fields self._signal.metadata.duration_in_samples = copy(self.dataset_metadata.signal_duration_in_samples_min) else: # randomize the duration self._signal.metadata.duration_in_samples = self.random_generator.integers(low=minimum_duration_in_samples, high=self.dataset_metadata.signal_duration_in_samples_max,dtype=int) # is start parameter to be randomized? if self._signal.metadata.duration_in_samples == self.dataset_metadata.num_iq_samples_dataset: # duration is equal to the total dataset length, therefore start must be zero self._signal.metadata.start_in_samples = 0 else: # given duration, start is randomly set from 0 to rightmost time that the duration still fits inside the dataset iq samples self._signal.metadata.start_in_samples = self.random_generator.integers(low=0, high=self.dataset_metadata.num_iq_samples_dataset - self._signal.metadata.duration_in_samples,dtype=int)