Source code for torchsig.signals.builders.ofdm

"""OFDM Signal Builder and Modulator
"""
# TorchSig
from torchsig.signals.builder import SignalBuilder
from torchsig.datasets.dataset_metadata import DatasetMetadata
from torchsig.utils.dsp import (
    multistage_polyphase_resampler,
    pad_head_tail_to_length,
    slice_head_tail_to_length,
    slice_tail_to_length,
    torchsig_complex_data_type
)
from torchsig.signals.signal_lists import TorchSigSignalLists
from torchsig.signals.builders.constellation_maps import all_symbol_maps

# Third Party
import numpy as np
from copy import copy


# Modulator
def ofdm_modulator_baseband(
    class_name: str,
    max_num_samples: int,
    oversampling_rate_nominal: int,
    rng=None,
) -> np.ndarray:
    """Modulates an OFDM signal at baseband.

    Args:
        class_name (str): Name of the signal to modulate, ex: 'ofdm-1024'.
            The integer after the '-' is the number of subcarriers.
        max_num_samples (int): Maximum number of samples to be produced. The
            length of the output signal must be less than or equal to this
            number.
        oversampling_rate_nominal (int): The amount of oversampling, which is
            equal to the ratio of the sampling rate and the bandwidth. Must be
            an integer value >= 2 so that all active subcarriers plus the
            notched DC bin fit into the IFFT without overlapping.
        rng (optional): Seedable random number generator for reproducibility.
            When omitted, a fresh `np.random.default_rng()` is created for
            this call.

    Raises:
        ValueError: If `oversampling_rate_nominal` is not an integer value
            greater than or equal to 2.

    Returns:
        np.ndarray: OFDM modulated signal
    """
    # build the generator per call; a generator created in the signature
    # would be instantiated once at import time and shared by every call
    if rng is None:
        rng = np.random.default_rng()

    # validate the oversampling rate; it scales slice indices below, so it
    # must be a whole number, and it must leave room for the DC notch
    oversampling_rate = int(oversampling_rate_nominal)
    if oversampling_rate != oversampling_rate_nominal or oversampling_rate < 2:
        raise ValueError(
            "oversampling_rate_nominal must be an integer >= 2, "
            f"got {oversampling_rate_nominal!r}"
        )

    # split the class name to determine how many subcarriers
    num_subcarriers = int(class_name.split('-')[1])

    # calculate IFFT size
    ifft_size = int(oversampling_rate * num_subcarriers)

    # 50% chance to use cyclic prefix or not
    if rng.uniform(0, 1) < 0.5:
        # no cyclic prefix
        cp_len = 0
    else:
        # randomize the cyclic prefix length over [2, num_subcarriers/2)
        max_cp_len = int(num_subcarriers / 2)
        if max_cp_len > 2:
            cp_len = rng.integers(2, max_cp_len)
        else:
            # too few subcarriers for a valid prefix range; drawing from an
            # empty interval would raise, so fall back to no cyclic prefix
            cp_len = 0

    # update cyclic prefix length to account for oversampling
    cp_len_oversampled = cp_len * oversampling_rate

    # length of ofdm symbol
    ofdm_symbol_length = num_subcarriers + cp_len

    # length of ofdm symbol to account for oversampling
    ofdm_symbol_length_oversampled = ofdm_symbol_length * oversampling_rate

    # how many OFDM symbols to generate? always produce a number of samples
    # at least as large as the maximum, this is to avoid returning a long
    # string of zeros which makes the FFT produce a log10(0) which produces
    # an error
    num_ofdm_symbols = int(np.ceil(max_num_samples / ofdm_symbol_length_oversampled))

    # randomize which modulation is to be used as data on subcarriers
    potential_subcarrier_modulations = TorchSigSignalLists.ofdm_subcarrier_modulations
    random_index = rng.integers(0, len(potential_subcarrier_modulations))
    constellation_name = potential_subcarrier_modulations[random_index]

    # get the constellation map
    symbol_map = all_symbol_maps[constellation_name]

    # normalize to unit avg power
    symbol_map = symbol_map / np.sqrt(np.mean(np.abs(symbol_map) ** 2))

    # generate symbols for active subcarriers across time/frequency grid
    map_index_grid = rng.integers(0, len(symbol_map), (num_subcarriers, num_ofdm_symbols))
    symbol_grid = symbol_map[map_index_grid]

    # create the full time/frequency grid
    time_frequency_grid = np.zeros((ifft_size, num_ofdm_symbols), dtype=torchsig_complex_data_type)

    # fill in the active subcarriers, ignoring index 0 in order to notch the
    # DC subcarrier. the first half goes to the positive-frequency bins, the
    # remainder to the negative-frequency bins at the end of the grid; the
    # remainder is sized explicitly so an odd subcarrier count also fits
    half_num_subcarriers = int(num_subcarriers / 2)
    num_upper_subcarriers = num_subcarriers - half_num_subcarriers
    time_frequency_grid[1:half_num_subcarriers + 1, :] = symbol_grid[0:half_num_subcarriers, :]
    time_frequency_grid[ifft_size - num_upper_subcarriers:, :] = symbol_grid[half_num_subcarriers:, :]

    # perform IFFT to get to time-series, but still in grid
    modulated_grid = np.fft.ifft(time_frequency_grid, axis=0)

    # prepend the cyclic prefix (an empty slice when cp_len is zero)
    cp_grid = modulated_grid[ifft_size - cp_len_oversampled:, :]
    modulated_with_cp_grid = np.concatenate((cp_grid, modulated_grid), axis=0)

    # serialize the time series, one OFDM symbol after another
    ofdm_signal = np.ravel(np.transpose(modulated_with_cp_grid))

    # enforce that the signal has the proper length. the signal here should
    # always be the exact proper length or a little longer, so use this
    # function to slice it down to proper length. discard off the tail to
    # retain the appropriate cyclic prefix and framing samples
    ofdm_signal = slice_tail_to_length(ofdm_signal, max_num_samples)

    return ofdm_signal
def ofdm_modulator(
    class_name: str,
    bandwidth: float,
    sample_rate: float,
    num_samples: int,
    rng=None,
) -> np.ndarray:
    """Modulator for OFDM signals.

    Generates the OFDM waveform at a fixed baseband oversampling rate and then
    resamples it so that it occupies the requested bandwidth.

    Args:
        class_name (str): The modulation to create, ex: 'ofdm-1024'.
        bandwidth (float): The desired 3 dB bandwidth of the signal. Must be
            in the same units as `sample_rate` and within the bounds
            0 < `bandwidth` < `sample_rate`.
        sample_rate (float): The sampling rate for the IQ signal. The sample
            rate can use a normalized value of 1, or it can use a practical
            sample rate such as 10 MHz. However, it must use the same units as
            the bandwidth parameter.
        num_samples (int): The number of IQ samples to produce.
        rng (optional): Seedable random number generator for reproducibility.
            When omitted, a fresh `np.random.default_rng()` is created for
            this call.

    Raises:
        ValueError: If `bandwidth` or `sample_rate` is not strictly positive.

    Returns:
        np.ndarray: OFDM modulated signal at the appropriate bandwidth.
    """
    # build the generator per call; a generator created in the signature
    # would be instantiated once at import time and shared by every call
    if rng is None:
        rng = np.random.default_rng()

    # a non-positive rate would divide by zero or yield a negative
    # resampling rate further down
    if bandwidth <= 0 or sample_rate <= 0:
        raise ValueError(
            "bandwidth and sample_rate must be positive, "
            f"got bandwidth={bandwidth!r}, sample_rate={sample_rate!r}"
        )

    # calculate final oversampling rate
    oversampling_rate = sample_rate / bandwidth

    # the oversampling rate in the baseband modulator
    oversampling_rate_baseband = 4

    # calculate the resampling rate
    resample_rate_ideal = oversampling_rate / oversampling_rate_baseband

    # determine how many samples baseband modulator needs to implement.
    num_samples_baseband = int(np.ceil(num_samples / resample_rate_ideal))

    # baseband modulator
    ofdm_signal_baseband = ofdm_modulator_baseband(
        class_name,
        num_samples_baseband,
        oversampling_rate_baseband,
        rng,
    )

    # apply resampling to get to the proper bandwidth
    ofdm_signal_correct_bw = multistage_polyphase_resampler(
        ofdm_signal_baseband,
        resample_rate_ideal,
    )

    # either slice or pad the signal to the proper length; a signal that is
    # already the requested length goes through the padding helper
    if len(ofdm_signal_correct_bw) > num_samples:
        ofdm_signal_correct_bw = slice_head_tail_to_length(
            ofdm_signal_correct_bw,
            num_samples,
        )
    else:
        ofdm_signal_correct_bw = pad_head_tail_to_length(
            ofdm_signal_correct_bw,
            num_samples,
        )

    # convert to appropriate type
    ofdm_signal_correct_bw = ofdm_signal_correct_bw.astype(torchsig_complex_data_type)

    return ofdm_signal_correct_bw
# Builder
class OFDMSignalBuilder(SignalBuilder):
    """Signal builder for the OFDM family of waveforms.

    Specializes SignalBuilder() so that the generated IQ data and the
    signal metadata are appropriate for OFDM signals.

    Attributes:
        dataset_metadata (DatasetMetadata): Parameters describing the dataset
            required for signal generation.
        supported_classes (List[str]): List of supported signal classes. Set
            to `TorchSigSignalLists.ofdm_signals`.
    """

    supported_classes = TorchSigSignalLists.ofdm_signals

    def __init__(self, dataset_metadata: DatasetMetadata, class_name: str, **kwargs):
        """Initializes the OFDM signal builder.

        Args:
            dataset_metadata (DatasetMetadata): Dataset metadata.
            class_name (str): Class name, ex: 'ofdm-1024'.
            **kwargs: Forwarded unchanged to the SignalBuilder initializer.
        """
        super().__init__(
            dataset_metadata=dataset_metadata,
            class_name=class_name,
            **kwargs,
        )

    def _update_data(self) -> None:
        """Generates the OFDM IQ samples described by the signal metadata.

        Reads the class name, bandwidth and duration from the signal metadata
        and the sample rate from the dataset metadata, then stores the
        modulated waveform on the signal.
        """
        # wideband params
        dataset_sample_rate = self.dataset_metadata.sample_rate

        # signal params
        signal_meta = self._signal.metadata

        # modulate waveform to complex baseband
        self._signal.data = ofdm_modulator(
            signal_meta.class_name,
            signal_meta.bandwidth,
            dataset_sample_rate,
            signal_meta.duration_in_samples,
            self.random_generator,
        )

    def _update_metadata(self) -> None:
        """Performs the OFDM-specific update of the signal metadata.

        Picks a duration long enough for OFDM to generate at least 1 symbol
        (bounded by the dataset limits), then picks a start sample such that
        the signal fits inside the dataset IQ samples.
        """
        signal_meta = self._signal.metadata
        dataset = self.dataset_metadata

        # the subcarrier count is encoded after the '-' in the class name
        subcarrier_count = int(signal_meta.class_name.split('-')[1])

        # final oversampling rate of the signal within the dataset
        oversampling = dataset.sample_rate / signal_meta.bandwidth

        # samples needed for the OFDM minimum
        ofdm_floor = int(np.round(subcarrier_count * oversampling))

        # the larger of the OFDM minimum and the dataset minimum applies
        duration_floor = np.max((ofdm_floor, dataset.signal_duration_in_samples_min))

        if duration_floor >= dataset.signal_duration_in_samples_max:
            # the estimated minimum is too large, use the max instead
            chosen_duration = copy(dataset.signal_duration_in_samples_max)
        else:
            # randomize the duration
            chosen_duration = self.random_generator.integers(
                low=duration_floor,
                high=dataset.signal_duration_in_samples_max,
                dtype=int,
            )
        signal_meta.duration_in_samples = chosen_duration

        # is start parameter to be randomized?
        if signal_meta.duration_in_samples == dataset.num_iq_samples_dataset:
            # duration is equal to the total dataset length, therefore start
            # must be zero
            chosen_start = 0
        else:
            # given duration, start is randomly set from 0 to the rightmost
            # time that the duration still fits inside the dataset iq samples
            chosen_start = self.random_generator.integers(
                low=0,
                high=dataset.num_iq_samples_dataset - signal_meta.duration_in_samples,
                dtype=int,
            )
        signal_meta.start_in_samples = chosen_start