Source code for torchsig.transforms.functional

"""Functional transforms for reuse and custom fine-grained control
"""
from typing import Literal, Optional, Tuple

# TorchSig
import torchsig.utils.dsp as dsp
from torchsig.utils.dsp import (
    torchsig_complex_data_type,
    torchsig_float_data_type
)

# Third Party
import scipy
from scipy import signal as sp
import numpy as np
from scipy.constants import c

__all__ = [
    "add_slope",
    "additive_noise",
    "adjacent_channel_interference",
    "agc",
    "block_agc",
    "channel_swap",
    "cochannel_interference",
    "complex_to_2d",
    "cut_out",
    "doppler",
    "drop_samples",
    "fading",
    "intermodulation_products",
    "iq_imbalance",
    "local_oscillator_frequency_drift",
    "phase_noise",
    "mag_rescale",
    "nonlinear_amplifier",
    "nonlinear_amplifier_table",
    "normalize",
    "passband_ripple",
    "patch_shuffle",
    "phase_offset",
    "quantize",
    "shadowing",
    "spectral_inversion",
    "spectrogram",
    "spectrogram_drop_samples",
    "time_reversal",
    "time_varying_noise"
]


[docs] def add_slope( data: np.ndarray ) -> np.ndarray: """Add slope between each sample and its preceding sample is added to every sample. Augmentation has the effect of amplifying high frequency component more than lower frequency components. Args: data (np.ndarray): IQ data. Returns: np.ndarray: IQ data with added slope. """ slope = np.diff(data) slope = np.insert(slope, 0, 0) return (data + slope).astype(torchsig_complex_data_type)
[docs] def agc( data: np.ndarray, initial_gain_db: float, alpha_smooth: float, alpha_track: float, alpha_overflow: float, alpha_acquire: float, ref_level_db: float, track_range_db: float, low_level_db: float, high_level_db: float, ) -> np.ndarray: """Automatic Gain Control algorithm (deterministic). Args: data (np.ndarray): IQ data samples. initial_gain_db (float): Inital gain value in dB. alpha_smooth (float): Alpha for avergaing the measure signal level `level_n = level_n * alpha + level_n-1(1-alpha)` alpha_track (float): Amount to adjust gain when in tracking state. alpha_overflow (float): Amount to adjust gain when in overflow state `[level_db + gain_db] >= max_level`. alpha_acquire (float): Amount to adjust gain when in acquire state. ref_level_db (float): Reference level goal for algorithm to achieve, in dB units. track_range_db (float): dB range for operating in tracking state. low_level_db (float): minimum magnitude value (dB) to perform any gain control adjustment. high_level_db (float): magnitude value (dB) to enter overflow state. Returns: np.ndarray: IQ data adjusted sample-by-sample by the AGC algorithm. """ output = np.zeros_like(data) gain_db = initial_gain_db level_db = 0.0 for sample_idx, sample in enumerate(data): if not np.abs(sample): # sample == 0 level_db = -200 elif not sample_idx: # first sample == 0, no smoothing level_db = np.log(np.abs(sample)) else: level_db = level_db * alpha_smooth + np.log(np.abs(sample)) * ( 1 - alpha_smooth ) output_db = level_db + gain_db diff_db = ref_level_db - output_db if level_db <= low_level_db: alpha_adjust = 0.0 elif output_db >= high_level_db: alpha_adjust = alpha_overflow elif abs(diff_db) > track_range_db: alpha_adjust = alpha_acquire else: alpha_adjust = alpha_track gain_db += diff_db * alpha_adjust output[sample_idx] = data[sample_idx] * np.exp(gain_db) return output.astype(torchsig_complex_data_type)
[docs] def additive_noise( data: np.ndarray, power: float = 1.0, color: str = 'white', continuous: bool = True, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Additive complex noise with specified parameters. Args: data (np.ndarray): Complex valued IQ data samples. power (float): Desired noise power (linear, positive). Defaults to 1.0 W (0 dBW). color (str): Noise color, supports 'white', 'pink', or 'red' noise frequency spectrum types. Defaults to 'white'. continuous (bool): Sets noise to continuous (True) or impulsive (False). Defaults to True. rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data with complex noise samples with specified power added. """ N = len(data) noise_samples = dsp.noise_generator(N, power, color, continuous, rng) return (data + noise_samples).astype(torchsig_complex_data_type)
[docs] def adjacent_channel_interference( data: np.ndarray, sample_rate: float = 4.0, power: float = 1.0, center_frequency: float = 0.2, filter_weights: np.ndarray = dsp.low_pass(0.25, 0.25, 4.0), phase_sigma: float = 1.0, time_sigma: float = 0.0, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Adds adjacent channel interference to the baseband data at a specified center frequency and power level. The adjacent channel signal is a filtered, frequency-offset, randomly block time-shifted, randomly phase-perturbed baseband copy that has similar bandwidth and modulation properties, but degrades phase and time coherence with the original baseband signal. Args: data (np.ndarray): Complex valued IQ data samples. sample_rate (float): Sampling rate (Fs). Default 4.0 power (float): Adjacent interference signal power (linear, positive). Default 1.0 W (0 dBW). center_frequency (float): Adjacent interference signal center frequency (normalized relative to Fs). Default 0.2. filter_weights (np.ndarray): Lowpass filter weights applied to baseband signal data to band limit prior to creating adjacent signal. Default low_pass(0.25,0.25,4.0). phase_sigma (float): Standard deviation of Gaussian phase noise. Default 1.0. time_sigma (float): Standard deviation of Gaussian block time shift in samples. Default 0.0. rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data with added adjacent interference. """ N = len(data) t = np.arange(N) / sample_rate data_filtered = np.convolve(data, filter_weights)[-N:] # band limit original data (maintain data size) phase_noise = rng.normal(0, phase_sigma, N) # Gaussian phase noise interference = data_filtered * np.exp(1j*(2*np.pi*center_frequency*t + phase_noise)) # note: does not check aliasing time_shift = int(np.round(rng.normal(0, time_sigma, 1))[0]) # Gaussian block time shift for data (nearest sample) if time_shift > 0: # time shift with zero fill; # note: may produce discontinuities interference = np.roll(interference, time_shift) interference[0:time_shift] = 0 + 1j*0 elif time_shift < 0: interference = np.roll(interference, time_shift) interference[time_shift:0] = 0 + 1j*0 # set interference power est_power = np.sum(np.abs(interference)**2)/len(interference) interference = np.sqrt(power / est_power) * interference return (data + interference).astype(torchsig_complex_data_type)
[docs] def awgn(data: np.ndarray, noise_power_db: float, rng: Optional[np.random.Generator] = None ) -> np.ndarray: """Adds zero-mean complex additive white Gaussian noise with power of noise_power_db. Args: data: (:class:`numpy.ndarray`): (batch_size, vector_length, ...)-sized data. noise_power_db (:obj:`float`): Defined as 10*log10(E[|n|^2]). random_generator (Optional[np.random.Generator], optional): Random Generator to use. Defaults to None (new generator created internally). Returns: np.ndarray: Data with added noise. """ rng = rng if rng else np.random.default_rng() real_noise = rng.standard_normal(*data.shape) imag_noise = rng.standard_normal(*data.shape) return (data + (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2)).astype(torchsig_complex_data_type)
# TODO: block_agc is very similar to mag_rescale, and # does not actually perform any AGC - consider consolidating
[docs] def block_agc( data: np.ndarray, gain_change_db: float, start_idx: int ) -> np.ndarray: """Implements a large instantaneous jump in receiver gain. Args: data (np.ndarray): IQ data. gain_change_db (float): Gain value to change in dB. start_idx (np.ndarray): Start index for IQ data. Returns: np.ndarray: IQ data with Block AGC applied. """ # convert db to linear units gain_change_linear = 10**(gain_change_db/10) data[start_idx:] *= gain_change_linear return data.astype(torchsig_complex_data_type)
[docs] def channel_swap( data: np.ndarray ) -> np.ndarray: """Swap I and Q channels of IQ data. Args: data (np.ndarray): IQ data. Returns: np.ndarray: IQ data with channels swapped. """ real_component = data.real imag_component = data.imag new_data = np.empty(data.shape, dtype=data.dtype) new_data.real = imag_component new_data.imag = real_component return new_data.astype(torchsig_complex_data_type)
[docs] def cochannel_interference( data: np.ndarray, power: float = 1.0, filter_weights: np.ndarray = dsp.low_pass(0.25, 0.25, 4.0), color: str = 'white', continuous: bool = True, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Applies uncorrelated co-channel interference to the baseband data, modeled as shaped noise with specified parameters. Args: data (np.ndarray): Complex valued IQ data samples. power (float): Interference power (linear, positive). Default 1.0 W (0 dBW). filter_weights: Lowpass interference shaping filter weights. Default low_pass(0.25, 0.25, 4.0). color (str): Base noise color, supports 'white', 'pink', or 'red' noise frequency spectrum types. Default 'white'. continuous (bool): Sets noise to continuous (True) or impulsive (False). Default True. rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data with added uncorrelated co-channel interference. """ N = len(data) noise_samples = dsp.noise_generator(N, power, color, continuous, rng) shaped_noise = np.convolve(noise_samples, filter_weights)[-N:] # correct shaped noise power (do not assume filter is prescaled) est_power = np.sum(np.abs(shaped_noise)**2)/len(shaped_noise) interference = np.sqrt(power / est_power) * shaped_noise return (data + interference).astype(torchsig_complex_data_type)
[docs] def complex_to_2d(data: np.ndarray) -> np.ndarray: """Converts IQ data to two channels (real and imaginary parts). """ return np.stack([data.real, data.imag], axis=0)
[docs] def cut_out( data: np.ndarray, cut_start: float, cut_duration: float, cut_type: str, rng: Optional[np.random.Generator] = None ) -> np.ndarray: """Performs CutOut: replacing values with fill. Args: data (np.ndarray): IQ data cut_start (float): Normalized start of cut region [0.0, 1.0) cut_duration (float): Normalized duration of cut region (0.0, 1.0) cut_type (str): Type of data to fill cut region. * zeros * ones * low_noise * avg_noise * high_noise Raises: ValueError: Invalid `cut_type`. Returns: np.ndarray: CutOut IQ data. """ rng = rng if rng else np.random.default_rng() num_iq_samples = data.shape[0] cut_start = int(cut_start * num_iq_samples) # Create cut mask cut_mask_length = int(num_iq_samples * cut_duration) if cut_mask_length + cut_start > num_iq_samples: cut_mask_length = num_iq_samples - cut_start if cut_type == "zeros": cut_mask = np.zeros(cut_mask_length, dtype=np.complex64) elif cut_type == "ones": cut_mask = np.ones(cut_mask_length) + 1j * np.ones(cut_mask_length) elif cut_type == "low_noise": real_noise = rng.standard_normal(cut_mask_length) imag_noise = rng.standard_normal(cut_mask_length) noise_power_db = -100 cut_mask = ( (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) ) elif cut_type == "avg_noise": real_noise = rng.standard_normal(cut_mask_length) imag_noise = rng.standard_normal(cut_mask_length) avg_power = np.mean(np.abs(data) ** 2) cut_mask = avg_power * (real_noise + 1j * imag_noise) / np.sqrt(2) elif cut_type == "high_noise": real_noise = rng.standard_normal(cut_mask_length) imag_noise = rng.standard_normal(cut_mask_length) noise_power_db = 40 cut_mask = ( (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) ) else: raise ValueError(f"cut_type must be: zeros, ones, low_noise, avg_noise, or high_noise. Found: {cut_type}") # Insert cut mask into data data[cut_start : cut_start + cut_mask_length] = cut_mask return data.astype(torchsig_complex_data_type)
# TODO: improved time-scaling interpolator
[docs] def doppler( data: np.ndarray, velocity: float = 1e1, propagation_speed: float = c, sampling_rate: float = 1.0 ) -> np.ndarray: """Applies wideband Doppler effect through time scaling. Args: data (np.ndarray): Complex valued IQ data samples. velocity (float): Relative velocity in m/s (positive = approaching). Default 10 m/s. propagation_speed (float): Wave speed in medium. Default 2.9979e8 m/s. sampling_rate (float): Data sampling rate. Default 1.0. Returns: np.ndarray: Data with wideband Doppler. """ N = data.size # time scaling factor alpha = propagation_speed / (propagation_speed - velocity) # original and scaled signal sample times t_orig = np.arange(N) / sampling_rate t_new = t_orig * alpha # prevent extrapolation beyond original signal duration t_new = np.clip(t_new, 0, t_orig[-1]) # numpy default interpolator interp_real = np.interp(t_new, t_orig, data.real) interp_imag = np.interp(t_new, t_orig, data.imag) data = interp_real + 1j*interp_imag return (data).astype(torchsig_complex_data_type)
[docs] def drop_samples( data: np.ndarray, drop_starts: np.ndarray, drop_sizes: np.ndarray, fill: str, ) -> np.ndarray: """Drop samples at given locations/durations with fill technique. Supported Fill Techniques: ffill: Forward Fill. Use value at sample one before start. bfill: Backwards Fill. Use value at sample one after end. mean: Mean Fill. Use data mean. zero: Zero Fill. Use 0. Args: data (np.ndarray): IQ data. drop_starts (np.ndarray): Start indicies of drops. drop_sizes (np.ndarray): Durations for each start index. fill (str): Drop sample replacement method. Raises: ValueError: Invalid fill type. Returns: np.ndarray: data array with fill values during drops. """ for idx, start in enumerate(drop_starts): stop = start + drop_sizes[idx] if fill == "ffill": drop_region = np.full( drop_sizes[idx], data[start - 1], dtype=np.complex128 ) elif fill == "bfill": drop_region = np.full( drop_sizes[idx], data[stop], dtype=np.complex128, ) elif fill == "mean": drop_region = np.full( drop_sizes[idx], np.mean(data), dtype=np.complex128 ) elif fill == "zero": drop_region = np.zeros( drop_sizes[idx], dtype=np.complex128 ) else: raise ValueError(f"{fill} fill type unsupported. Must be ffill, bfill, mean, or zero.") data[start:stop] = drop_region return data.astype(torchsig_complex_data_type)
[docs] def fading( data: np.ndarray, coherence_bandwidth: float, power_delay_profile: np.ndarray, rng: np.random.Generator ) -> np.ndarray: """Apply fading channel to signal. Currently only does Rayleigh fading. Taps are generated by interpolating and filtering Gaussian taps. TODO: implement other types of fading. Args: data (np.ndarray): IQ data. coherence_bandwidth (float): coherence bandwidth relative to sample rate [0, 1.0]. power_delay_profile (np.ndarray): power delay profile assign to channel. rng (Optional[np.random.Generator], optional): Random Generator to use. Defaults to None (new generator created internally). Returns: np.ndarray: IQ data with fading applied. """ rng = rng if rng else np.random.default_rng() num_taps = int( np.ceil(1.0 / coherence_bandwidth) ) # filter length to get desired coherence bandwidth power_taps = np.sqrt( np.interp( np.linspace(0, 1.0, 100 * num_taps), np.linspace(0, 1.0, len(power_delay_profile)), power_delay_profile, ) ) # Generate initial taps rayleigh_taps = rng.standard_normal(num_taps) + 1j * rng.standard_normal(num_taps) # multi-path channel # Linear interpolate taps by a factor of 100 -- so we can get accurate coherence bandwidths old_time = np.linspace(0, 1.0, num_taps, endpoint=True) real_tap_function = scipy.interpolate.interp1d(old_time, rayleigh_taps.real) imag_tap_function = scipy.interpolate.interp1d(old_time, rayleigh_taps.imag) new_time = np.linspace(0, 1.0, 100 * num_taps, endpoint=True) rayleigh_taps = real_tap_function(new_time) + 1j * imag_tap_function(new_time) rayleigh_taps *= power_taps # Ensure that we maintain the same amount of power before and after the transform input_power = np.linalg.norm(data) data = sp.upfirdn(rayleigh_taps, data, up=100, down=100)[-data.shape[0] :] output_power = np.linalg.norm(data) data = np.multiply(input_power / output_power, data).astype(np.complex64) return data.astype(torchsig_complex_data_type)
[docs] def intermodulation_products( data: np.ndarray, coeffs: np.ndarray = np.array([1.0, 0.0, 1.0]) ) -> np.ndarray: """Pass IQ data through an optimized memoryless nonlinear response model that creates local intermodulation distortion (IMD) products. Note that since only odd-order IMD products effectively fall in spectrum near the first-order (original) signal, only these are calculated. Args: data (np.ndarray): Complex valued IQ data samples. coeffs (np.ndarray): coefficients of memoryless IMD response such that y(t) = coeffs[0]*x(t) + coeffs[1]*(x(t)**2) + coeffs[2]*(x(t)**3) + ... Defaults to a third-order model: np.array([1.0, 1.0, 1.0]). Returns: np.ndarray: IQ data with local IMD products. """ if (coeffs.size == 0): raise IndexError('Coeffs has length zero.') model_order = coeffs.size distorted_data = np.zeros(len(data),dtype=torchsig_complex_data_type) # only odd-order distortion products are relevant local contributors for i in range(0, model_order, 1): if (i > 0 and np.mod(i,2) == 1 and coeffs[i] != 0.0): raise ValueError('Even-order coefficients must be zero.') i_order_distortion = (np.abs(data) ** (i)) * data distorted_data += coeffs[i] * i_order_distortion # compute the change in spectral magnitudes in order to maintain the same SNR # on the other side of the transform win = sp.windows.blackmanharris(len(data)) input_power = np.max(np.abs(np.fft.fft(data*win))) output_power = np.max(np.abs(np.fft.fft(distorted_data*win))) distorted_data *= input_power/output_power return distorted_data.astype(torchsig_complex_data_type)
[docs] def iq_imbalance( data: np.ndarray, amplitude_imbalance: float, phase_imbalance: float, dc_offset: Tuple[float, float] ) -> np.ndarray: """Applies IQ imbalance to IQ data. Args: data (np.ndarray): IQ data. amplitude_imbalance (float): IQ amplitude imbalance in dB. phase_imbalance (float): IQ phase imbalance in radians [-pi, pi]. dc_offset (Tuple[float, float]): IQ DC (linear) offsets (In-Phase, Quadrature). Returns: np.ndarray: IQ data with IQ Imbalance applied. """ # amplitude imbalance data = 10 ** (amplitude_imbalance / 10.0) * np.real(data) + 1j * 10 ** (amplitude_imbalance / 10.0) * np.imag(data) # phase imbalance data = np.exp(-1j * phase_imbalance / 2.0) * np.real(data) + np.exp(1j * (np.pi / 2.0 + phase_imbalance / 2.0)) * np.imag(data) # DC offset data = (dc_offset[0] + np.real(data)) + 1j * (dc_offset[1] + np.imag(data)) return data.astype(torchsig_complex_data_type)
[docs] def local_oscillator_frequency_drift( data: np.ndarray, drift_ppm: float = 1, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Mixes data with a frequency drifting Local Oscillator (LO), with drift modeled as a random walk. Args: data (np.ndarray): Complex valued IQ data samples. drift_ppm(float): Drift in parts per million (ppm). Default 1. rng (np.random.Generator): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data with LO drift applied. """ rng = rng if rng else np.random.default_rng() N = data.size # convert drift PPM units. typically the PPM is in # reference to a 10 MHz oscillator, but we allow for # user-input arbitrary sample rates and assuming a # 10 MHz reference may produce problems when the # sample rate <= 10 MHz. so use PPM in a normalized value # in that it gets it "close" although not perfectly # emulating receiver. drift = drift_ppm * 10e-6 # generate a random phase with appropriate standard deviation random_phase = rng.normal(0,drift,N) # accumulate the phase into a frequency frequency = np.cumsum(random_phase) # frequency drift effect now contained within the complex sinusoid drift_effect = np.exp(2j * np.pi * frequency) # apply frequency drift effect data = data * drift_effect return data.astype(torchsig_complex_data_type)
[docs] def phase_noise( data: np.ndarray, phase_noise_degrees: float = 1.0, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Mixes data with a Local Oscillator (LO) with phase noise modeled as a Gaussian RV. Args: data (np.ndarray): Complex valued IQ data samples. phase_noise_degrees (float): Phase noise in degrees. Used as standard deviation for Gaussian distribution. Defaults to 1.0. rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data mixed with noisy LO. """ rng = rng if rng else np.random.default_rng() N = data.size # generate phase noise with given standard deviation phase_noise_degrees_array = rng.normal(0,phase_noise_degrees,N) # convert to radians phase_noise_radians_array = phase_noise_degrees_array * np.pi / 180 # phase noise effect contained with a complex sinusoid phase_noise_effect = np.exp(1j*phase_noise_radians_array) # apply phase noise effect data = data * phase_noise_effect return data.astype(torchsig_complex_data_type)
[docs] def mag_rescale( data: np.ndarray, start: float | int, scale: float ) -> np.ndarray: """Apply rescaling of input `rescale` starting at time `start`. Args: data (np.ndarray): IQ data. start (float | int): Start time of rescaling. * If int, treated as array index. * If float, treated as normalized start time. scale (float): Scaling factor. Returns: np.ndarray: data rescaled. """ if isinstance(start, float): start = int(data.shape[0] * start) data[start:] *= scale return data.astype(torchsig_complex_data_type)
[docs] def nonlinear_amplifier( data: np.ndarray, gain: float = 1.0, psat_backoff: float = 10.0, phi_rad: float = 0.0, auto_scale: bool = True ) -> np.ndarray: """A memoryless AM/AM, AM/PM nonlinear amplifier function-based model using a hyperbolic tangent output power response defined by gain and saturation power. Args: data (np.ndarray): Complex valued IQ data samples. gain (float): Small-signal linear gain. Default 1.0. psat_backoff (float): Saturated output power factor relative to the input signal mean power. For example, operating at a 2.0 psat_backoff factor with a 1 W mean power signal has saturation power level at 2.0 W. Default 10.0. phi_rad (float): Signal relative phase shift at saturation (radians). Modeled to vary linearly from (0.0 rad, 0.0 power). Default 0.0 rad. auto_scale (bool): Automatically rescale output power to match full-scale peak input power prior to transform, based on peak estimates. Default True. Returns: np.ndarray: Nonlinearly distorted IQ data. """ N = len(data) magnitude = np.abs(data) phase = np.angle(data) in_power = magnitude**2 mean_power_est = np.mean(in_power) # amplitude-to-amplitude modulation (AM/AM) # hyperbolic tangent power response passes # through (0,0) and asymptotically approaches psat psat = mean_power_est * psat_backoff scale_factor = psat / gain out_power = psat * np.tanh(in_power / scale_factor) out_magnitude = out_power**0.5 # amplitude-to-phase modulation (AM/PM) # linear phase shift from origin to Psat out_phase_shift_rad = np.zeros((N,)) if phi_rad != 0.: Pin = np.array([0.0, psat]) Phi = np.array([0.0, phi_rad]) out_phase_shift_rad = np.interp(in_power, Pin, Phi) amp_data = out_magnitude * np.exp(1j * (phase + out_phase_shift_rad)) # auto_scale: rescale output power to match full-scale input power # by estimating peaks for input and output power if auto_scale: win = sp.windows.blackmanharris(N) input_power = np.max(np.abs(np.fft.fft(data*win))) output_power = np.max(np.abs(np.fft.fft(amp_data*win))) amp_data *= input_power/output_power return amp_data.astype(torchsig_complex_data_type)
[docs] def nonlinear_amplifier_table( data: np.ndarray, Pin: np.ndarray = 10**((np.array([-100., -20., -10., 0., 5., 10. ]) / 10)), Pout: np.ndarray = 10**((np.array([ -90., -10., 0., 9., 9.9, 10. ]) / 10)), Phi: np.ndarray = np.deg2rad(np.array([0., -2., -4., 7., 12., 23.])), auto_scale: bool = False ) -> np.ndarray: """A nonlinear amplifier (AM/AM, AM/PM) memoryless model that distorts an input complex signal to simulate an amplifier response, based on interpolating a table of provided power input, power output, and phase change data points. Default very small model parameters depict a 10 dB gain amplifier with P1dB = 9.0 dBW. Pin = 10**((np.array([-100., -20., -10., 0., 5., 10. ]) / 10)) Pout = 10**((np.array([ -90., -10., 0., 9., 9.9, 10. ]) / 10)) Phi = np.deg2rad(np.array([0., -2., -4., 7., 12., 23.])) Args: data (np.ndarray): Complex valued IQ data samples. Pin (np.ndarray): Model signal power input points. Assumes sorted ascending linear values (Watts). Pout (np.ndarray): Model power out corresponding to Pin points (Watts). Phi (np.ndarray): Model output phase shift values (radians) corresponding to Pin points. auto_scale (bool): Automatically rescale output power to match full-scale peak input power prior to transform, based on peak estimates. Default False. Raises: ValueError: If model array arguments are not the same size. Returns: np.ndarray: Nonlinearly distorted IQ data. """ if (len(Pin) != len(Pout)) or (len(Pin) != len(Phi)): raise ValueError('Model array arguments are not the same size.') magnitude = np.abs(data) phase = np.angle(data) # amplitude-to-amplitude modulation (AM/AM) in_power = magnitude**2 out_power = np.interp(in_power, Pin, Pout) out_magnitude = out_power**0.5 # amplitude-to-phase modulation (AM/PM) out_phase_shift_rad = np.interp(in_power, Pin, Phi) amp_data = out_magnitude * np.exp(1j * (phase + out_phase_shift_rad)) # auto_scale: rescale output power to match full-scale input power # by estimating peaks for input and output power if auto_scale: win = sp.windows.blackmanharris(len(data)) input_power = np.max(np.abs(np.fft.fft(data*win))) output_power = np.max(np.abs(np.fft.fft(amp_data*win))) amp_data *= input_power/output_power return amp_data.astype(torchsig_complex_data_type)
[docs] def normalize( data: np.ndarray, norm_order: Optional[float | int | Literal["fro", "nuc"]] = 2, flatten: bool = False, ) -> np.ndarray: """Scale data so that a specfied norm computes to 1. For detailed information, see :func:`numpy.linalg.norm.` * For norm=1, norm = max(sum(abs(x), axis=0)) (sum of the elements) * for norm=2, norm = sqrt(sum(abs(x)^2), axis=0) (square-root of the sum of squares) * for norm=np.inf, norm = max(sum(abs(x), axis=1)) (largest absolute value) Args: data (np.ndarray): (batch_size, vector_length, ...)-sized data to be normalized. norm_order (int): norm order to be passed to np.linalg.norm flatten (bool): boolean specifying if the input array's norm should be calculated on the flattened representation of the input data Returns: np.ndarray: Normalized complex array data. """ if flatten: flat_data = data.reshape(data.size) norm = np.linalg.norm(flat_data, norm_order, keepdims=True) return np.multiply(data, 1.0 / norm) norm = np.linalg.norm(data, norm_order, keepdims=True) return np.multiply(data, 1.0 / norm).astype(torchsig_complex_data_type)
[docs] def passband_ripple( data: np.ndarray, filter_coeffs: np.ndarray, normalize: bool = False ) -> np.ndarray: """Functional for passband ripple transforms. Args: data (np.ndarray): Complex valued IQ data samples. filter_coeffs (np.ndarray): FIR filter coeffecients with desired ripple characteristics. normalize (bool): Normalize filter coefficients for energy preservation. Default False. Returns: np.ndarray: Filtered data. """ N = len(data) if normalize: # filter energy normalization energy = np.sum(np.abs(filter_coeffs)**2) filter_coeffs = filter_coeffs / np.sqrt(energy) data = np.convolve(data, filter_coeffs) #data = data[-N:] # retain data size return data.astype(torchsig_complex_data_type)
[docs] def patch_shuffle( data: np.ndarray, patch_size: int, patches_to_shuffle: np.ndarray, rng: Optional[np.random.Generator] = None ) -> np.ndarray: """Apply shuffling of patches specified by `num_patches`. Args: data: (np.ndarray): (batch_size, vector_length, ...)-sized data. patch_size (int): Size of each patch to shuffle. patches_to_shuffle (np.ndarray): Index of each patch of size patch_size to shuffle. random_generator (Optional[np.random.Generator], optional): Random Generator to use. Defaults to None (new generator created internally). Returns: np.ndarray: Data that has undergone patch shuffling. """ rng = rng if rng else np.random.default_rng() for patch_idx in patches_to_shuffle: patch_start = int(patch_idx * patch_size) patch = data[patch_start : patch_start + patch_size] rng.shuffle(patch) data[patch_start : patch_start + patch_size] = patch return data.astype(torchsig_complex_data_type)
[docs] def phase_offset( data: np.ndarray, phase: float ) -> np.ndarray: """Applies a phase rotation to data. Args: data (np.ndarray): IQ data. phase (float): phase to rotate sample in [-pi, pi]. Returns: np.ndarray: Data that has undergone a phase rotation. """ return (data * np.exp(1j * phase)).astype(torchsig_complex_data_type)
[docs] def quantize( data: np.ndarray, num_bits: int, ref_level_adjustment_db: float = 0.0 ) -> np.ndarray: """Quantize input to number of levels specified. Default implementation is ceiling. Args: data (np.ndarray): IQ data. num_bits (int): Number of bits to simulate ref_level_adjustment_db (float): Changes the relative scaling of the input. For example, ref_level_adjustment_db = 3.0, the average power is now 3 dB *above* full scale and into saturation. For ref_level_adjustment_db = -3.0, the average power is now 3 dB *below* full scale and simulates a loss of dynamic range. Default is 0. Raises: ValueError: Invalid round type. Returns: np.ndarray: Quantized IQ data. """ # calculate number of levels num_levels = int(2**num_bits) # establish quantization levels quant_levels = np.arange(-num_levels//2,num_levels//2) / (num_levels//2) # the distance between two quantization levels quant_level_distance = quant_levels[1]-quant_levels[0] # determine maximum value of signal amplitude max_value_signal_real = np.max(np.abs(data.real)) max_value_signal_imag = np.max(np.abs(data.imag)) max_value_signal = np.max((max_value_signal_real,max_value_signal_imag)) # convert the reference level adjustment into a linear value. # +3 dB -> 3 dB above max scaling (saturation) # -3 dB -> 3 dB below max scaling (dynamic range loss) ref_level_adjustment_linear = 10**(ref_level_adjustment_db/10) # scale the input signal input_signal_scaled = data * ref_level_adjustment_linear / max_value_signal # quantize real and imag seperately quant_signal_real = np.zeros(len(data),dtype=torchsig_float_data_type) quant_signal_imag = np.zeros(len(data),dtype=torchsig_float_data_type) input_signal_scaled_real = input_signal_scaled.real input_signal_scaled_imag = input_signal_scaled.imag # check for saturated values minimum real_saturation_neg_index = np.where(input_signal_scaled_real <= quant_levels[0])[0] imag_saturation_neg_index = np.where(input_signal_scaled_imag <= quant_levels[0])[0] quant_signal_real[real_saturation_neg_index] = quant_levels[0] quant_signal_imag[imag_saturation_neg_index] = quant_levels[0] # check for saturated values maximum real_saturation_pos_index = np.where(input_signal_scaled_real >= quant_levels[-1])[0] imag_saturation_pos_index = np.where(input_signal_scaled_imag >= quant_levels[-1])[0] quant_signal_real[real_saturation_pos_index] = quant_levels[-1] quant_signal_imag[imag_saturation_pos_index] = quant_levels[-1] # calculate which remaining indicies have not yet been quantized all_index = np.arange(0,len(data)) remaining_index = np.setdiff1d(all_index, real_saturation_neg_index) remaining_index = np.setdiff1d(remaining_index, imag_saturation_neg_index) remaining_index = np.setdiff1d(remaining_index, real_saturation_pos_index) remaining_index = np.setdiff1d(remaining_index, imag_saturation_pos_index) # quantize all other levels. by default implements "ceiling" real_index_subset = np.digitize( input_signal_scaled_real[remaining_index], quant_levels) imag_index_subset = np.digitize( input_signal_scaled_imag[remaining_index], quant_levels) quant_signal_real[remaining_index] = quant_levels[real_index_subset] quant_signal_imag[remaining_index] = quant_levels[imag_index_subset] # form the quantized IQ samples quantized_data = quant_signal_real + 1j*quant_signal_imag # undo quantization-based scaling data_unscaled = quantized_data * max_value_signal / ref_level_adjustment_linear return data_unscaled.astype(torchsig_complex_data_type)
[docs] def shadowing( data: np.ndarray, mean_db: float = 4.0, sigma_db: float = 2.0, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray: """Applies RF shadowing to the data, assuming the channel obstructions' loss are lognormal. Refer to T.S. Rappaport, Wireless Communications, Prentice Hall, 2002. Args: data (np.ndarray): Complex valued IQ data samples. mu_db (float): Mean value of shadowing in dB. Default 4.0. sigma_db (float): Shadowing standard deviation. Default 2.0. rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: Data with shadowing. """ rng = rng if rng else np.random.default_rng() power_db = rng.normal(mean_db, sigma_db) # normal distribution in log domain data = data * 10 ** (power_db / 20) return data.astype(torchsig_complex_data_type)
[docs] def spectral_inversion( data: np.ndarray ) -> np.ndarray: """Applies a spectral inversion to input data. Args: data (np.ndarray): IQ data. Returns: np.ndarray: Spectrally inverted data. """ data.imag *= -1 return data
[docs] def spectrogram( data: np.ndarray, fft_size: int, fft_stride: int ) -> np.ndarray: """Computes spectrogram from IQ data. Directly uses `compute_spectrogram` inside of utils/dsp.py. Args: data (np.ndarray): IQ samples. fft_size (int): The FFT size (number of bins) in the spectrogram. fft_stride (int): The number of data points to move or "hop" over when computing the next FFT. rng (np.random.Generator): Optional random generator. Returns: np.ndarray: Spectrogram computed from IQ data. """ return dsp.compute_spectrogram( data, fft_size=fft_size, fft_stride=fft_stride )
# TODO this function needs clean up
[docs] def spectrogram_drop_samples( data: np.ndarray, drop_starts: np.ndarray, drop_sizes: np.ndarray, fill: str ) -> np.ndarray: """Drop samples at given locations/durations with fill technique. Supported Fill Techniques: ffill: Forward Fill. Use value at sample one before start. bfill: Backwards Fill. Use value at sample one after end. mean: Mean Fill. Use data mean. zero: Zero Fill. Use 0. min: Minimum observed value fill. max: Maximum observed value fill low: Fixed low value fill. Use np.ones * 1e-3. ones: Ones fill. Use np.ones. Args: data (np.ndarray): IQ data. drop_starts (np.ndarray): Start indicies of drops. drop_sizes (np.ndarray): Durations for each start index. fill (str): Drop sample replacement method. Raises: ValueError: Invalid fill type. Returns: np.ndarray: data array with fill values during drops. """ flat_spec = data.reshape(data.shape[0], data.shape[1] * data.shape[2]) for idx, drop_start in enumerate(drop_starts): if fill == "ffill": drop_region_real = np.ones(drop_sizes[idx]) * flat_spec[0, drop_start - 1] drop_region_complex = ( np.ones(drop_sizes[idx]) * flat_spec[1, drop_start - 1] ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real flat_spec[ 1, drop_start : drop_start + drop_sizes[idx] ] = drop_region_complex elif fill == "bfill": drop_region_real = ( np.ones(drop_sizes[idx]) * flat_spec[0, drop_start + drop_sizes[idx]] ) drop_region_complex = ( np.ones(drop_sizes[idx]) * flat_spec[1, drop_start + drop_sizes[idx]] ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real flat_spec[ 1, drop_start : drop_start + drop_sizes[idx] ] = drop_region_complex elif fill == "mean": drop_region_real = np.ones(drop_sizes[idx]) * np.mean(flat_spec[0]) drop_region_complex = np.ones(drop_sizes[idx]) * np.mean(flat_spec[1]) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real flat_spec[ 1, drop_start : drop_start + drop_sizes[idx] ] = drop_region_complex elif fill == "zero": drop_region = np.zeros(drop_sizes[idx]) flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region elif fill == "min": drop_region_real = np.ones(drop_sizes[idx]) * np.min(np.abs(flat_spec[0])) drop_region_complex = np.ones(drop_sizes[idx]) * np.min( np.abs(flat_spec[1]) ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real flat_spec[ 1, drop_start : drop_start + drop_sizes[idx] ] = drop_region_complex elif fill == "max": drop_region_real = np.ones(drop_sizes[idx]) * np.max(np.abs(flat_spec[0])) drop_region_complex = np.ones(drop_sizes[idx]) * np.max( np.abs(flat_spec[1]) ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real flat_spec[ 1, drop_start : drop_start + drop_sizes[idx] ] = drop_region_complex elif fill == "low": drop_region = np.ones(drop_sizes[idx]) * 1e-3 flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region elif fill == "ones": drop_region = np.ones(drop_sizes[idx]) flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region else: raise ValueError(f"fill expects ffill, bfill, mean, zero, min, max, low, ones. Found {fill}") new_data = flat_spec.reshape(data.shape[0], data.shape[1], data.shape[2]) return new_data
[docs] def time_reversal( data: np.ndarray ) -> np.ndarray: """Applies time reversal to data (flips horizontally). Args: data (np.ndarray): IQ data. Returns: np.ndarray: Time flipped IQ data. """ return np.flip(data, axis=0).astype(torchsig_complex_data_type)
[docs] def time_varying_noise( data: np.ndarray, noise_power_low: float, noise_power_high: float, inflections: int, random_regions: bool, rng: np.random.Generator = np.random.default_rng(seed=None) ) -> np.ndarray : """Adds time-varying complex additive white Gaussian noise with power levels in range (`noise_power_low`, `noise_power_high`) dB and with `inflections` number of inflection points spread over the input iq data randomly if `random_regions` is True or evenly spread if False. Args: data (np.ndarray): IQ data. noise_power_low (float): Minimum noise power in dB. noise_power_high (float): Maximum noise power in dB. inflections (int): Number of inflection points over IQ data. random_regions (bool): Inflections points spread randomly (True) or not (False). rng (np.random.Generator, optional): Random number generator. Defaults to np.random.default_rng(seed=None). Returns: np.ndarray: IQ data with time-varying noise. """ real_noise = rng.standard_normal(*data.shape) imag_noise = rng.standard_normal(*data.shape) noise_power = np.zeros(data.shape) if not inflections: #inflections == 0: inflection_indices = np.array([0, data.shape[0]]) else: if random_regions: inflection_indices = np.sort( rng.choice(data.shape[0], size=inflections, replace=False) ) inflection_indices = np.append(inflection_indices, data.shape[0]) inflection_indices = np.insert(inflection_indices, 0, 0) else: inflection_indices = np.arange(inflections + 2) * int( data.shape[0] / (inflections + 1) ) for idx in range(len(inflection_indices) - 1): start_idx = inflection_indices[idx] stop_idx = inflection_indices[idx + 1] duration = stop_idx - start_idx start_power = noise_power_low if not idx % 2 else noise_power_high #idx % 2 == 0 stop_power = noise_power_high if not idx % 2 else noise_power_low noise_power[start_idx:stop_idx] = np.linspace( start_power, stop_power, duration ) return ( data + (10.0 ** (noise_power / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) ).astype(torchsig_complex_data_type)