Source code for torchsig.signals.builders.fsk

"""Frequency Shift Keying (FSK) and related Signal Builder and Modulator
"""

# TorchSig
from torchsig.signals.builder import SignalBuilder
from torchsig.datasets.dataset_metadata import DatasetMetadata
from torchsig.utils.dsp import (
    multistage_polyphase_resampler,
    pad_head_tail_to_length,
    slice_head_tail_to_length,
    slice_tail_to_length,
    torchsig_complex_data_type
)
from torchsig.signals.signal_lists import TorchSigSignalLists

# Third Party
import numpy as np
import scipy.signal as sp
from copy import copy

# Built-In
from collections import OrderedDict

[docs] def get_fsk_freq_map ( ): """Contains symbol maps for FSK and MSK variants. Returns: OrderedDict: a dictionary of all symbol maps. """ freq_map = OrderedDict( { "2fsk": np.linspace(-1 + (1 / 2), 1 - (1 / 2), 2, endpoint=True), "2gfsk": np.linspace(-1 + (1 / 2), 1 - (1 / 2), 2, endpoint=True), "2msk": np.linspace(-1 + (1 / 2), 1 - (1 / 2), 2, endpoint=True), "2gmsk": np.linspace(-1 + (1 / 2), 1 - (1 / 2), 2, endpoint=True), "4fsk": np.linspace(-1 + (1 / 4), 1 - (1 / 4), 4, endpoint=True), "4gfsk": np.linspace(-1 + (1 / 4), 1 - (1 / 4), 4, endpoint=True), "4msk": np.linspace(-1 + (1 / 4), 1 - (1 / 4), 4, endpoint=True), "4gmsk": np.linspace(-1 + (1 / 4), 1 - (1 / 4), 4, endpoint=True), "8fsk": np.linspace(-1 + (1 / 8), 1 - (1 / 8), 8, endpoint=True), "8gfsk": np.linspace(-1 + (1 / 8), 1 - (1 / 8), 8, endpoint=True), "8msk": np.linspace(-1 + (1 / 8), 1 - (1 / 8), 8, endpoint=True), "8gmsk": np.linspace(-1 + (1 / 8), 1 - (1 / 8), 8, endpoint=True), "16fsk": np.linspace(-1 + (1 / 16), 1 - (1 / 16), 16, endpoint=True), "16gfsk": np.linspace(-1 + (1 / 16), 1 - (1 / 16), 16, endpoint=True), "16msk": np.linspace(-1 + (1 / 16), 1 - (1 / 16), 16, endpoint=True), "16gmsk": np.linspace(-1 + (1 / 16), 1 - (1 / 16), 16, endpoint=True), } ) return freq_map
[docs] def get_fsk_mod_index( class_name:str, rng=np.random.default_rng() ) -> float: """FSK modulation index. The modulation index is a parameter that is derived from the symbol spacing in the frequency domain. Orthogonal FSK has a modulation index of 1.0, and MSK and GMSK have a modulation index of 0.5. The modulation index is randomized over a wide range for both GFSK and FSK. Args: class_name (str): Class name to return modulation index, ex: '2fsk'. rng (optional): Seedable random number generator for reproducibility. Returns: float: Modulation index. """ # returns the modulation index based on the modulation if "gfsk" in class_name: # bluetooth mod_idx = rng.uniform(0.1,0.5) elif "msk" in class_name: # MSK, GMSK mod_idx = 0.5 else: # FSK # 50% chance to use mod index of 1 (orthogonal) ... if rng.uniform(0,1) < 0.5: mod_idx = 1 else: # ... or something else (non-orthogonal). include # a modulation index both less than 1 and greater # than 1 to train over a variety of parameters mod_idx = rng.uniform(0.7,1.1) return mod_idx
[docs] def gaussian_taps(samples_per_symbol: int, bt:float, rng=np.random.default_rng()) -> np.ndarray: """Designs a gaussian pulse shape for GMSK and GFSK. Args: samples_per_symbol (int): Number of samples per symbol. bt (float): The time-bandwidth product for the Gaussian pulse shape. On the range 0.0 to 1.0. rng (optional): Seedable random number generator for reproducibility. Returns: np.ndarray: Filter weights for the gaussian pulse shape. """ # pre-modulation Bb*T product which sets the bandwidth of the Gaussian lowpass filter m = rng.integers(1,5) # randomize the filter span n = np.arange(-m * samples_per_symbol, m * samples_per_symbol + 1) p = np.exp(-2 * np.pi**2 * bt**2 / np.log(2) * (n / float(samples_per_symbol)) ** 2) p = p / np.sum(p) return p
[docs] def fsk_modulator_baseband ( class_name:str, max_num_samples:int, oversampling_rate_nominal:int, rng=np.random.default_rng() ) -> np.ndarray: """FSK modulator at baseband. Args: class_name (str): Name of the signal to modulate, ex: '2fsk'. max_num_samples (int): Maximum number of samples to be produced. The length of the output signal must be less than or equal to this number. oversampling_rate_nominal (int): The amount of oversampling, which is equal to the ratio of the ratio of the sampling rate and bandwidth. rng (optional): Seedable random number generator for reproducibility. Returns: np.ndarray: FSK modulated signal at baseband. """ # determine modulation index mod_idx = get_fsk_mod_index(class_name,rng) # get the FSK frequency symbol map freq_map = get_fsk_freq_map() # get the constellation to modulate const = freq_map[class_name] # scale the frequency map by the oversampling rate such that the tones # are packed tighter around f=0 the larger the oversampling rate const_oversampled = const / oversampling_rate_nominal # calculate the modulation order, ex: the "4" in "4-FSK" mod_order = len(const) # determine how many samples are in each symbol samples_per_symbol = int(mod_order * oversampling_rate_nominal) # rectangular pulse shape pulse_shape = np.ones(samples_per_symbol) if "g" in class_name: # GMSK, GFSK # design the gaussian pulse shape with the bandwidth as dictated by the oversampling rate #preresample_bandwidth = 1/oversampling_rate_nominal bt = rng.uniform(0.1,0.5) # randomize the time-bandwdith product taps = gaussian_taps(samples_per_symbol, bt, rng) pulse_shape = sp.convolve(taps,pulse_shape) # account for the increase in samples due to convolution of pulse shaping filter max_num_samples_minus_pulse_shape = max_num_samples - len(pulse_shape) + 1 # convert number of samples into number of symbols num_symbols = int(np.floor(max_num_samples_minus_pulse_shape/samples_per_symbol)) num_symbols = 1 if num_symbols <= 0 else num_symbols # calculate the indexes into symbol table symbol_nums = rng.integers(0, len(const_oversampled), num_symbols) # produce data symbols symbols = const_oversampled[symbol_nums] # upsample symbols and apply pulse shaping filtered = sp.upfirdn(pulse_shape,symbols,up=samples_per_symbol,down=1) # phase accumulator and scaling phase = np.cumsum(np.array(filtered) * 1j * mod_idx * np.pi) # apply frequency modulation modulated = np.exp(phase) # pad if signal is too long if len(modulated) > max_num_samples: # slice to max length modulated = slice_tail_to_length ( modulated, max_num_samples ) elif len(modulated) < max_num_samples: # pad to full length modulated = pad_head_tail_to_length ( modulated, max_num_samples ) # else: correct length do nothing return modulated
[docs] def fsk_modulator ( class_name:str, bandwidth:float, sample_rate:float, num_samples:int, rng=np.random.default_rng() ) -> np.ndarray: """FSK modulator. Args: class_name (str): The modulation to create, ex: '2fsk'. bandwidth (float): The desired 3 dB bandwidth of the signal. Must be in the same units as `sample_rate` and within the bounds 0 < `bandwidth` < `sample_rate`. sample_rate (float): The sampling rate for the IQ signal. The sample rate can use a normalized value of 1, or it can use a practical sample rate such as 10 MHz. However, it must use the same units as the bandwidth parameter. num_samples (int): The number of IQ samples to produce. rng (optional): Seedable random number generator for reproducibility. Returns: np.ndarray: FSK modulated at the appropriate bandwidth. """ # define the baseband oversampling rate oversampling_rate_nominal = 4 # calculate the output resampling rate oversampling_rate = sample_rate/bandwidth # how much to resample the baseband signal to match the bandwidth resample_rate_ideal = oversampling_rate/oversampling_rate_nominal # calculate the maximum number of samples to be produced by the baseband modulator max_num_samples = int(np.floor(num_samples/resample_rate_ideal)) # ensures a minimum number of samples if (max_num_samples < oversampling_rate_nominal): max_num_samples = copy(oversampling_rate_nominal) # modulate the baseband signal baseband_signal = fsk_modulator_baseband ( class_name, max_num_samples, oversampling_rate_nominal, rng ) # apply resampling fsk_correct_bw = multistage_polyphase_resampler ( baseband_signal, resample_rate_ideal ) # scale to account for the resampling fsk_correct_bw *= 1/resample_rate_ideal # either slice or pad the signal to the proper length if len(fsk_correct_bw) > num_samples: fsk_correct_bw = slice_head_tail_to_length ( fsk_correct_bw, num_samples ) else: fsk_correct_bw = pad_head_tail_to_length ( fsk_correct_bw, num_samples ) # convert into the appropriate data type fsk_correct_bw = fsk_correct_bw.astype(torchsig_complex_data_type) return fsk_correct_bw
# Builder
[docs] class FSKSignalBuilder(SignalBuilder): """Implements SignalBuilder() for frequency shift keying modulation (FSK) waveform. Attributes: dataset_metadata (DatasetMetadata): Parameters describing the dataset required for signal generation. supported_classes (List[str]): List of supported signal classes. Set to `["2fsk"]`. """ supported_classes = TorchSigSignalLists.fsk_signals
[docs] def __init__(self, dataset_metadata: DatasetMetadata, class_name:str = '2fsk', **kwargs): """Initializes FSK Signal Builder. Sets `class_name= "2fsk"`. Args: dataset_metadata (DatasetMetadata): Dataset metadata. class_name (str, optional): Class name. """ super().__init__(dataset_metadata=dataset_metadata, class_name=class_name, **kwargs)
def _update_data(self) -> None: """Creates the IQ samples for the FSK waveform based on the signal metadata fields. """ # wideband params sample_rate = self.dataset_metadata.sample_rate # signal params num_iq_samples_signal = self._signal.metadata.duration_in_samples bandwidth = self._signal.metadata.bandwidth class_name = self._signal.metadata.class_name # FSK modulator at complex baseband self._signal.data = fsk_modulator( class_name, bandwidth, sample_rate, num_iq_samples_signal, self.random_generator ) def _update_metadata(self) -> None: """Performs a signals-specific update of signal metadata. This does nothing because the signal does not need any fields to be updated. This `_update_metadata()` must be implemented but is not required to create or modify any data or fields for this particular signal case. """