Source code for torchsig.signals.builders.ofdm

"""OFDM Signal Builder and Modulator Module"""

from __future__ import annotations

import numpy as np

from torchsig.signals.builder import BaseSignalGenerator
from torchsig.signals.builders.constellation_maps import all_symbol_maps
from torchsig.signals.signal_lists import TorchSigSignalLists
from torchsig.signals.signal_types import Signal
from torchsig.utils.dsp import (
    TorchSigComplexDataType,
    multistage_polyphase_resampler,
    pad_head_tail_to_length,
    slice_head_tail_to_length,
    slice_tail_to_length,
)


[docs] def ofdm_modulator_baseband( num_subcarriers: int, max_num_samples: int, oversampling_rate_nominal: int, rng: np.random.Generator | None = None, ) -> np.ndarray: """Modulates OFDM signal at baseband. Args: num_subcarriers: Number of subcarriers to use. max_num_samples: Maximum number of samples to produce. oversampling_rate_nominal: Oversampling rate (sampling_rate/bandwidth). rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: np.ndarray: OFDM modulated signal at baseband. Raises: ValueError: If num_subcarriers, max_num_samples, or oversampling_rate_nominal are not positive. """ # Input validation if num_subcarriers <= 0: raise ValueError("num_subcarriers must be positive") if max_num_samples <= 0: raise ValueError("max_num_samples must be positive") if oversampling_rate_nominal <= 0: raise ValueError("oversampling_rate_nominal must be positive") if rng is None: rng = np.random.default_rng() # Define oversampling rate for OFDM signal oversampling_rate_nominal = 4 ifft_size = int(oversampling_rate_nominal * num_subcarriers) # Randomize cyclic prefix cyclic_prefix_probability = 0.50 cp_len = ( 0 if rng.uniform(0, 1) < cyclic_prefix_probability else rng.integers(2, int(num_subcarriers / 2)) ) cp_len_oversampled = cp_len * oversampling_rate_nominal # Calculate OFDM symbol lengths ofdm_symbol_length = num_subcarriers + cp_len ofdm_symbol_length_oversampled = ofdm_symbol_length * oversampling_rate_nominal # Determine number of OFDM symbols num_ofdm_symbols = int(np.ceil(max_num_samples / ofdm_symbol_length_oversampled)) # Randomize subcarrier modulation potential_subcarrier_modulations = TorchSigSignalLists.ofdm_subcarrier_modulations random_index = rng.integers(0, len(potential_subcarrier_modulations)) constellation_name = potential_subcarrier_modulations[random_index] # Get and normalize symbol map symbol_map = all_symbol_maps[constellation_name] symbol_map = symbol_map / np.sqrt(np.mean(np.abs(symbol_map) ** 2)) # Generate symbols for active subcarriers map_index_grid = rng.integers( 0, len(symbol_map), (num_subcarriers, num_ofdm_symbols) ) symbol_grid = symbol_map[map_index_grid] # Create time/frequency grid time_frequency_grid = np.zeros( (ifft_size, num_ofdm_symbols), dtype=TorchSigComplexDataType ) half_num_subcarriers = int(num_subcarriers / 2) time_frequency_grid[1 : half_num_subcarriers + 1, :] = symbol_grid[ 0:half_num_subcarriers, : ] time_frequency_grid[ifft_size - half_num_subcarriers :, :] = symbol_grid[ half_num_subcarriers:, : ] # Perform IFFT modulated_grid = np.fft.ifft(time_frequency_grid, axis=0) # Add cyclic prefix cp_grid = modulated_grid[ifft_size - cp_len_oversampled :, :] modulated_with_cp_grid = np.concatenate((cp_grid, modulated_grid), axis=0) # Serialize time series ofdm_signal = np.ravel(np.transpose(modulated_with_cp_grid)) # Enforce proper length return slice_tail_to_length(ofdm_signal, max_num_samples)
[docs] def ofdm_modulator( num_subcarriers: int, bandwidth: float, sample_rate: float, num_samples: int, rng: np.random.Generator | None = None, ) -> np.ndarray: """Modulator for OFDM signals. Args: num_subcarriers: Number of subcarriers to use. bandwidth: Desired 3 dB bandwidth of the signal (Hz). sample_rate: Sampling rate for the IQ signal (Hz). num_samples: Number of IQ samples to produce. rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: np.ndarray: OFDM modulated signal at the appropriate bandwidth. Raises: ValueError: If bandwidth or sample_rate are not positive. ValueError: If bandwidth exceeds sample_rate/2. ValueError: If num_samples is not positive. """ # Input validation if bandwidth <= 0: raise ValueError("bandwidth must be positive") if sample_rate <= 0: raise ValueError("sample_rate must be positive") if bandwidth > sample_rate / 2: raise ValueError("bandwidth must be less than sample_rate/2") if num_samples <= 0: raise ValueError("num_samples must be positive") if rng is None: rng = np.random.default_rng() # Calculate resampling parameters oversampling_rate = sample_rate / bandwidth oversampling_rate_baseband = 4 resample_rate_ideal = oversampling_rate / oversampling_rate_baseband # Calculate baseband samples num_samples_baseband = int(np.ceil(num_samples / resample_rate_ideal)) # Generate and resample signal ofdm_signal_baseband = ofdm_modulator_baseband( num_subcarriers, num_samples_baseband, oversampling_rate_baseband, rng ) ofdm_signal_correct_bw = multistage_polyphase_resampler( ofdm_signal_baseband, resample_rate_ideal ) # Adjust signal length ofdm_signal_correct_bw = ( slice_head_tail_to_length(ofdm_signal_correct_bw, num_samples) if len(ofdm_signal_correct_bw) > num_samples else pad_head_tail_to_length(ofdm_signal_correct_bw, num_samples) ) return ofdm_signal_correct_bw.astype(TorchSigComplexDataType)
[docs] class OFDMSignalGenerator(BaseSignalGenerator): """OFDM Signal Generator. Implements OFDM waveforms with configurable parameters. """
[docs] def __init__(self, **kwargs: dict[str, str | float | int]) -> None: """Initializes OFDM Signal Generator. Args: **kwargs: Metadata parameters including: - sample_rate: Sampling rate (Hz) - bandwidth_min: Minimum bandwidth (Hz) - bandwidth_max: Maximum bandwidth (Hz) - num_subcarriers: Number of subcarriers - signal_duration_in_samples_min: Minimum signal duration (samples) - signal_duration_in_samples_max: Maximum signal duration (samples) Raises: ValueError: If required metadata fields are missing or invalid. """ super().__init__(**kwargs) self.required_metadata_fields = [ "sample_rate", "bandwidth_min", "bandwidth_max", "num_subcarriers", "signal_duration_in_samples_min", "signal_duration_in_samples_max", ] self.set_default_class_name(f"ofdm-{self['num_subcarriers']}")
[docs] def generate(self) -> Signal: """Generates an OFDM signal based on the configured parameters. Returns: Signal: Generated OFDM signal with metadata. Raises: ValueError: If required metadata fields are missing or invalid. """ # Get parameters from metadata sample_rate = self["sample_rate"] num_iq_samples_signal = self.random_generator.integers( low=self["signal_duration_in_samples_min"], high=self["signal_duration_in_samples_max"] + 1, ) bandwidth = self.random_generator.integers( low=self["bandwidth_min"], high=self["bandwidth_max"] + 1 ) num_subcarriers = self["num_subcarriers"] # Generate signal signal_data = ofdm_modulator( num_subcarriers, bandwidth, sample_rate, num_iq_samples_signal, self.random_generator, ) return Signal(data=signal_data, center_freq=0, bandwidth=bandwidth)