torchsig.datasets.datamodules.TorchSigDataModule¶
- class torchsig.datasets.datamodules.TorchSigDataModule(root: str, metadata, dataset_size: int, dataset_splits: list[float] | list[int] = [0.7, 0.2, 0.1], batch_size: int = 1, num_workers: int = 1, collate_fn: callable | None = None, create_batch_size: int = 8, create_num_workers: int = 4, file_writer: BaseFileHandler = <class 'torchsig.utils.file_handlers.hdf5.HDF5Writer'>, file_reader: BaseFileHandler = <class 'torchsig.utils.file_handlers.hdf5.HDF5Reader'>, overwrite: bool = False, impairment_level: int = 0, transforms=[], target_labels: list[str] | None = None, seed: int | None = None)[source]¶
Bases: LightningDataModule
PyTorch Lightning DataModule for creating and loading TorchSig datasets.
- This DataModule handles:
Dataset creation or loading from disk via a file handler.
Splitting into train/val/test subsets.
Batching, collation, and worker seeding for training.
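The worker seeding mentioned above can be pictured with a small standard-library sketch. The scheme shown here (base seed plus worker index) is an assumption made for illustration; this page does not describe how TorchSig actually derives worker seeds, and `worker_seed` and `first_draws` are hypothetical names.

```python
import random


def worker_seed(base_seed: int, worker_id: int) -> int:
    """Derive a distinct, reproducible seed for one worker (assumed scheme)."""
    return base_seed + worker_id


def first_draws(base_seed: int, num_workers: int) -> list[float]:
    """Draw one value per worker, each from its own seeded generator."""
    return [
        random.Random(worker_seed(base_seed, worker_id)).random()
        for worker_id in range(num_workers)
    ]


draws = first_draws(base_seed=1234, num_workers=4)
# Re-running with the same base seed reproduces every worker's stream.
print(draws == first_draws(base_seed=1234, num_workers=4))
```

Giving each worker its own seed keeps the workers from producing identical random streams while still making a run repeatable.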
- root¶
Directory where datasets are stored or created.
- dataset_size¶
Total number of samples in the dataset.
- dataset_splits¶
Fractions or counts for train/val/test splits.
- dataset_metadata¶
Metadata describing the dataset.
- impairment_level¶
Level of synthetic impairment to apply; 0 means no impairment.
- transforms¶
Transforms applied to the input data.
- target_labels¶
Names of target metadata fields to include.
- batch_size¶
Batch size for the training/validation/testing DataLoaders.
- num_workers¶
Number of worker processes for data loading.
- collate_fn¶
Custom collate function for batching.
- create_batch_size¶
Batch size used during on-disk dataset creation.
- create_num_workers¶
Number of workers used during dataset creation.
- file_writer¶
File handler class used to write the dataset to disk.
- file_reader¶
File handler class used to read the dataset from disk.
- overwrite¶
If True, existing on-disk data will be overwritten.
- seed¶
Optional random seed for reproducibility.
- train¶
Initialized training dataset (set in setup()).
- val¶
Initialized validation dataset (set in setup()).
- test¶
Initialized test dataset (set in setup()).
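A minimal construction sketch, assuming the constructor signature shown at the top of this page. The root path and the metadata file name are placeholders, `datamodule_kwargs` and `build_datamodule` are hypothetical helpers, and `torchsig` is imported lazily so the snippet loads even where the package is not installed.

```python
from typing import Any


def datamodule_kwargs(root: str, metadata: Any, dataset_size: int) -> dict[str, Any]:
    """Collect constructor arguments, spelling out the documented defaults."""
    return {
        "root": root,
        "metadata": metadata,
        "dataset_size": dataset_size,
        "dataset_splits": [0.7, 0.2, 0.1],
        "batch_size": 1,
        "num_workers": 1,
        "overwrite": False,
        "impairment_level": 0,
        "seed": None,
    }


def build_datamodule(root: str, metadata: Any, dataset_size: int):
    """Construct the DataModule; requires torchsig to be installed."""
    from torchsig.datasets.datamodules import TorchSigDataModule  # lazy import

    return TorchSigDataModule(**datamodule_kwargs(root, metadata, dataset_size))


# Placeholder values, for illustration only.
kwargs = datamodule_kwargs("./datasets/example", "metadata.yaml", 1000)
print(sorted(kwargs))
```

With the package installed, the object returned by `build_datamodule` would typically be driven through `prepare_data()`, then `setup()`, then `train_dataloader()`, in that order.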
Methods
from_datasets() – Create an instance from torch.utils.data.Dataset.
load_from_checkpoint() – Primary way of loading a datamodule from a checkpoint.
load_state_dict() – Called when loading a checkpoint, implement to reload datamodule state given datamodule state_dict.
on_after_batch_transfer() – Override to alter or apply batch augmentations to your batch after it is transferred to the device.
on_before_batch_transfer() – Override to alter or apply batch augmentations to your batch before it is transferred to the device.
on_exception() – Called when the trainer execution is interrupted by an exception.
predict_dataloader() – An iterable or collection of iterables specifying prediction samples.
prepare_data() – Prepares the dataset by creating new datasets if they do not exist on disk.
remove_ignored_hparams() – Remove ignored hyperparameters from the stored state.
save_hyperparameters() – Save arguments to hparams attribute.
setup() – Sets up the train and validation datasets for the given stage.
state_dict() – Called when saving a checkpoint, implement to generate and save datamodule state.
teardown() – Called at the end of fit (train + validate), validate, test, or predict.
test_dataloader() – Returns the DataLoader for the test dataset.
train_dataloader() – Returns the DataLoader for the training dataset.
transfer_batch_to_device() – Override this hook if your DataLoader returns tensors wrapped in a custom data structure.
val_dataloader() – Returns the DataLoader for the validation dataset.
Attributes
CHECKPOINT_HYPER_PARAMS_KEY
CHECKPOINT_HYPER_PARAMS_NAME
CHECKPOINT_HYPER_PARAMS_TYPE
hparams – The collection of hyperparameters saved with save_hyperparameters().
hparams_initial – The collection of hyperparameters saved with save_hyperparameters().
name
- __init__(root: str, metadata, dataset_size: int, dataset_splits: list[float] | list[int] = [0.7, 0.2, 0.1], batch_size: int = 1, num_workers: int = 1, collate_fn: callable | None = None, create_batch_size: int = 8, create_num_workers: int = 4, file_writer: BaseFileHandler = <class 'torchsig.utils.file_handlers.hdf5.HDF5Writer'>, file_reader: BaseFileHandler = <class 'torchsig.utils.file_handlers.hdf5.HDF5Reader'>, overwrite: bool = False, impairment_level: int = 0, transforms=[], target_labels: list[str] | None = None, seed: int | None = None)[source]¶
Initialize the TorchSigDataModule.
- Parameters:
root – Path to store or load the dataset.
metadata – Metadata object, YAML file path, or dict describing classes and settings.
dataset_size – Total number of samples to generate or load.
dataset_splits – Fractions or counts for train/val/test splits. Defaults to [0.70, 0.20, 0.10].
batch_size – Batch size for data loaders. Defaults to 1.
num_workers – Number of worker processes for data loading. Defaults to 1.
collate_fn – Custom function to collate batch samples. Defaults to None.
create_batch_size – Batch size when writing data to disk. Defaults to 8.
create_num_workers – Workers used when creating the on-disk dataset. Defaults to 4.
file_writer – File handler class used to write the dataset to disk. Defaults to HDF5Writer.
file_reader – File handler class used to read the dataset from disk. Defaults to HDF5Reader.
overwrite – If True, existing data at root will be overwritten. Defaults to False.
impairment_level – Level of synthetic impairment to apply. Defaults to 0 (no impairment).
transforms – List of transforms applied to each sample’s input. Defaults to [].
target_labels – Names of metadata fields to include. Defaults to None.
seed – Seed for randomness and reproducibility. Defaults to None.
- Raises:
ValueError – If dataset_splits don’t sum to 1.0 (when using fractions).
FileNotFoundError – If metadata file path is invalid.
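The fraction check described under Raises can be sketched as follows. This is an illustration only, not TorchSig's implementation: `resolve_split_sizes` is a hypothetical helper, and both the rounding rule and the handling of integer counts are assumptions.

```python
import math


def resolve_split_sizes(dataset_size: int, splits: list) -> list[int]:
    """Turn fractions or counts into per-split sample counts."""
    if all(isinstance(s, int) and not isinstance(s, bool) for s in splits):
        # Assumed rule for counts: they may not exceed the dataset size.
        if sum(splits) > dataset_size:
            raise ValueError("split counts exceed dataset_size")
        return list(splits)
    # Fractions: compare with a tolerance, because 0.7 + 0.2 + 0.1 is not
    # exactly 1.0 in floating point.
    if not math.isclose(sum(splits), 1.0):
        raise ValueError("fractional dataset_splits must sum to 1.0")
    sizes = [round(dataset_size * fraction) for fraction in splits]
    # Assumed rule: any rounding remainder is absorbed by the first split.
    sizes[0] += dataset_size - sum(sizes)
    return sizes


print(resolve_split_sizes(1000, [0.7, 0.2, 0.1]))  # [700, 200, 100]
```

The tolerance matters in practice: summing the default fractions in floating point gives a value just below 1.0, so an exact equality test would reject the defaults.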
- prepare_data() None[source]¶
Prepares the dataset by creating new datasets if they do not exist on disk.
The datasets are created using the DatasetCreator class. If the dataset already exists on disk, it is loaded back into memory.
- Raises:
FileNotFoundError – If the root directory cannot be created.
RuntimeError – If dataset creation fails.
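The create-if-missing behaviour, combined with the overwrite flag from the constructor, follows a common idempotent pattern. The sketch below illustrates that pattern with the standard library only. It does not use DatasetCreator, and the `prepare` helper and the `data.bin` file name are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Callable


def prepare(root: Path, create: Callable[[Path], None], overwrite: bool = False) -> bool:
    """Return True if data was (re)created, False if existing data was reused."""
    target = root / "data.bin"  # hypothetical file name
    if target.exists() and not overwrite:
        return False  # dataset already on disk: reuse it
    root.mkdir(parents=True, exist_ok=True)
    create(target)
    return True


def write_dummy(path: Path) -> None:
    """Stand-in for real dataset generation."""
    path.write_bytes(b"\x00" * 16)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "dataset"
    first = prepare(root, write_dummy)                   # nothing on disk yet
    second = prepare(root, write_dummy)                  # reuses existing data
    third = prepare(root, write_dummy, overwrite=True)   # forces regeneration

print(first, second, third)  # True False True
```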
- setup(stage: str = 'train') None[source]¶
Sets up the train and validation datasets for the given stage.
- Parameters:
stage – The stage of the DataModule, typically ‘train’ or ‘test’. Defaults to ‘train’.
- Raises:
FileNotFoundError – If the dataset files are not found at the specified root.
ValueError – If dataset splits are invalid.
- train_dataloader() DataLoader[source]¶
Returns the DataLoader for the training dataset.
- Returns:
A PyTorch DataLoader for the training dataset.
- Raises:
RuntimeError – If the training dataset is not initialized.
- val_dataloader() DataLoader[source]¶
Returns the DataLoader for the validation dataset.
- Returns:
A PyTorch DataLoader for the validation dataset.
- Raises:
RuntimeError – If the validation dataset is not initialized.
- test_dataloader() DataLoader[source]¶
Returns the DataLoader for the test dataset.
- Returns:
A PyTorch DataLoader for the test dataset.
- Raises:
RuntimeError – If the test dataset is not initialized.
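The RuntimeError documented for the three dataloader methods implies that setup() must run first. A minimal stand-in, assuming nothing about TorchSig internals, shows the guard. `SketchDataModule` is hypothetical and returns a plain iterator in place of a DataLoader.

```python
class SketchDataModule:
    """Hypothetical stand-in that mimics the 'not initialized' guard."""

    def __init__(self) -> None:
        self.train = None  # populated by setup()

    def setup(self, stage: str = "train") -> None:
        self.train = list(range(8))  # stand-in for a real dataset

    def train_dataloader(self):
        if self.train is None:
            raise RuntimeError("Training dataset is not initialized; call setup() first.")
        return iter(self.train)  # stand-in for a DataLoader


dm = SketchDataModule()
try:
    dm.train_dataloader()
except RuntimeError as err:
    print(err)

dm.setup()
print(next(dm.train_dataloader()))  # 0
```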
- __str__() str¶
Return a string representation of the datasets that are set up.
- Returns:
A string representation of the datasets that are set up.
- classmethod from_datasets(train_dataset: Dataset | Iterable[Dataset] | None = None, val_dataset: Dataset | Iterable[Dataset] | None = None, test_dataset: Dataset | Iterable[Dataset] | None = None, predict_dataset: Dataset | Iterable[Dataset] | None = None, batch_size: int = 1, num_workers: int = 0, **datamodule_kwargs: Any) LightningDataModule¶
Create an instance from torch.utils.data.Dataset.
- Parameters:
train_dataset – Optional dataset or iterable of datasets to be used for train_dataloader()
val_dataset – Optional dataset or iterable of datasets to be used for val_dataloader()
test_dataset – Optional dataset or iterable of datasets to be used for test_dataloader()
predict_dataset – Optional dataset or iterable of datasets to be used for predict_dataloader()
batch_size – Batch size to use for each dataloader. Default is 1. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.
num_workers – Number of subprocesses to use for data loading. 0 means that the data will be loaded in the main process. Number of CPUs available. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.
**datamodule_kwargs – Additional parameters that get passed down to the datamodule’s __init__.
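The rule that batch_size and num_workers are forwarded only when `__init__` declares them can be sketched with `inspect.signature`. This is an illustration of the idea, not Lightning's code. `forwardable_kwargs`, `BatchedModule`, and `PlainModule` are hypothetical.

```python
import inspect
from typing import Any


def forwardable_kwargs(cls: type, **candidates: Any) -> dict[str, Any]:
    """Keep only the candidates that cls.__init__ can accept by name."""
    params = inspect.signature(cls.__init__).parameters
    # A **kwargs parameter means every candidate can be passed through.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return dict(candidates)
    return {name: value for name, value in candidates.items() if name in params}


class BatchedModule:  # hypothetical: declares batch_size
    def __init__(self, batch_size: int = 1) -> None:
        self.batch_size = batch_size


class PlainModule:  # hypothetical: declares neither name
    def __init__(self) -> None:
        pass


print(forwardable_kwargs(BatchedModule, batch_size=32, num_workers=4))  # {'batch_size': 32}
print(forwardable_kwargs(PlainModule, batch_size=32, num_workers=4))    # {}
```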
- property hparams: AttributeDict | MutableMapping¶
The collection of hyperparameters saved with save_hyperparameters(). It is mutable by the user. For the frozen set of initial hyperparameters, use hparams_initial.
- Returns:
Mutable hyperparameters dictionary
- property hparams_initial: AttributeDict¶
The collection of hyperparameters saved with save_hyperparameters(). These contents are read-only. Manual updates to the saved hyperparameters can instead be performed through hparams.
- Returns:
immutable initial hyperparameters
- Return type:
AttributeDict
- load_from_checkpoint(checkpoint_path: str | Path | IO, map_location: device | str | int | Callable[[UntypedStorage, str], UntypedStorage | None] | dict[device | str | int, device | str | int] | None = None, hparams_file: str | Path | None = None, weights_only: bool | None = None, **kwargs: Any) Self¶
Primary way of loading a datamodule from a checkpoint. When Lightning saves a checkpoint it stores the arguments passed to __init__ in the checkpoint under "datamodule_hyper_parameters".
Any arguments specified through **kwargs will override args stored in "datamodule_hyper_parameters".
- Parameters:
checkpoint_path – Path to checkpoint. This can also be a URL, or file-like object
map_location – If your checkpoint saved a GPU model and you now load on CPUs or a different number of GPUs, use this to map to the new setup. The behaviour is the same as in torch.load().
hparams_file –
Optional path to a .yaml or .csv file with hierarchical structure as in this example:
    dataloader:
        batch_size: 32
You most likely won’t need this since Lightning will always save the hyperparameters to the checkpoint. However, if your checkpoint weights don’t have the hyperparameters saved, use this method to pass in a .yaml file with the hparams you’d like to use. These will be converted into a dict and passed into your LightningDataModule for use.
If your datamodule’s hparams argument is Namespace and .yaml file has hierarchical structure, you need to refactor your datamodule to treat hparams as dict.
weights_only – If True, restricts loading to state_dicts of plain torch.Tensor and other primitive types. If loading a checkpoint from a trusted source that contains an nn.Module, use weights_only=False. If loading checkpoint from an untrusted source, we recommend using weights_only=True. For more information, please refer to the PyTorch Developer Notes on Serialization Semantics.
**kwargs – Any extra keyword args needed to init the datamodule. Can also be used to override saved hyperparameter values.
- Returns:
LightningDataModule instance with loaded weights and hyperparameters (if available).
Note
load_from_checkpoint is a class method. You must use your LightningDataModule class to call it instead of the LightningDataModule instance, or a TypeError will be raised.
Example:
    # load weights without mapping ...
    datamodule = MyLightningDataModule.load_from_checkpoint('path/to/checkpoint.ckpt')

    # or load weights and hyperparameters from separate files.
    datamodule = MyLightningDataModule.load_from_checkpoint(
        'path/to/checkpoint.ckpt',
        hparams_file='/path/to/hparams_file.yaml'
    )

    # override some of the params with new values
    datamodule = MyLightningDataModule.load_from_checkpoint(
        PATH,
        batch_size=32,
        num_workers=10,
    )
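The override rule for **kwargs amounts to a dictionary merge in which the keyword arguments win. The sketch below shows that rule on a plain dict. It does not read a real checkpoint file, and `resolve_init_args` is a hypothetical helper. The key name is the one given in the description above.

```python
from typing import Any

HPARAMS_KEY = "datamodule_hyper_parameters"  # key named in the docs above


def resolve_init_args(checkpoint: dict[str, Any], **overrides: Any) -> dict[str, Any]:
    """Merge saved hyperparameters with overrides; overrides take precedence."""
    saved = checkpoint.get(HPARAMS_KEY, {})
    return {**saved, **overrides}


# A plain dict standing in for a loaded checkpoint (illustrative values).
checkpoint = {HPARAMS_KEY: {"batch_size": 8, "num_workers": 2}}
init_args = resolve_init_args(checkpoint, batch_size=32)
print(init_args)  # {'batch_size': 32, 'num_workers': 2}
```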
- load_state_dict(state_dict: dict[str, Any]) None¶
Called when loading a checkpoint, implement to reload datamodule state given datamodule state_dict.
- Parameters:
state_dict – the datamodule state returned by state_dict.
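The state_dict() and load_state_dict() hooks form a save/restore pair. The stand-in below sketches the round trip without Lightning. `CountingDataModule` and its `batches_served` counter are hypothetical, chosen only to give the hooks something to persist.

```python
from typing import Any


class CountingDataModule:
    """Hypothetical stand-in with one piece of resumable state."""

    def __init__(self) -> None:
        self.batches_served = 0

    def state_dict(self) -> dict[str, Any]:
        # Called when saving: return everything needed to resume.
        return {"batches_served": self.batches_served}

    def load_state_dict(self, state_dict: dict[str, Any]) -> None:
        # Called when loading: restore from the saved dictionary.
        self.batches_served = state_dict["batches_served"]


original = CountingDataModule()
original.batches_served = 42

restored = CountingDataModule()
restored.load_state_dict(original.state_dict())
print(restored.batches_served)  # 42
```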
- on_after_batch_transfer(batch: Any, dataloader_idx: int) Any¶
Override to alter or apply batch augmentations to your batch after it is transferred to the device.
Note
To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.
- Parameters:
batch – A batch of data that needs to be altered or augmented.
dataloader_idx – The index of the dataloader to which the batch belongs.
- Returns:
A batch of data
Example:
    def on_after_batch_transfer(self, batch, dataloader_idx):
        batch['x'] = gpu_transforms(batch['x'])
        return batch
- on_before_batch_transfer(batch: Any, dataloader_idx: int) Any¶
Override to alter or apply batch augmentations to your batch before it is transferred to the device.
Note
To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.
- Parameters:
batch – A batch of data that needs to be altered or augmented.
dataloader_idx – The index of the dataloader to which the batch belongs.
- Returns:
A batch of data
Example:
    def on_before_batch_transfer(self, batch, dataloader_idx):
        batch['x'] = transforms(batch['x'])
        return batch
- on_exception(exception: BaseException) None¶
Called when the trainer execution is interrupted by an exception.
- predict_dataloader() Any¶
An iterable or collection of iterables specifying prediction samples.
For more information about multiple dataloaders, see this section.
It’s recommended that all data downloads and preparation happen in prepare_data().
predict()
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
- Returns:
A torch.utils.data.DataLoader or a sequence of them specifying prediction samples.
- remove_ignored_hparams(ignore_list: list[str]) None¶
Remove ignored hyperparameters from the stored state.
This allows derived classes to drop hyperparameters previously saved by base classes.
- Parameters:
ignore_list – Names of hyperparameters to remove.
- save_hyperparameters(*args: Any, ignore: Sequence[str] | str | None = None, frame: FrameType | None = None, logger: bool = True) None¶
Save arguments to hparams attribute.
- Parameters:
args – single object of dict, NameSpace or OmegaConf or string names or arguments from class __init__
ignore – an argument name or a list of argument names from class __init__ to be ignored
frame – a frame object. Default is None
logger – Whether to send the hyperparameters to the logger. Default: True
- Example::
    >>> from pytorch_lightning.core.mixins import HyperparametersMixin
    >>> class ManuallyArgsModel(HyperparametersMixin):
    ...     def __init__(self, arg1, arg2, arg3):
    ...         super().__init__()
    ...         # manually assign arguments
    ...         self.save_hyperparameters('arg1', 'arg3')
    ...     def forward(self, *args, **kwargs):
    ...         ...
    >>> model = ManuallyArgsModel(1, 'abc', 3.14)
    >>> model.hparams
    "arg1": 1
    "arg3": 3.14

    >>> from pytorch_lightning.core.mixins import HyperparametersMixin
    >>> class AutomaticArgsModel(HyperparametersMixin):
    ...     def __init__(self, arg1, arg2, arg3):
    ...         super().__init__()
    ...         # equivalent automatic
    ...         self.save_hyperparameters()
    ...     def forward(self, *args, **kwargs):
    ...         ...
    >>> model = AutomaticArgsModel(1, 'abc', 3.14)
    >>> model.hparams
    "arg1": 1
    "arg2": abc
    "arg3": 3.14

    >>> from argparse import Namespace
    >>> from pytorch_lightning.core.mixins import HyperparametersMixin
    >>> class SingleArgModel(HyperparametersMixin):
    ...     def __init__(self, params):
    ...         super().__init__()
    ...         # manually assign single argument
    ...         self.save_hyperparameters(params)
    ...     def forward(self, *args, **kwargs):
    ...         ...
    >>> model = SingleArgModel(Namespace(p1=1, p2='abc', p3=3.14))
    >>> model.hparams
    "p1": 1
    "p2": abc
    "p3": 3.14

    >>> from pytorch_lightning.core.mixins import HyperparametersMixin
    >>> class ManuallyArgsModel(HyperparametersMixin):
    ...     def __init__(self, arg1, arg2, arg3):
    ...         super().__init__()
    ...         # pass argument(s) to ignore as a string or in a list
    ...         self.save_hyperparameters(ignore='arg2')
    ...     def forward(self, *args, **kwargs):
    ...         ...
    >>> model = ManuallyArgsModel(1, 'abc', 3.14)
    >>> model.hparams
    "arg1": 1
    "arg3": 3.14
- state_dict() dict[str, Any]¶
Called when saving a checkpoint, implement to generate and save datamodule state.
- Returns:
A dictionary containing datamodule state.
- teardown(stage: str) None¶
Called at the end of fit (train + validate), validate, test, or predict.
- Parameters:
stage – either 'fit', 'validate', 'test', or 'predict'
- transfer_batch_to_device(batch: Any, device: device, dataloader_idx: int) Any¶
Override this hook if your DataLoader returns tensors wrapped in a custom data structure.
The data types listed below (and any arbitrary nesting of them) are supported out of the box:
For anything else, you need to define how the data is moved to the target device (CPU, GPU, TPU, …).
Note
This hook should only transfer the data and not modify it, nor should it move the data to any other device than the one passed in as argument (unless you know what you are doing). To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.
- Parameters:
batch – A batch of data that needs to be transferred to a new device.
device – The target device as defined in PyTorch.
dataloader_idx – The index of the dataloader to which the batch belongs.
- Returns:
A reference to the data on the new device.
Example:
    def transfer_batch_to_device(self, batch, device, dataloader_idx):
        if isinstance(batch, CustomBatch):
            # move all tensors in your custom data structure to the device
            batch.samples = batch.samples.to(device)
            batch.targets = batch.targets.to(device)
        elif dataloader_idx == 0:
            # skip device transfer for the first dataloader or anything you wish
            pass
        else:
            batch = super().transfer_batch_to_device(batch, device, dataloader_idx)
        return batch
See also
move_data_to_device()
apply_to_collection()