Mila
Deep Neural Network Library

DatasetReader Class Reference

High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing.
Public Member Functions

DatasetReader (const std::string &file_path, size_t batch_size, size_t seq_len, const DSReaderConfig &config = DSReaderConfig())
    Constructs a new DatasetReader object.

~DatasetReader ()
    Destroys the DatasetReader object.

template<typename TensorType = DeviceTensor<int>>
    requires (std::same_as<TensorType, HostTensor<int>> || std::same_as<TensorType, PinnedTensor<int>> || std::same_as<TensorType, DeviceTensor<int>>)
std::pair<TensorType, TensorType> next_batch ()
    Fetches the next batch of training data with inputs and targets.

void pause ()
    Pauses the background loading threads.

void resume ()
    Resumes the background loading threads after being paused.
Private Member Functions

void allocate_memory (int *&buffer)
    Allocates memory for tensor buffers.

void free_memory (int *&buffer)
    Frees previously allocated memory.

std::span<const int> get_tokens (size_t position, size_t count)
    Gets a non-owning view of tokens at the specified position.

void initialize_dataset ()
    Initializes the dataset access without loading the entire file into memory.

void load_window (size_t start_token)
    Loads a specific window of tokens from the dataset file.

void log (const std::string &message, int level)
    Logs messages through the configured logger.

void preprocess_batches ()
    Background thread function that preprocesses raw data into input/target pairs.

void read_from_disk ()
    Background thread function that reads data from the dataset file.
Private Attributes

std::queue<std::pair<int *, int *>> batch_queue_
    Queue for preprocessed input/target pairs.

size_t batch_size_
    Number of sequences in each batch.

DSReaderConfig config_
    Configuration settings.

std::mutex control_mutex_
    Mutex for control operations.

size_t current_window_end_
    Last token index in current window.

size_t current_window_start_
    First token index in current window.

std::condition_variable cv_io_
    Condition variable for I/O thread synchronization.

std::condition_variable cv_processing_
    Condition variable for processing thread synchronization.

std::ifstream file_
    File stream for the dataset.

std::string file_path_
    Path to the dataset file.

size_t file_size_
    Size of the dataset file in bytes.

std::thread io_thread_
    Thread for disk I/O operations.

std::mutex mutex_
    Mutex for protecting shared queues.

size_t num_tokens_
    Total number of tokens in the dataset.

std::atomic<bool> paused_ { false }
    Flag to signal threads to pause.

int *pinned_inputs_
    Buffer for input tensors.

int *pinned_targets_
    Buffer for target tensors.

std::thread processing_thread_
    Thread for data preprocessing.

std::queue<int *> raw_data_queue_
    Queue for raw data batches.

size_t seq_len_
    Length of each sequence in tokens.

std::atomic<bool> stop_ { false }
    Flag to signal threads to stop.

std::unique_ptr<int[]> token_window_
    Window buffer for tokens.

size_t token_window_size_
    Size of token window in tokens.
Detailed Description

High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing.

The DatasetReader class provides an efficient way to read, preprocess, and batch tokenized datasets for language model training. It implements a multi-threaded pipeline with background I/O operations and preprocessing to minimize training latency.
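The members documented below suggest the following usage pattern. This is a hedged sketch built only from the public members on this page; the header include, file name, and numeric values are illustrative, not taken from the library:

// Sketch only: a typical consumption pattern based on the public members
// documented on this page. The header path, file name, and sizes are
// illustrative assumptions.
// #include "DatasetReader.h"   // actual header name not documented here

void example_epoch() {
    DatasetReader reader("train_tokens.bin", /*batch_size=*/8, /*seq_len=*/1024);

    for (int step = 0; step < 100; ++step) {
        // Default TensorType is DeviceTensor<int>; targets are the inputs
        // shifted by one position (next-token prediction).
        auto [inputs, targets] = reader.next_batch();
        // ... forward/backward pass on inputs/targets ...
    }

    reader.pause();    // temporarily stop background loading, e.g. for validation
    reader.resume();   // background threads continue filling the queues
}   // ~DatasetReader() stops the background threads and releases buffers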
Member Function Documentation

DatasetReader() [inline]

    Constructs a new DatasetReader object.

    Parameters:
        file_path    Path to the tokenized dataset file containing integers.
        batch_size   Number of sequences in each batch.
        seq_len      Length of each sequence in tokens.
        config       Additional configuration parameters.

    Exceptions:
        std::invalid_argument   If batch_size or seq_len is zero.
        std::runtime_error      If file operations fail or memory allocation fails.

    The constructor performs several initialization steps.
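The documented exceptions suggest wrapping construction in a try/catch block. A minimal sketch assuming the signature above; the file name and sizes are placeholders:

// Sketch only: guarding construction against the documented exceptions.
// The library header include is omitted; values are illustrative.
#include <iostream>
#include <stdexcept>

int main() {
    try {
        DSReaderConfig config;                     // default configuration
        DatasetReader reader("train_gpt2.bin",     // hypothetical file name
                             /*batch_size=*/8,
                             /*seq_len=*/1024,
                             config);
    } catch (const std::invalid_argument& e) {
        std::cerr << "bad batch_size/seq_len: " << e.what() << '\n';
    } catch (const std::runtime_error& e) {
        std::cerr << "I/O or allocation failure: " << e.what() << '\n';
    }
    return 0;
}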
~DatasetReader() [inline]

    Destroys the DatasetReader object.

    Safely stops all background threads and releases allocated memory.
allocate_memory() [inline, private]

    Allocates memory for tensor buffers.

    Parameters:
        buffer   Reference to pointer that will hold the allocated memory.

    Exceptions:
        std::runtime_error   If CUDA memory allocation fails.
        std::bad_alloc       If standard memory allocation fails.

    Attempts to allocate CUDA pinned memory for optimal performance. Falls back to standard allocation if CUDA is not available.
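A minimal sketch of this allocate-then-fall-back pattern, assuming the CUDA runtime API; the element-count parameter and error handling are assumptions, not the library's actual implementation:

// Sketch only: pinned allocation with a standard-allocation fallback.
#include <cuda_runtime.h>
#include <cstddef>

void allocate_buffer(int*& buffer, std::size_t num_elements) {
    // Prefer page-locked (pinned) host memory for faster host-to-device copies.
    cudaError_t status = cudaMallocHost(reinterpret_cast<void**>(&buffer),
                                        num_elements * sizeof(int));
    if (status == cudaSuccess) {
        return;
    }
    // Fall back to an ordinary heap allocation when CUDA is unavailable;
    // new[] throws std::bad_alloc on failure, matching the documented behavior.
    buffer = new int[num_elements];
}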
free_memory() [inline, private]

    Frees previously allocated memory.

    Parameters:
        buffer   Reference to pointer to free and nullify.
get_tokens() [inline, private]

    Gets a non-owning view of tokens at the specified position.

    Parameters:
        position   Starting token position.
        count      Number of tokens to access.

    Exceptions:
        std::runtime_error   If the position is invalid or reading fails.
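A minimal sketch of serving a std::span view from a window buffer, assuming the window members listed above hold a contiguous run of tokens; the free-function form and parameter names are illustrative:

// Sketch only: a non-owning std::span view into a sliding window buffer.
#include <span>
#include <stdexcept>
#include <cstddef>

std::span<const int> view_tokens(const int* window, std::size_t window_start,
                                 std::size_t window_end,
                                 std::size_t position, std::size_t count) {
    // The real reader would call load_window() to slide the window instead of
    // throwing; here the window is assumed to already cover the range.
    if (position < window_start || position + count > window_end) {
        throw std::runtime_error("requested tokens are outside the loaded window");
    }
    return std::span<const int>(window + (position - window_start), count);
}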
initialize_dataset() [inline, private]

    Initializes the dataset access without loading the entire file into memory.

    Exceptions:
        std::runtime_error   If reading fails or initialization fails.

    Uses a streaming approach to access the dataset file with a fixed-size token window buffer, using std::span for safe, non-owning memory views.
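One plausible shape for this streaming setup, assuming the file is a flat array of raw int token ids (the on-disk format is not specified on this page):

// Sketch only: opening the dataset, sizing it in tokens, and allocating a
// fixed-size window buffer. Names and structure are illustrative.
#include <algorithm>
#include <cstddef>
#include <fstream>
#include <memory>
#include <stdexcept>
#include <string>

struct WindowState {
    std::ifstream file;
    std::size_t file_size = 0;
    std::size_t num_tokens = 0;
    std::size_t window_size = 0;          // capacity of the window, in tokens
    std::unique_ptr<int[]> window;
};

WindowState open_dataset(const std::string& path, std::size_t window_tokens) {
    WindowState s;
    s.file.open(path, std::ios::binary | std::ios::ate);
    if (!s.file) throw std::runtime_error("failed to open dataset: " + path);
    s.file_size   = static_cast<std::size_t>(s.file.tellg());
    s.num_tokens  = s.file_size / sizeof(int);      // assumes raw int tokens
    s.window_size = std::min(window_tokens, s.num_tokens);
    s.window = std::make_unique<int[]>(s.window_size);
    s.file.seekg(0, std::ios::beg);                 // ready for the first window
    return s;
}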
load_window() [inline, private]

    Loads a specific window of tokens from the dataset file.

    Parameters:
        start_token   Index of the first token in the window to load.

    Exceptions:
        std::runtime_error   If reading fails.
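A sketch of loading one window via seek-and-read, again assuming raw int tokens on disk; the free-function form and parameters are illustrative stand-ins for the token_window_ and file_ members:

// Sketch only: reads one window of tokens starting at start_token.
#include <algorithm>
#include <cstddef>
#include <fstream>
#include <stdexcept>

std::size_t read_window(std::ifstream& file, int* window,
                        std::size_t window_capacity, std::size_t num_tokens,
                        std::size_t start_token) {
    if (start_token >= num_tokens) {
        throw std::runtime_error("window start past end of dataset");
    }
    std::size_t count = std::min(window_capacity, num_tokens - start_token);
    file.seekg(static_cast<std::streamoff>(start_token * sizeof(int)), std::ios::beg);
    file.read(reinterpret_cast<char*>(window),
              static_cast<std::streamsize>(count * sizeof(int)));
    if (!file) throw std::runtime_error("failed to read token window");
    return count;    // number of tokens actually loaded into the window
}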
log() [inline, private]

    Logs messages through the configured logger.

    Parameters:
        message   The message text to log.
        level     The log level (higher values mean less important).
next_batch() [inline]

    Fetches the next batch of training data with inputs and targets.

    Template Parameters:
        TensorType   The tensor type to return (defaults to DeviceTensor<int>).

    Exceptions:
        std::runtime_error   If a timeout occurs while waiting for a batch or if the reader was stopped.

    This method blocks until a preprocessed batch is available (up to a 5-second timeout). The returned tensors contain shifted versions of the same data: the targets are the inputs shifted by one position, implementing next-token prediction for language modeling.
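A hedged consumer sketch using the PinnedTensor<int> option from the requires clause above; the step count and surrounding loop are illustrative:

// Sketch only: consuming batches in a training loop. The library header
// include is omitted; DatasetReader and PinnedTensor come from this page.
void train_steps(DatasetReader& reader, int num_steps) {
    for (int step = 0; step < num_steps; ++step) {
        // Explicit template argument selects one of the allowed tensor types.
        auto [inputs, targets] = reader.next_batch<PinnedTensor<int>>();
        // targets[t] is the token that follows inputs[t] in the dataset,
        // i.e. the next-token prediction label.
        // ... training step ...
    }
}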
pause() [inline]

    Pauses the background loading threads.

    This can be used to temporarily reduce system load without destroying the reader. Background threads will wait until resume() is called.
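A small usage sketch; run_validation() is a hypothetical placeholder for whatever work benefits from the freed disk and CPU capacity:

// Sketch only: quiescing the background pipeline around other work.
void run_validation();   // hypothetical, not part of this API

void validate_with_loader_paused(DatasetReader& reader) {
    reader.pause();      // background I/O and preprocessing threads wait
    run_validation();    // work that needs the disk/CPU headroom
    reader.resume();     // threads wake up and keep filling the batch queue
}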
preprocess_batches() [inline, private]

    Background thread function that preprocesses raw data into input/target pairs.

    Takes raw data from raw_data_queue_, creates input/target pairs by shifting sequences, and places results in batch_queue_ for consumption.
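A sketch of the shift-by-one pairing this description implies, assuming the raw buffer holds batch_size * seq_len + 1 consecutive tokens (the library's actual buffer layout is not documented here):

// Sketch only: building an input/target pair by shifting a contiguous token
// run one position to the left.
#include <cstddef>

void make_shifted_pair(const int* raw, int* inputs, int* targets,
                       std::size_t batch_size, std::size_t seq_len) {
    for (std::size_t i = 0; i < batch_size * seq_len; ++i) {
        inputs[i]  = raw[i];        // token at position i
        targets[i] = raw[i + 1];    // the following token: the prediction label
    }
}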
read_from_disk() [inline, private]

    Background thread function that reads data from the dataset file.

    Continuously reads batches from the dataset file using the window buffer and pushes them to the raw_data_queue_ for preprocessing.
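A sketch of the general producer-loop shape implied by the stop_/paused_ flags, queues, and condition variables listed above; the Pipeline struct, capacity limit, and read_next_chunk() helper are assumptions, and the real class splits waiting across two condition variables:

// Sketch only: a bounded producer loop governed by stop/pause flags.
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

struct Pipeline {
    std::queue<int*> raw_data_queue;
    std::mutex mutex;
    std::condition_variable cv_io;
    std::atomic<bool> stop{false};
    std::atomic<bool> paused{false};
};

int* read_next_chunk();   // hypothetical: fills and returns a raw token buffer

void producer_loop(Pipeline& p, std::size_t max_queued) {
    while (!p.stop.load()) {
        std::unique_lock<std::mutex> lock(p.mutex);
        // Sleep while paused or while the queue is full; wake immediately on stop.
        p.cv_io.wait(lock, [&] {
            return p.stop.load() ||
                   (!p.paused.load() && p.raw_data_queue.size() < max_queued);
        });
        if (p.stop.load()) break;
        lock.unlock();

        int* chunk = read_next_chunk();   // disk read happens outside the lock
        lock.lock();
        p.raw_data_queue.push(chunk);
        lock.unlock();
        p.cv_io.notify_all();             // wake the preprocessing thread
    }
}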
resume() [inline]

    Resumes the background loading threads after being paused.

    Wakes up the background threads and continues data loading operations.