Source code for torchsig.transforms.signal_transforms
"""SignalTransforms on Signal objects.
"""
# Public API of this module. Every entry must match a name that is actually
# defined in this file; a stale entry makes ``from <module> import *`` raise
# AttributeError. The shadowing transform is defined as ``Shadowing``.
__all__ = [
    "SignalTransform",
    "AdditiveNoiseSignalTransform",
    "AdjacentChannelInterference",
    "CarrierPhaseOffsetSignalTransform",
    "CochannelInterference",
    "DopplerSignalTransform",
    "Fading",
    "IntermodulationProductsSignalTransform",
    "IQImbalanceSignalTransform",
    "LocalOscillatorFrequencyDriftSignalTransform",
    "LocalOscillatorPhaseNoiseSignalTransform",
    "NonlinearAmplifierSignalTransform",
    "PassbandRippleSignalTransform",
    "QuantizeSignalTransform",
    "Shadowing",
    "SpectralInversionSignalTransform",
]
# TorchSig
from torchsig.transforms.base_transforms import Transform
from torchsig.signals.signal_types import Signal
import torchsig.transforms.functional as F
from torchsig.utils.dsp import (
torchsig_complex_data_type,
low_pass
)
# Third Party
import numpy as np
import scipy as sp
# Built-In
from typing import Tuple, List
[docs]
class SignalTransform(Transform):
    """Base class for transforms that operate on a single Signal object.

    Subclasses override ``__call__`` to modify the Signal and finish by calling
    ``self.update(signal)``.
    """

    def update(self, signal: Signal) -> None:
        """Record this transform in the Signal's metadata bookkeeping.

        Appends ``self`` to ``signal.metadata.applied_transforms``. The signal
        validity check (``signal.verify()``) is currently commented out, so no
        verification is performed here.

        Inherited classes should always call self.update() after performing
        the transform operation (inside __call__).

        Args:
            signal (Signal): Transformed signal.
        """
        history = signal.metadata.applied_transforms
        history.append(self)
        # signal.verify()

    def __call__(self, signal: Signal) -> Signal:
        """Apply the transform to a Signal.

        Args:
            signal (Signal): Signal to be transformed.

        Raises:
            NotImplementedError: Inherited classes must override this method.

        Returns:
            Signal: Transformed Signal.
        """
        raise NotImplementedError
[docs]
class AdditiveNoiseSignalTransform(SignalTransform):
    """Adds noise of a randomly drawn power level to the Signal data.

    Attributes:
        power_range (Tuple[float, float]): Range bounds for the added noise power level (W).
            Defaults to (0.01, 10.0).
        power_distribution (Callable[[], float]): Random draw of the added noise power.
        color (str): Noise color, supports 'white', 'pink', or 'red' noise frequency spectrum types.
            Defaults to 'white'.
        continuous (bool): Sets noise to continuous (True) or impulsive (False). Defaults to True.
        measure (bool): Measure and update SNR metadata. Defaults to False.
    """

    def __init__(
        self,
        power_range: Tuple = (0.01, 10.0),
        color: str = 'white',
        continuous: bool = True,
        measure: bool = False,
        **kwargs
    ):
        super().__init__(**kwargs)
        # plain configuration
        self.power_range = power_range
        self.color = color
        self.continuous = continuous
        self.measure = measure
        # random sampler derived from the configured range
        self.power_distribution = self.get_distribution(self.power_range)

    def __call__(self, signal: Signal) -> Signal:
        """Add noise to ``signal.data`` and optionally update ``snr_db`` metadata.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with noise added.
        """
        extra_noise_power = self.power_distribution()

        if self.measure:
            # Update SNR for the full sampled band, assuming independent noise:
            # split the measured mean power into signal and noise parts using
            # the SNR currently stored in metadata, then add the new noise power.
            prior_snr = 10 ** (signal.metadata.snr_db / 10)
            mean_power = np.sum(np.abs(signal.data)**2)/len(signal.data)
            signal_part = mean_power / (1 + 1/prior_snr)
            noise_part = signal_part / prior_snr
            updated_snr = signal_part / (noise_part + extra_noise_power)
            signal.metadata.snr_db = 10*np.log10(updated_snr)

        signal.data = F.additive_noise(
            data=signal.data,
            power=extra_noise_power,
            color=self.color,
            continuous=self.continuous,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class AdjacentChannelInterference(SignalTransform):
    """Applies adjacent channel interference to Signal.

    Attributes:
        sample_rate (float): Sample rate (normalized). Defaults to 1.0.
        power_range (Tuple[float, float]): Range bounds for interference power level (W).
            Defaults to (0.01, 10.0).
        power_distribution (Callable[[], float]): Random draw of interference power.
        center_frequency_range (Tuple[float, float]): Range bounds for interference center
            frequency (normalized). Defaults to (0.2, 0.3).
        center_frequency_distribution (Callable[[], float]): Random draw of interference
            center frequency.
        phase_sigma_range (Tuple[float, float]): Range bounds for interference phase sigma.
            Defaults to (0.0, 1.0).
        phase_sigma_distribution (Callable[[], float]): Random draw of phase sigma.
        time_sigma_range (Tuple[float, float]): Range bounds for interference time sigma.
            Defaults to (0.0, 10.0).
        time_sigma_distribution (Callable[[], float]): Random draw of time sigma.
        filter_weights (np.ndarray): Predefined baseband lowpass filter, fixed for all calls.
            Defaults to low_pass(0.125, 0.125, 1.0).
    """

    def __init__(
        self,
        sample_rate: float = 1.0,
        power_range: Tuple = (0.01, 10.0),
        center_frequency_range: Tuple = (0.2, 0.3),
        phase_sigma_range: Tuple = (0.0, 1.0),
        time_sigma_range: Tuple = (0.0, 10.0),
        filter_weights: np.ndarray = low_pass(0.125, 0.125, 1.0),
        **kwargs
    ):
        super().__init__(**kwargs)
        # fixed settings
        self.sample_rate = sample_rate
        self.filter_weights = filter_weights  # predefined, fixed filter

        # configured ranges
        self.power_range = power_range
        self.center_frequency_range = center_frequency_range
        self.phase_sigma_range = phase_sigma_range
        self.time_sigma_range = time_sigma_range

        # one random sampler per range
        self.power_distribution = self.get_distribution(self.power_range)
        self.center_frequency_distribution = self.get_distribution(self.center_frequency_range)
        self.phase_sigma_distribution = self.get_distribution(self.phase_sigma_range)
        self.time_sigma_distribution = self.get_distribution(self.time_sigma_range)

    def __call__(self, signal: Signal) -> Signal:
        """Add randomly parameterized adjacent channel interference to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with interference applied.
        """
        # Draw order (power, center frequency, phase sigma, time sigma) is kept
        # fixed so the sequence of random draws stays reproducible.
        interferer_power = self.power_distribution()
        interferer_center = self.center_frequency_distribution()
        interferer_phase_sigma = self.phase_sigma_distribution()
        interferer_time_sigma = self.time_sigma_distribution()

        signal.data = F.adjacent_channel_interference(
            data=signal.data,
            sample_rate=self.sample_rate,
            power=interferer_power,
            center_frequency=interferer_center,
            phase_sigma=interferer_phase_sigma,
            time_sigma=interferer_time_sigma,
            filter_weights=self.filter_weights,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class CarrierPhaseOffsetSignalTransform(SignalTransform):
    """SignalTransform that applies a randomized carrier phase offset to Signal IQ data.

    The randomized phase offset is of the form exp(j * phi) where phi is in the
    range of 0 to 2pi radians. Real world effects such as time delays as a
    signal transits the air and others can cause such randomized phase offsets.

    The transform does not usually require any arguments due to its simplicity.
    It is generally unrealistic to have a randomized phase offset of a range
    less than 0 to 2pi.

    Attributes:
        phase_offset_range (Tuple[float, float]): Range bounds for phase offset (radians).
            Defaults to (0, 2*pi).
        phase_offset_distribution (Callable[[], float]): Random draw from phase offset distribution.
    """

    def __init__(
        self,
        phase_offset_range: Tuple[float, float] = (0, 2*np.pi),
        **kwargs
    ):
        super().__init__(**kwargs)
        self.phase_offset_range = phase_offset_range
        self.phase_offset_distribution = self.get_distribution(self.phase_offset_range)

    def __call__(self, signal: Signal) -> Signal:
        """Apply one randomly drawn phase offset to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with the phase offset applied.
        """
        phi = self.phase_offset_distribution()

        signal.data = F.phase_offset(signal.data, phi)
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class CochannelInterference(SignalTransform):
    """Applies cochannel interference to Signal.

    Attributes:
        power_range (Tuple[float, float]): Range bounds for interference power level (W).
            Default (0.01, 10.0).
        power_distribution (Callable[[], float]): Random draw of interference power.
        filter_weights (np.ndarray): Predefined baseband lowpass filter, fixed for all calls.
            Default low_pass(0.125, 0.125, 1.0).
        color (str): Base noise color, supports 'white', 'pink', or 'red' noise
            frequency spectrum types. Default 'white'.
        continuous (bool): Sets noise to continuous (True) or impulsive (False). Default True.
        measure (bool): Measure and update SNR metadata. Default False.
    """

    def __init__(
        self,
        power_range: Tuple = (0.01, 10.0),
        filter_weights: np.ndarray = low_pass(0.125, 0.125, 1.0),
        color: str = 'white',
        continuous: bool = True,
        measure: bool = False,
        **kwargs
    ):
        super().__init__(**kwargs)
        # plain configuration
        self.power_range = power_range
        self.filter_weights = filter_weights  # predefined, fixed band limiting filter
        self.color = color
        self.continuous = continuous
        self.measure = measure
        # random sampler derived from the configured range
        self.power_distribution = self.get_distribution(self.power_range)

    def __call__(self, signal: Signal) -> Signal:
        """Add cochannel interference to ``signal.data`` and optionally update ``snr_db``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with interference applied.
        """
        interference_power = self.power_distribution()

        if self.measure:
            # Treat SNR as SINR/AWGN for the full sampled band, assuming
            # independent noise: recover signal and noise power from the stored
            # SNR, then add the interference power to the noise term.
            prior_snr = 10 ** (signal.metadata.snr_db / 10)
            mean_power = np.sum(np.abs(signal.data)**2)/len(signal.data)
            signal_part = mean_power / (1 + 1/prior_snr)
            noise_part = signal_part / prior_snr
            updated_snr = signal_part / (noise_part + interference_power)
            signal.metadata.snr_db = 10*np.log10(updated_snr)

        signal.data = F.cochannel_interference(
            data=signal.data,
            power=interference_power,
            filter_weights=self.filter_weights,
            color=self.color,
            continuous=self.continuous,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class DopplerSignalTransform(SignalTransform):
    """SignalTransform that applies wideband Doppler to Signal IQ data.

    Attributes:
        velocity_range (Tuple[float, float]): Relative velocity bounds in m/s. Default (0.0, 10.0).
        velocity_distribution (Callable[[], float]): Random draw from velocity distribution.
        propagation_speed (float): Wave speed in medium. Default 2.9979e8 m/s.
        sampling_rate (float): Data sampling rate. Default 1.0.
    """

    def __init__(
        self,
        velocity_range: Tuple[float, float] = (0.0, 10.0),
        propagation_speed: float = 2.9979e8,
        sampling_rate: float = 1.0,
        **kwargs
    ):
        super().__init__(**kwargs)
        # fixed settings
        self.propagation_speed = propagation_speed
        self.sampling_rate = sampling_rate
        # randomized velocity
        self.velocity_range = velocity_range
        self.velocity_distribution = self.get_distribution(self.velocity_range)

    def __call__(self, signal: Signal) -> Signal:
        """Apply Doppler for one drawn velocity and rescale frequency metadata.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object, with ``center_freq`` and ``bandwidth``
            metadata multiplied by c / (c - v).
        """
        speed = self.velocity_distribution()
        wave_speed = self.propagation_speed
        # frequency scaling factor c / (c - v), applied to the metadata below
        freq_scale = wave_speed / (wave_speed - speed)

        signal.data = F.doppler(
            data=signal.data,
            velocity=speed,
            propagation_speed=self.propagation_speed,
            sampling_rate=self.sampling_rate,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        # adjust metadata by scaling factor
        signal.metadata.center_freq *= freq_scale
        signal.metadata.bandwidth *= freq_scale

        self.update(signal)
        return signal
[docs]
class Fading(SignalTransform):  # slow, fast, block fading
    """SignalTransform that applies a channel fading model.

    Note, currently only performs Rayleigh fading:
        A Rayleigh fading channel can be modeled as an FIR filter with Gaussian
        distributed taps which vary over time. The length of the filter
        determines the coherence bandwidth of the channel and is inversely
        proportional to the delay spread. The rate at which the channel taps
        vary over time is related to the coherence time and this is inversely
        proportional to the maximum Doppler spread. This time variance is not
        included in this model.

    Attributes:
        coherence_bandwidth (optional): Coherence bandwidth sampling parameters.
            Defaults to (0.01, 0.1).
        coherence_bandwidth_distribution (Callable[[], float]): Random draw from coherence
            bandwidth distribution.
        power_delay_profile (Tuple | List | np.ndarray, optional): A list of positive values
            assigning power to taps of the channel model. When the number of taps exceeds the
            number of items in the provided power_delay_profile, the list is linearly
            interpolated to provide values for each tap of the channel. Defaults to (1, 1).
            Stored as a numpy array.
    """

    def __init__(
        self,
        coherence_bandwidth = (0.01, 0.1),
        power_delay_profile: Tuple | List | np.ndarray = (1, 1),
        **kwargs
    ):
        super().__init__(**kwargs)
        # normalize the profile to an ndarray once, at construction time
        self.power_delay_profile = np.asarray(power_delay_profile)
        self.coherence_bandwidth = coherence_bandwidth
        self.coherence_bandwidth_distribution = self.get_distribution(self.coherence_bandwidth)

    def __call__(self, signal: Signal) -> Signal:
        """Apply fading with one drawn coherence bandwidth to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with fading applied.
        """
        drawn_bandwidth = self.coherence_bandwidth_distribution()

        signal.data = F.fading(
            data=signal.data,
            coherence_bandwidth=drawn_bandwidth,
            power_delay_profile=self.power_delay_profile,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class IntermodulationProductsSignalTransform(SignalTransform):
    """Applies simulated intermodulation products to a Signal.

    On every call a coefficient vector of length ``model_order`` is built and
    handed to ``F.intermodulation_products``: even indices carry the non-zero
    weights (index 0 is fixed to 1, each later weight is redrawn until it does
    not exceed the preceding non-zero weight) and odd indices stay zero.

    Attributes:
        model_order (List[int]): The choices of model order, 3rd or 5th order. Defaults to [3, 5].
        model_order_distribution (Callable[[], int]): Random draw of the model order.
        coeffs_range (Tuple[float, float]): Range bounds for each intermodulation coefficient.
            Defaults to (1e-7, 1e-5).
        coeffs_distribution (Callable[[], float]): Random draw of a coefficient, created with
            the 'log10' option of ``get_distribution``.
    """

    def __init__(
        self,
        model_order: List[int] = [3, 5],
        coeffs_range: Tuple[float, float] = (1e-7, 1e-5),
        **kwargs
    ):
        super().__init__(**kwargs)
        # The default argument is one list object shared by every instance.
        # Keep a private copy so that mutating ``self.model_order`` on one
        # instance cannot alter the default or any other instance. Non-list
        # inputs are stored untouched so get_distribution sees what was passed.
        self.model_order = list(model_order) if isinstance(model_order, list) else model_order
        self.model_order_distribution = self.get_distribution(self.model_order)
        self.coeffs_range = coeffs_range
        self.coeffs_distribution = self.get_distribution(self.coeffs_range,'log10')

    def __call__(self, signal: Signal) -> Signal:
        """Draw a random intermodulation model and apply it to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with intermodulation products applied.
        """
        # get randomized choice for model order
        model_order = self.model_order_distribution()

        # only even indices 0, 2, 4, ... below model_order get a non-zero weight
        num_coefficients = len(np.arange(0, model_order, 2))
        non_zero_coeffs = np.zeros(num_coefficients, dtype=torchsig_complex_data_type)

        for index in range(num_coefficients):
            if index == 0:
                # first weight is fixed to 1
                non_zero_coeffs[index] = 1
                continue
            # Redraw until the weight is no larger than the previous one.
            # NOTE(review): this loop has no iteration cap; if coeffs_range can
            # never produce a value <= the previous weight (e.g. a range that
            # lies entirely above 1) it will not terminate.
            non_zero_coeffs[index] = self.coeffs_distribution()
            while non_zero_coeffs[index] > non_zero_coeffs[index-1]:
                non_zero_coeffs[index] = self.coeffs_distribution()

        # place the weights on the even indices, leaving odd indices at zero
        coeffs = np.zeros(model_order, dtype=torchsig_complex_data_type)
        coeffs[0::2] = non_zero_coeffs

        signal.data = F.intermodulation_products(
            data = signal.data,
            coeffs = coeffs
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class IQImbalanceSignalTransform(SignalTransform):
    """Applies a set of IQImbalance effects to a Signal: amplitude, phase, and DC offset.

    Attributes:
        amplitude_imbalance (optional): Range bounds of IQ amplitude imbalance (dB).
            Defaults to (-1., 1.).
        amplitude_imbalance_distribution (Callable[[], float]): Random draw from amplitude
            imbalance distribution.
        phase_imbalance (optional): Range bounds of IQ phase imbalance (radians).
            Defaults to +/- 5 degrees expressed in radians.
        phase_imbalance_distribution (Callable[[], float]): Random draw from phase imbalance
            distribution.
        dc_offset (Tuple, optional): Range bounds for I and Q component DC offsets.
            Defaults to ((-0.1, 0.1), (-0.1, 0.1)).
        dc_offset_distribution (Callable[[], (float, float)]): Random draw from dc_offset
            distribution.
    """

    def __init__(
        self,
        amplitude_imbalance = (-1., 1.),
        phase_imbalance = (-5.0 * np.pi / 180.0, 5.0 * np.pi / 180.0),
        dc_offset = ((-0.1, 0.1),(-0.1, 0.1)),
        **kwargs
    ):
        super().__init__(**kwargs)
        # each range is stored next to the sampler built from it
        self.amplitude_imbalance = amplitude_imbalance
        self.amplitude_imbalance_distribution = self.get_distribution(self.amplitude_imbalance)

        self.phase_imbalance = phase_imbalance
        self.phase_imbalance_distribution = self.get_distribution(self.phase_imbalance)

        self.dc_offset = dc_offset
        self.dc_offset_distribution = self.get_distribution(self.dc_offset)

    def __call__(self, signal: Signal) -> Signal:
        """Draw amplitude, phase and DC offset imbalances and apply them to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with IQ imbalance applied.
        """
        # draw order is amplitude, phase, DC offset
        amp_draw = self.amplitude_imbalance_distribution()
        phase_draw = self.phase_imbalance_distribution()
        dc_draw = self.dc_offset_distribution()

        signal.data = F.iq_imbalance(signal.data, amp_draw, phase_draw, dc_draw)
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class LocalOscillatorFrequencyDriftSignalTransform(SignalTransform):
    """SignalTransform that applies LO frequency drift to Signal IQ data.

    Attributes:
        drift_ppm (Tuple[float, float]): Drift in parts per million (ppm). Default (0.1, 1).
        drift_ppm_distribution (Callable[[], float]): Random draw from the drift_ppm
            distribution, created with the 'log10' option of ``get_distribution``.
    """

    def __init__(
        self,
        drift_ppm: Tuple[float, float] = (0.1, 1),
        **kwargs
    ):
        super().__init__(**kwargs)
        self.drift_ppm = drift_ppm
        self.drift_ppm_distribution = self.get_distribution(self.drift_ppm,'log10')

    def __call__(self, signal: Signal) -> Signal:
        """Apply LO frequency drift with one drawn ppm value to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with frequency drift applied.
        """
        ppm = self.drift_ppm_distribution()

        signal.data = F.local_oscillator_frequency_drift(
            data=signal.data,
            drift_ppm=ppm,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class LocalOscillatorPhaseNoiseSignalTransform(SignalTransform):
    """SignalTransform that applies LO phase noise to Signal IQ data.

    Attributes:
        phase_noise_degrees (Tuple[float, float]): Range for phase noise (in degrees).
            Defaults to (0.25, 1).
        phase_noise_degrees_distribution (Callable[[], float]): Random draw from
            phase_noise_degrees distribution.
    """

    def __init__(
        self,
        phase_noise_degrees: Tuple[float, float] = (0.25, 1),
        **kwargs
    ):
        super().__init__(**kwargs)
        self.phase_noise_degrees = phase_noise_degrees
        self.phase_noise_degrees_distribution = self.get_distribution(self.phase_noise_degrees)

    def __call__(self, signal: Signal) -> Signal:
        """Apply LO phase noise with one drawn level to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with phase noise applied.
        """
        noise_deg = self.phase_noise_degrees_distribution()

        signal.data = F.phase_noise(
            data=signal.data,
            phase_noise_degrees=noise_deg,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class NonlinearAmplifierSignalTransform(SignalTransform):
    """Applies a memoryless nonlinear amplifier model to Signal.

    Attributes:
        gain_range (Tuple[float, float]): Small-signal gain range (linear). Defaults to (1.0, 4.0).
        gain_distribution (Callable[[], float]): Random draw from gain distribution.
        psat_backoff_range (Tuple[float, float]): Psat backoff factor (linear) reflecting saturated
            power level (Psat) relative to input signal mean power. Defaults to (5.0, 20.0).
        psat_backoff_distribution (Callable[[], float]): Random draw from psat_backoff distribution.
        phi_range (Tuple[float, float]): Maximum signal relative phase shift at
            saturation power level (radians). Defaults to (0.0, 0.0).
        phi_distribution (Callable[[], float]): Random draw from phi distribution.
        auto_scale (bool): Automatically rescale output power to match full-scale peak
            input power prior to transform, based on peak estimates. Default True.
    """

    def __init__(
        self,
        gain_range: Tuple[float, float] = (1.0, 4.0),
        psat_backoff_range: Tuple[float, float] = (5.0, 20.0),
        phi_range: Tuple[float, float] = (0.0, 0.0),
        auto_scale: bool = True,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.auto_scale = auto_scale

        # configured ranges
        self.gain_range = gain_range
        self.psat_backoff_range = psat_backoff_range
        self.phi_range = phi_range

        # one random sampler per range
        self.gain_distribution = self.get_distribution(self.gain_range)
        self.psat_backoff_distribution = self.get_distribution(self.psat_backoff_range)
        self.phi_distribution = self.get_distribution(self.phi_range)

    def __call__(self, signal: Signal) -> Signal:
        """Draw amplifier parameters and apply the nonlinear model to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object after the amplifier model.
        """
        # draw order is gain, Psat backoff, phi
        drawn_gain = self.gain_distribution()
        drawn_backoff = self.psat_backoff_distribution()
        drawn_phi = self.phi_distribution()

        signal.data = F.nonlinear_amplifier(
            data=signal.data,
            gain=drawn_gain,
            psat_backoff=drawn_backoff,
            phi_rad=drawn_phi,
            auto_scale=self.auto_scale,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class PassbandRippleSignalTransform(SignalTransform):
    """SignalTransform that models analog filter passband ripple for Signal IQ data.

    The filter is designed once in ``__init__``: a Chebyshev type I lowpass is
    created with ``scipy.signal.cheby1`` and its first ``numtaps`` impulse
    response samples (from ``scipy.signal.dimpulse``) are kept as FIR weights.

    Attributes:
        passband_ripple_db (float): Desired passband ripple in dB. Default 1.0 dB.
        cutoff (float): Passband cutoff frequency relative to Fs=1.0 sample rate. Default 0.25.
        order (int): Desired filter order, which drives number of ripples present within
            the passband. Default 5.
        numtaps (int): Number of taps in filter. Default 63.
        fir_coeffs (np.ndarray): Truncated impulse response used as the filter weights.
    """

    def __init__(
        self,
        passband_ripple_db: float = 1.0,
        cutoff: float = 0.25,
        order: int = 5,
        numtaps: int = 63,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.passband_ripple_db = passband_ripple_db
        self.cutoff = cutoff
        self.order = order
        self.numtaps = numtaps

        # design filter: Chebyshev type I lowpass at a sample rate of 1.0
        numerator, denominator = sp.signal.cheby1(
            N=self.order,
            rp=self.passband_ripple_db,
            Wn=self.cutoff,
            btype='low',
            fs=1.0,
        )
        # sample the impulse response (time step 1/1.0) for numtaps points
        impulse_response = sp.signal.dimpulse((numerator, denominator, 1/1.0), n=numtaps)[1]
        self.fir_coeffs = impulse_response[0].squeeze()

    def __call__(self, signal: Signal) -> Signal:
        """Filter ``signal.data`` with the precomputed ripple filter.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object after filtering.
        """
        signal.data = F.passband_ripple(
            data=signal.data,
            filter_coeffs=self.fir_coeffs,
            normalize=True,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class Shadowing(SignalTransform):
    """SignalTransform that applies RF channel shadowing to Signal IQ data.

    This slow channel obstruction effect is applied as a block to the whole data.

    Attributes:
        mean_db_range (Tuple[float, float]): Mean value range in dB. Defaults to (0.0, 4.0).
        mean_db_distribution (Callable[[], float]): Random draw from mean_db distribution.
        sigma_db_range (Tuple[float, float]): Sigma value range in dB. Defaults to (2.0, 6.0).
        sigma_db_distribution (Callable[[], float]): Random draw from sigma_db distribution.
    """

    def __init__(
        self,
        mean_db_range: Tuple[float, float] = (0.0, 4.0),
        sigma_db_range: Tuple[float, float] = (2.0, 6.0),
        **kwargs
    ):
        super().__init__(**kwargs)
        # configured ranges
        self.mean_db_range = mean_db_range
        self.sigma_db_range = sigma_db_range
        # one random sampler per range
        self.mean_db_distribution = self.get_distribution(self.mean_db_range)
        self.sigma_db_distribution = self.get_distribution(self.sigma_db_range)

    def __call__(self, signal: Signal) -> Signal:
        """Draw shadowing mean and sigma (dB) and apply shadowing to ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with shadowing applied.
        """
        # draw order is mean, then sigma
        drawn_mean = self.mean_db_distribution()
        drawn_sigma = self.sigma_db_distribution()

        signal.data = F.shadowing(
            data=signal.data,
            mean_db=drawn_mean,
            sigma_db=drawn_sigma,
            rng=self.random_generator,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class QuantizeSignalTransform(SignalTransform):
    """SignalTransform that models Quantization in DAC.

    Attributes:
        num_bits (Tuple[int, int]): Range of number of bits in DAC to simulate.
            Defaults to (6, 18).
        num_bits_distribution (Callable): Random draw from the num_bits distribution.
        ref_level_adjustment_db (Tuple[float, float]): Range of reference level (in dB) to
            increase or decrease relative to full scale. Defaults to (-10, 3).
        ref_level_adjustment_db_distribution (Callable[[], float]): Random draw from the
            ref_level_adjustment_db distribution.
    """

    def __init__(
        self,
        num_bits: Tuple[int, int] = (6, 18),
        ref_level_adjustment_db: Tuple[float, float] = (-10, 3),
        **kwargs
    ):
        super().__init__(**kwargs)
        # configured ranges
        self.num_bits = num_bits
        self.ref_level_adjustment_db = ref_level_adjustment_db
        # one random sampler per range
        self.num_bits_distribution = self.get_distribution(self.num_bits)
        self.ref_level_adjustment_db_distribution = self.get_distribution(self.ref_level_adjustment_db)

    def __call__(self, signal: Signal) -> Signal:
        """Draw a bit depth and reference level, then quantize ``signal.data``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object after quantization.
        """
        # draw order is bit depth, then reference level adjustment
        drawn_bits = self.num_bits_distribution()
        drawn_ref_level_db = self.ref_level_adjustment_db_distribution()

        # apply quantization
        signal.data = F.quantize(
            data=signal.data,
            num_bits=drawn_bits,
            ref_level_adjustment_db=drawn_ref_level_db,
        )
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        self.update(signal)
        return signal
[docs]
class SpectralInversionSignalTransform(SignalTransform):
    """Inverts spectrum of complex IQ data.

    The ``center_freq`` metadata is negated to match the inverted spectrum.
    """

    def __call__(self, signal: Signal) -> Signal:
        """Invert the spectrum of ``signal.data`` and flip the sign of ``center_freq``.

        Args:
            signal (Signal): Signal to be transformed (modified in place).

        Returns:
            Signal: The same Signal object with inverted spectrum.
        """
        signal.data = F.spectral_inversion(signal.data)
        # cast the result to the library-wide complex dtype
        signal.data = signal.data.astype(torchsig_complex_data_type)

        # the center frequency changes sign along with the spectrum
        signal.metadata.center_freq *= -1

        self.update(signal)
        return signal