Source code for torchsig.signals.builders.fsk

"""Frequency Shift Keying (FSK) and related Signal Builder and Modulator Module"""

from __future__ import annotations

import numpy as np
import scipy.signal as sp

from torchsig.signals.builder import BaseSignalGenerator
from torchsig.signals.signal_types import Signal
from torchsig.utils.dsp import (
    TorchSigComplexDataType,
    multistage_polyphase_resampler,
    pad_head_tail_to_length,
    slice_head_tail_to_length,
    slice_tail_to_length,
)


[docs] def get_fsk_freq_map(n: int) -> np.ndarray: """Generates frequency symbol maps for FSK and MSK variants. Args: n: Number of frequency points in the constellation. Returns: np.ndarray: Array of frequency points for the constellation. """ return np.linspace(-1 + (1 / n), 1 - (1 / n), n, endpoint=True)
[docs] def get_fsk_mod_index(fsk_type: str, rng: np.random.Generator | None = None) -> float: """Determines the modulation index for different FSK variants. The modulation index is derived from the symbol spacing in the frequency domain. Orthogonal FSK has a modulation index of 1.0, and MSK/GMSK have 0.5. Args: fsk_type: Type of FSK modulation ('fsk', 'gfsk', 'msk', 'gmsk'). rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: float: Modulation index. Raises: ValueError: If fsk_type is not one of the supported types. """ if rng is None: rng = np.random.default_rng() if fsk_type == "gfsk": # Bluetooth GFSK return rng.uniform(0.1, 0.5) if fsk_type in ("msk", "gmsk"): # MSK and GMSK return 0.5 if fsk_type == "fsk": # FSK - 50% chance for orthogonal (mod_idx=1) or non-orthogonal orthogonal_probability = 0.50 return ( 1.0 if rng.uniform(0, 1) < orthogonal_probability else rng.uniform(0.7, 1.01) ) raise ValueError(f"Unexpected fsk_type: {fsk_type}")
[docs] def gaussian_taps( samples_per_symbol: int, bt: float, rng: np.random.Generator | None = None ) -> np.ndarray: """Designs a Gaussian pulse shape for GMSK and GFSK. Args: samples_per_symbol: Number of samples per symbol. bt: Time-bandwidth product (0.0 to 1.0). rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: np.ndarray: Filter weights for the Gaussian pulse shape. Raises: ValueError: If bt is not in the valid range (0.0 to 1.0). """ if rng is None: rng = np.random.default_rng() if not 0.0 <= bt <= 1.0: raise ValueError("bt must be between 0.0 and 1.0") m = rng.integers(1, 5) # Randomize filter span n = np.arange(-m * samples_per_symbol, m * samples_per_symbol + 1) p = np.exp(-2 * np.pi**2 * bt**2 / np.log(2) * (n / float(samples_per_symbol)) ** 2) return p / np.sum(p)
[docs] def fsk_modulator_baseband( constellation_size: int, fsk_type: str, max_num_samples: int, oversampling_rate_nominal: int, rng: np.random.Generator | None = None, ) -> np.ndarray: """FSK modulator at baseband. Args: constellation_size: Number of points in the constellation. fsk_type: Type of FSK modulation ('fsk', 'gfsk', 'msk', 'gmsk'). max_num_samples: Maximum number of samples to produce. oversampling_rate_nominal: Oversampling rate (sampling_rate/bandwidth). rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: np.ndarray: FSK modulated signal at baseband. Raises: ValueError: If max_num_samples or oversampling_rate_nominal are not positive. """ # Input validation if max_num_samples <= 0: raise ValueError("max_num_samples must be positive") if oversampling_rate_nominal <= 0: raise ValueError("oversampling_rate_nominal must be positive") if rng is None: rng = np.random.default_rng() # Determine modulation index mod_idx = get_fsk_mod_index(fsk_type, rng) # Get FSK frequency symbol map const = get_fsk_freq_map(constellation_size) const_oversampled = const / oversampling_rate_nominal # Calculate modulation order and samples per symbol mod_order = len(const) samples_per_symbol = int(mod_order * oversampling_rate_nominal) # Create pulse shape pulse_shape = np.ones(samples_per_symbol) if "g" in fsk_type: # GMSK, GFSK bt = rng.uniform(0.1, 0.5) # Randomize time-bandwidth product taps = gaussian_taps(samples_per_symbol, bt, rng) pulse_shape = sp.convolve(taps, pulse_shape) # Calculate number of symbols max_num_samples_minus_pulse_shape = max_num_samples - len(pulse_shape) + 1 num_symbols = max( 1, int(np.floor(max_num_samples_minus_pulse_shape / samples_per_symbol)) ) # Generate symbols symbol_nums = rng.integers(0, len(const_oversampled), num_symbols) symbols = const_oversampled[symbol_nums] # Apply pulse shaping and modulation filtered = sp.upfirdn(pulse_shape, symbols, up=samples_per_symbol, down=1) phase = np.cumsum(np.array(filtered) * 1j * mod_idx * np.pi) modulated = np.exp(phase) # Adjust signal length if len(modulated) > max_num_samples: modulated = slice_tail_to_length(modulated, max_num_samples) elif len(modulated) < max_num_samples: modulated = pad_head_tail_to_length(modulated, max_num_samples) return modulated
[docs] def fsk_modulator( constellation_size: int, fsk_type: str, bandwidth: float, sample_rate: float, num_samples: int, rng: np.random.Generator | None = None, ) -> np.ndarray: """FSK modulator. Args: constellation_size: Number of points in the constellation. fsk_type: Type of FSK modulation ('fsk', 'gfsk', 'msk', 'gmsk'). bandwidth: Desired 3 dB bandwidth of the signal (Hz). sample_rate: Sampling rate for the IQ signal (Hz). num_samples: Number of IQ samples to produce. rng: Random number generator for reproducibility. If None, creates a new default generator. Returns: np.ndarray: FSK modulated signal at the appropriate bandwidth. Raises: ValueError: If bandwidth or sample_rate are not positive. ValueError: If bandwidth exceeds sample_rate/2. ValueError: If num_samples is not positive. """ # Input validation if bandwidth <= 0: raise ValueError("bandwidth must be positive") if sample_rate <= 0: raise ValueError("sample_rate must be positive") if bandwidth > sample_rate / 2: raise ValueError("bandwidth must be less than sample_rate/2") if num_samples <= 0: raise ValueError("num_samples must be positive") if rng is None: rng = np.random.default_rng() # Baseband modulation parameters oversampling_rate_nominal = 4 oversampling_rate = sample_rate / bandwidth resample_rate_ideal = oversampling_rate / oversampling_rate_nominal # Calculate baseband samples max_num_samples = max( oversampling_rate_nominal, int(np.floor(num_samples / resample_rate_ideal)) ) # Generate and resample signal baseband_signal = fsk_modulator_baseband( constellation_size, fsk_type, max_num_samples, oversampling_rate_nominal, rng ) fsk_correct_bw = multistage_polyphase_resampler( baseband_signal, resample_rate_ideal ) fsk_correct_bw *= 1 / resample_rate_ideal # Adjust signal length fsk_correct_bw = ( slice_head_tail_to_length(fsk_correct_bw, num_samples) if len(fsk_correct_bw) > num_samples else pad_head_tail_to_length(fsk_correct_bw, num_samples) ) return fsk_correct_bw.astype(TorchSigComplexDataType)
[docs] class FSKSignalGenerator(BaseSignalGenerator): """FSK Signal Generator. Implements FSK waveforms with configurable parameters. """
[docs] def __init__(self, **kwargs: dict[str, str | float | int]) -> None: """Initializes FSK Signal Generator. Args: **kwargs: Metadata parameters including: - sample_rate: Sampling rate (Hz) - bandwidth_min: Minimum bandwidth (Hz) - bandwidth_max: Maximum bandwidth (Hz) - fsk_type: Type of FSK ('fsk', 'gfsk', 'msk', 'gmsk') - constellation_size: Number of points in the constellation - signal_duration_in_samples_min: Minimum signal duration (samples) - signal_duration_in_samples_max: Maximum signal duration (samples) Raises: ValueError: If required metadata fields are missing or invalid. """ super().__init__(**kwargs) self.required_metadata_fields = [ "sample_rate", "bandwidth_min", "bandwidth_max", "fsk_type", "constellation_size", "signal_duration_in_samples_min", "signal_duration_in_samples_max", ] self.set_default_class_name(f"{self['constellation_size']}{self['fsk_type']}")
[docs] def generate(self) -> Signal: """Generates an FSK signal based on the configured parameters. Returns: Signal: Generated FSK signal with metadata. Raises: ValueError: If required metadata fields are missing or invalid. """ # Get parameters from metadata sample_rate = self["sample_rate"] num_iq_samples_signal = self.random_generator.integers( low=self["signal_duration_in_samples_min"], high=self["signal_duration_in_samples_max"] + 1, ) bandwidth = self.random_generator.integers( low=self["bandwidth_min"], high=self["bandwidth_max"] + 1 ) fsk_type = self["fsk_type"] constellation_size = self["constellation_size"] # Generate signal signal_data = fsk_modulator( constellation_size, fsk_type, bandwidth, sample_rate, num_iq_samples_signal, self.random_generator, ) return Signal(data=signal_data, center_freq=0, bandwidth=bandwidth)