Mila
Deep Neural Network Library
Mila::Dnn::Gpt2::DatasetReader Class Reference [export]

High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing. More...


Public Member Functions

 DatasetReader (const std::string &file_path, size_t batch_size, size_t seq_len, const DSReaderConfig &config=DSReaderConfig())
 Constructs a new DatasetReader object.
 
 ~DatasetReader ()
 Destroys the DatasetReader object.
 
template<typename TensorType = DeviceTensor<int>>
requires (std::same_as<TensorType, HostTensor<int>> || std::same_as<TensorType, PinnedTensor<int>> || std::same_as<TensorType, DeviceTensor<int>>)
std::pair< TensorType, TensorType > next_batch ()
 Fetches the next batch of training data with inputs and targets.
 
void pause ()
 Pauses the background loading threads.
 
void resume ()
 Resumes the background loading threads after being paused.
 

Private Member Functions

void allocate_memory (int *&buffer)
 Allocates memory for tensor buffers.
 
void free_memory (int *&buffer)
 Frees previously allocated memory.
 
std::span< const int > get_tokens (size_t position, size_t count)
 Gets a non-owning view of tokens at the specified position.
 
void initialize_dataset ()
 Initializes the dataset access without loading the entire file into memory.
 
void load_window (size_t start_token)
 Loads a specific window of tokens from the dataset file.
 
void log (const std::string &message, int level)
 Logs messages through the configured logger.
 
void preprocess_batches ()
 Background thread function that preprocesses raw data into input/target pairs.
 
void read_from_disk ()
 Background thread function that reads data from the dataset file.
 

Private Attributes

std::queue< std::pair< int *, int * > > batch_queue_
 Queue for preprocessed input/target pairs.
 
size_t batch_size_
 Number of sequences in each batch.
 
DSReaderConfig config_
 Configuration settings.
 
std::mutex control_mutex_
 Mutex for control operations.
 
size_t current_window_end_
 Last token index in current window.
 
size_t current_window_start_
 First token index in current window.
 
std::condition_variable cv_io_
 Condition variable for I/O thread synchronization.
 
std::condition_variable cv_processing_
 Condition variable for processing thread synchronization.
 
std::ifstream file_
 File stream for the dataset.
 
std::string file_path_
 Path to the dataset file.
 
size_t file_size_
 Size of the dataset file in bytes.
 
std::thread io_thread_
 Thread for disk I/O operations.
 
std::mutex mutex_
 Mutex for protecting shared queues.
 
size_t num_tokens_
 Total number of tokens in the dataset.
 
std::atomic< bool > paused_ { false }
 Flag to signal threads to pause.
 
int * pinned_inputs_
 Buffer for input tensors.
 
int * pinned_targets_
 Buffer for target tensors.
 
std::thread processing_thread_
 Thread for data preprocessing.
 
std::queue< int * > raw_data_queue_
 Queue for raw data batches.
 
size_t seq_len_
 Length of each sequence in tokens.
 
std::atomic< bool > stop_ { false }
 Flag to signal threads to stop.
 
std::unique_ptr< int[]> token_window_
 Window buffer for tokens.
 
size_t token_window_size_
 Size of token window in tokens.
 

Detailed Description

High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing.

The DatasetReader class provides an efficient way to read, preprocess, and batch tokenized datasets for language model training. It implements a multi-threaded pipeline with background I/O operations and preprocessing to minimize training latency:

  • Streams data from files using a sliding window approach
  • Supports pinned memory allocation for efficient GPU transfers
  • Implements a producer-consumer pattern with configurable queue sizes
  • Can be paused and resumed for controlled resource utilization
  • Automatically handles dataset wrap-around for continuous training
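The producer-consumer pipeline described above can be sketched in isolation. This is a minimal illustration, not Mila's actual code: a bounded queue guarded by a mutex, with condition variables that block the producer when the queue is full and the consumer when it is empty. The names (`BoundedQueue`, `run_pipeline`, the bound of 4) are illustrative assumptions.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Bounded queue: producer blocks when full, consumer blocks when empty.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t max_size) : max_size_(max_size) {}

    void push(int batch) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [&] { return queue_.size() < max_size_; });
        queue_.push(batch);
        not_empty_.notify_one();
    }

    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [&] { return !queue_.empty(); });
        int batch = queue_.front();
        queue_.pop();
        not_full_.notify_one();
        return batch;
    }

private:
    std::size_t max_size_;
    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// A producer thread feeds n items; the caller consumes them in order.
std::vector<int> run_pipeline(int n) {
    BoundedQueue queue(4);  // small bound, as a reader config might set
    std::thread producer([&] {
        for (int i = 0; i < n; ++i) queue.push(i);
    });
    std::vector<int> consumed;
    for (int i = 0; i < n; ++i) consumed.push_back(queue.pop());
    producer.join();
    return consumed;
}
```

The same structure appears twice in the reader: the I/O thread produces into `raw_data_queue_`, and the preprocessing thread consumes it while producing into `batch_queue_`.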

Constructor & Destructor Documentation

◆ DatasetReader()

Mila::Dnn::Gpt2::DatasetReader::DatasetReader ( const std::string & file_path,
                                                size_t batch_size,
                                                size_t seq_len,
                                                const DSReaderConfig & config = DSReaderConfig() )
inline

Constructs a new DatasetReader object.

Parameters
    file_path   Path to the tokenized dataset file containing integers.
    batch_size  Number of sequences in each batch.
    seq_len     Length of each sequence in tokens.
    config      Additional configuration parameters.
Exceptions
    std::invalid_argument  If batch_size or seq_len is zero.
    std::runtime_error     If file operations fail or memory allocation fails.

The constructor performs several initialization steps:

  1. Opens the dataset file and initializes window-based streaming access
  2. Allocates memory for input/target tensors (using pinned memory if CUDA is available)
  3. Starts background threads for parallel I/O and preprocessing

◆ ~DatasetReader()

Mila::Dnn::Gpt2::DatasetReader::~DatasetReader ( )
inline

Destroys the DatasetReader object.

Safely stops all background threads and releases allocated memory.


Member Function Documentation

◆ allocate_memory()

void Mila::Dnn::Gpt2::DatasetReader::allocate_memory ( int *&  buffer)
inline private

Allocates memory for tensor buffers.

Parameters
    buffer  Reference to pointer that will hold the allocated memory.
Exceptions
    std::runtime_error  If CUDA memory allocation fails.
    std::bad_alloc      If standard memory allocation fails.

Attempts to allocate CUDA pinned memory for optimal performance. Falls back to standard allocation if CUDA is not available.
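A hedged sketch of this allocate-pinned-or-fall-back strategy follows. The CUDA branch compiles only under nvcc (`__CUDACC__`); elsewhere the standard fallback runs. `cudaMallocHost`/`cudaFreeHost` are real CUDA runtime calls, but the function names and the dispatch logic here are illustrative assumptions, not Mila's implementation.

```cpp
#include <cstddef>
#include <new>

#ifdef __CUDACC__
#include <cuda_runtime.h>
#endif

int* allocate_buffer(std::size_t count) {
#ifdef __CUDACC__
    int* ptr = nullptr;
    if (cudaMallocHost(&ptr, count * sizeof(int)) == cudaSuccess)
        return ptr;  // page-locked memory: faster host<->device copies
#endif
    return new int[count];  // standard fallback; may throw std::bad_alloc
}

void free_buffer(int*& ptr) {
#ifdef __CUDACC__
    // A real implementation would record which path allocated the buffer;
    // this sketch assumes the CUDA path whenever compiled by nvcc.
    cudaFreeHost(ptr);
#else
    delete[] ptr;
#endif
    ptr = nullptr;  // nullify, as free_memory() documents
}
```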


◆ free_memory()

void Mila::Dnn::Gpt2::DatasetReader::free_memory ( int *&  buffer)
inline private

Frees previously allocated memory.

Parameters
    buffer  Reference to pointer to free and nullify.

◆ get_tokens()

std::span< const int > Mila::Dnn::Gpt2::DatasetReader::get_tokens ( size_t  position,
size_t  count 
)
inline private

Gets a non-owning view of tokens at the specified position.

Parameters
    position  Starting token position.
    count     Number of tokens to access.
Returns
    std::span<const int>  A view of the requested tokens.
Exceptions
    std::runtime_error  If the position is invalid or reading fails.

◆ initialize_dataset()

void Mila::Dnn::Gpt2::DatasetReader::initialize_dataset ( )
inline private

Initializes the dataset access without loading the entire file into memory.

Exceptions
    std::runtime_error  If reading fails or initialization fails.

Uses a streaming approach to access the dataset file with a fixed-size token window buffer, using std::span for safe, non-owning memory views.


◆ load_window()

void Mila::Dnn::Gpt2::DatasetReader::load_window ( size_t  start_token)
inline private

Loads a specific window of tokens from the dataset file.

Parameters
    start_token  Index of the first token in the window to load.
Exceptions
    std::runtime_error  If reading fails.
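The seek-and-read mechanics of window loading can be sketched as follows, with a stream of raw `int` tokens standing in for the dataset file. The clamping at end-of-file and the function signature are illustrative assumptions, not Mila's implementation.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <vector>

// Seek to start_token * sizeof(int) and read up to window_size tokens,
// clamping the count at the end of the dataset.
std::vector<int> load_window(std::istream& file,
                             std::size_t num_tokens,
                             std::size_t start_token,
                             std::size_t window_size) {
    std::size_t count = window_size;
    if (start_token + count > num_tokens)
        count = num_tokens - start_token;  // clamp at end of file
    std::vector<int> window(count);
    file.seekg(static_cast<std::streamoff>(start_token * sizeof(int)));
    file.read(reinterpret_cast<char*>(window.data()),
              static_cast<std::streamsize>(count * sizeof(int)));
    return window;
}
```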

◆ log()

void Mila::Dnn::Gpt2::DatasetReader::log ( const std::string &  message,
int  level 
)
inline private

Logs messages through the configured logger.

Parameters
    message  The message text to log.
    level    The log level (higher values mean less important).

◆ next_batch()

template<typename TensorType = DeviceTensor<int>>
requires (std::same_as<TensorType, HostTensor<int>> || std::same_as<TensorType, PinnedTensor<int>> || std::same_as<TensorType, DeviceTensor<int>>)
std::pair< TensorType, TensorType > Mila::Dnn::Gpt2::DatasetReader::next_batch ( )
inline

Fetches the next batch of training data with inputs and targets.

Template Parameters
    TensorType  The tensor type to return (defaults to DeviceTensor<int>).
Returns
    std::pair<TensorType, TensorType>  A pair containing the input tensor and the target tensor.
Exceptions
    std::runtime_error  If a timeout occurs while waiting for a batch or if the reader was stopped.

This method blocks until a preprocessed batch is available, with a 5-second timeout. The returned tensors contain shifted views of the same data: targets are inputs shifted forward by one position, implementing next-token prediction for language modeling.
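The input/target relationship can be shown concretely. This is a sketch of the shift only (the function name and signature are illustrative, not Mila's API): for a run of `seq_len + 1` tokens, inputs are the first `seq_len` tokens and targets are the same run offset by one.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Build one (inputs, targets) pair from a token run: targets are the
// inputs shifted forward by one position, so targets[j] == tokens[j + 1].
std::pair<std::vector<int>, std::vector<int>>
make_pair_from_tokens(const std::vector<int>& tokens, std::size_t seq_len) {
    std::vector<int> inputs(tokens.begin(), tokens.begin() + seq_len);
    std::vector<int> targets(tokens.begin() + 1, tokens.begin() + 1 + seq_len);
    return {inputs, targets};
}
```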


◆ pause()

void Mila::Dnn::Gpt2::DatasetReader::pause ( )
inline

Pauses the background loading threads.

This can be used to temporarily reduce system load without destroying the reader. Background threads will wait until resume() is called.
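A minimal sketch of a pause/resume mechanism of this kind: an atomic flag plus a condition variable, so a paused worker sleeps instead of spinning. The class and member names (`Pausable`, `wait_if_paused`) are illustrative assumptions, not Mila's code.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

class Pausable {
public:
    void pause() { paused_ = true; }

    void resume() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            paused_ = false;
        }
        cv_.notify_all();  // wake any worker blocked in wait_if_paused()
    }

    // Workers call this at loop boundaries; it blocks while paused.
    void wait_if_paused() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return !paused_.load(); });
    }

    bool paused() const { return paused_.load(); }

private:
    std::atomic<bool> paused_{false};
    std::mutex mutex_;
    std::condition_variable cv_;
};

// A worker performing n steps, honoring pause points between steps.
int run_worker(Pausable& ctrl, int n) {
    int done = 0;
    for (int i = 0; i < n; ++i) {
        ctrl.wait_if_paused();
        ++done;
    }
    return done;
}
```

Checking the flag only at loop boundaries means a pause takes effect after the current step completes, which matches the "temporarily reduce system load" use case.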


◆ preprocess_batches()

void Mila::Dnn::Gpt2::DatasetReader::preprocess_batches ( )
inline private

Background thread function that preprocesses raw data into input/target pairs.

Takes raw data from raw_data_queue_, creates input/target pairs by shifting sequences, and places results in batch_queue_ for consumption.


◆ read_from_disk()

void Mila::Dnn::Gpt2::DatasetReader::read_from_disk ( )
inline private

Background thread function that reads data from the dataset file.

Continuously reads batches from the dataset file using the window buffer and pushes them to the raw_data_queue_ for preprocessing.


◆ resume()

void Mila::Dnn::Gpt2::DatasetReader::resume ( )
inline

Resumes the background loading threads after being paused.

Wakes up the background threads and continues data loading operations.


Member Data Documentation

◆ batch_queue_

std::queue<std::pair<int*, int*> > Mila::Dnn::Gpt2::DatasetReader::batch_queue_
private

Queue for preprocessed input/target pairs.

◆ batch_size_

size_t Mila::Dnn::Gpt2::DatasetReader::batch_size_
private

Number of sequences in each batch.

◆ config_

DSReaderConfig Mila::Dnn::Gpt2::DatasetReader::config_
private

Configuration settings.

◆ control_mutex_

std::mutex Mila::Dnn::Gpt2::DatasetReader::control_mutex_
private

Mutex for control operations.

◆ current_window_end_

size_t Mila::Dnn::Gpt2::DatasetReader::current_window_end_
private

Last token index in current window.

◆ current_window_start_

size_t Mila::Dnn::Gpt2::DatasetReader::current_window_start_
private

First token index in current window.

◆ cv_io_

std::condition_variable Mila::Dnn::Gpt2::DatasetReader::cv_io_
private

Condition variable for I/O thread synchronization.

◆ cv_processing_

std::condition_variable Mila::Dnn::Gpt2::DatasetReader::cv_processing_
private

Condition variable for processing thread synchronization.

◆ file_

std::ifstream Mila::Dnn::Gpt2::DatasetReader::file_
private

File stream for the dataset.

◆ file_path_

std::string Mila::Dnn::Gpt2::DatasetReader::file_path_
private

Path to the dataset file.

◆ file_size_

size_t Mila::Dnn::Gpt2::DatasetReader::file_size_
private

Size of the dataset file in bytes.

◆ io_thread_

std::thread Mila::Dnn::Gpt2::DatasetReader::io_thread_
private

Thread for disk I/O operations.

◆ mutex_

std::mutex Mila::Dnn::Gpt2::DatasetReader::mutex_
private

Mutex for protecting shared queues.

◆ num_tokens_

size_t Mila::Dnn::Gpt2::DatasetReader::num_tokens_
private

Total number of tokens in the dataset.

◆ paused_

std::atomic<bool> Mila::Dnn::Gpt2::DatasetReader::paused_ { false }
private

Flag to signal threads to pause.

◆ pinned_inputs_

int* Mila::Dnn::Gpt2::DatasetReader::pinned_inputs_
private

Buffer for input tensors.

◆ pinned_targets_

int* Mila::Dnn::Gpt2::DatasetReader::pinned_targets_
private

Buffer for target tensors.

◆ processing_thread_

std::thread Mila::Dnn::Gpt2::DatasetReader::processing_thread_
private

Thread for data preprocessing.

◆ raw_data_queue_

std::queue<int*> Mila::Dnn::Gpt2::DatasetReader::raw_data_queue_
private

Queue for raw data batches.

◆ seq_len_

size_t Mila::Dnn::Gpt2::DatasetReader::seq_len_
private

Length of each sequence in tokens.

◆ stop_

std::atomic<bool> Mila::Dnn::Gpt2::DatasetReader::stop_ { false }
private

Flag to signal threads to stop.

◆ token_window_

std::unique_ptr<int[]> Mila::Dnn::Gpt2::DatasetReader::token_window_
private

Window buffer for tokens.

◆ token_window_size_

size_t Mila::Dnn::Gpt2::DatasetReader::token_window_size_
private

Size of token window in tokens.


The documentation for this class was generated from the following file: