|
Mila
Deep Neural Network Library
|
High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing.

Public Member Functions | |
| DatasetReader (const std::string &file_path, size_t batch_size, size_t seq_len, const DSReaderConfig &config=DSReaderConfig()) | |
| Constructs a new DatasetReader object. | |
| ~DatasetReader () | |
| Destroys the DatasetReader object. | |
| template<typename TensorType = DeviceTensor<int>> requires (std::same_as<TensorType, HostTensor<int>> || std::same_as<TensorType, PinnedTensor<int>> || std::same_as<TensorType, DeviceTensor<int>>) | |
| std::pair< TensorType, TensorType > | next_batch () |
| Fetches the next batch of training data with inputs and targets. | |
| void | pause () |
| Pauses the background loading threads. | |
| void | resume () |
| Resumes the background loading threads after being paused. | |
Private Member Functions | |
| void | allocate_memory (int *&buffer) |
| Allocates memory for tensor buffers. | |
| void | free_memory (int *&buffer) |
| Frees previously allocated memory. | |
| std::span< const int > | get_tokens (size_t position, size_t count) |
| Gets a non-owning view of tokens at the specified position. | |
| void | initialize_dataset () |
| Initializes the dataset access without loading the entire file into memory. | |
| void | load_window (size_t start_token) |
| Loads a specific window of tokens from the dataset file. | |
| void | log (const std::string &message, int level) |
| Logs messages through the configured logger. | |
| void | preprocess_batches () |
| Background thread function that preprocesses raw data into input/target pairs. | |
| void | read_from_disk () |
| Background thread function that reads data from the dataset file. | |
Private Attributes | |
| std::queue< std::pair< int *, int * > > | batch_queue_ |
| Queue for preprocessed input/target pairs. | |
| size_t | batch_size_ |
| Number of sequences in each batch. | |
| DSReaderConfig | config_ |
| Configuration settings. | |
| std::mutex | control_mutex_ |
| Mutex for control operations. | |
| size_t | current_window_end_ |
| Last token index in current window. | |
| size_t | current_window_start_ |
| First token index in current window. | |
| std::condition_variable | cv_io_ |
| Condition variable for I/O thread synchronization. | |
| std::condition_variable | cv_processing_ |
| Condition variable for processing thread synchronization. | |
| std::ifstream | file_ |
| File stream for the dataset. | |
| std::string | file_path_ |
| Path to the dataset file. | |
| size_t | file_size_ |
| Size of the dataset file in bytes. | |
| std::thread | io_thread_ |
| Thread for disk I/O operations. | |
| std::mutex | mutex_ |
| Mutex for protecting shared queues. | |
| size_t | num_tokens_ |
| Total number of tokens in the dataset. | |
| std::atomic< bool > | paused_ { false } |
| Flag to signal threads to pause. | |
| int * | pinned_inputs_ |
| Buffer for input tensors. | |
| int * | pinned_targets_ |
| Buffer for target tensors. | |
| std::thread | processing_thread_ |
| Thread for data preprocessing. | |
| std::queue< int * > | raw_data_queue_ |
| Queue for raw data batches. | |
| size_t | seq_len_ |
| Length of each sequence in tokens. | |
| std::atomic< bool > | stop_ { false } |
| Flag to signal threads to stop. | |
| std::unique_ptr< int[]> | token_window_ |
| Window buffer for tokens. | |
| size_t | token_window_size_ |
| Size of token window in tokens. | |
High-performance data loading class for GPT-2 tokenized datasets with multi-threaded processing.
The DatasetReader class reads, preprocesses, and batches tokenized datasets for language model training. It runs a multi-threaded pipeline in which background threads handle disk I/O and preprocessing, so that batches are ready before the training loop requests them.
| DatasetReader (const std::string &file_path, size_t batch_size, size_t seq_len, const DSReaderConfig &config=DSReaderConfig()) | inline |
Constructs a new DatasetReader object.
Parameters
| file_path | Path to the tokenized dataset file containing integers. |
| batch_size | Number of sequences in each batch. |
| seq_len | Length of each sequence in tokens. |
| config | Additional configuration parameters. |
Exceptions
| std::invalid_argument | If batch_size or seq_len is zero. |
| std::runtime_error | If file operations fail or memory allocation fails. |
The constructor performs several initialization steps:

| ~DatasetReader () | inline |
Destroys the DatasetReader object.
Safely stops all background threads and releases allocated memory.

| void allocate_memory (int *&buffer) | inline, private |
Allocates memory for tensor buffers.
Parameters
| buffer | Reference to pointer that will hold the allocated memory. |
Exceptions
| std::runtime_error | If CUDA memory allocation fails. |
| std::bad_alloc | If standard memory allocation fails. |
Attempts to allocate CUDA pinned memory for optimal performance. Falls back to standard allocation if CUDA is not available.


| void free_memory (int *&buffer) | inline, private |
Frees previously allocated memory.
Parameters
| buffer | Reference to pointer to free and nullify. |
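The allocate/free pair described above can be sketched as follows. This is a minimal illustration, not Mila's implementation: the names `allocate_buffer` and `free_buffer`, the explicit `count` parameter, and the `SKETCH_USE_CUDA` build flag are all assumptions, and the fallback is chosen at compile time here, whereas the library may detect CUDA differently.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <stdexcept>
#if defined(SKETCH_USE_CUDA)  // hypothetical build flag, not a Mila option
#include <cuda_runtime.h>
#endif

// Hypothetical helper: allocates `count` ints, page-locked when CUDA is enabled.
inline void allocate_buffer(int*& buffer, std::size_t count) {
    assert(buffer == nullptr);  // refuse to overwrite a live allocation
#if defined(SKETCH_USE_CUDA)
    // Pinned (page-locked) host memory allows faster host-to-device copies.
    if (cudaMallocHost(reinterpret_cast<void**>(&buffer),
                       count * sizeof(int)) != cudaSuccess) {
        buffer = nullptr;
        throw std::runtime_error("cudaMallocHost failed");
    }
#else
    buffer = new int[count];  // throws std::bad_alloc on failure
#endif
}

// Hypothetical helper: frees the buffer and resets the pointer to nullptr,
// so calling it twice is harmless.
inline void free_buffer(int*& buffer) {
    if (buffer == nullptr) return;
#if defined(SKETCH_USE_CUDA)
    cudaFreeHost(buffer);
#else
    delete[] buffer;
#endif
    buffer = nullptr;
}
```

Taking the pointer by reference, as the documented signatures do, lets the free function nullify the caller's pointer and makes double frees a no-op.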

| std::span< const int > get_tokens (size_t position, size_t count) | inline, private |
Gets a non-owning view of tokens at the specified position.
Parameters
| position | Starting token position. |
| count | Number of tokens to access. |
Exceptions
| std::runtime_error | If the position is invalid or reading fails. |


| void initialize_dataset () | inline, private |
Initializes the dataset access without loading the entire file into memory.
Exceptions
| std::runtime_error | If reading fails or initialization fails. |
Uses a streaming approach to access the dataset file with a fixed-size token window buffer, using std::span for safe, non-owning memory views.


| void load_window (size_t start_token) | inline, private |
Loads a specific window of tokens from the dataset file.
Parameters
| start_token | Index of the first token in the window to load. |
Exceptions
| std::runtime_error | If reading fails. |


| void log (const std::string &message, int level) | inline, private |
Logs messages through the configured logger.
Parameters
| message | The message text to log. |
| level | The log level; higher values indicate less important messages. |

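| template<typename TensorType = DeviceTensor<int>> requires (std::same_as<TensorType, HostTensor<int>> || std::same_as<TensorType, PinnedTensor<int>> || std::same_as<TensorType, DeviceTensor<int>>) std::pair< TensorType, TensorType > next_batch () | inline |
Fetches the next batch of training data with inputs and targets.
Template Parameters
| TensorType | The tensor type to return (defaults to DeviceTensor<int>). |
Exceptions
| std::runtime_error | If timeout occurs waiting for batch or if the reader was stopped. |
This method blocks until a preprocessed batch is available, waiting at most 5 seconds before throwing std::runtime_error. The returned pair holds inputs and targets drawn from the same token stream: the targets are the inputs shifted by one position, which implements next-token prediction for language modeling.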

| void pause () | inline |
Pauses the background loading threads.
This can be used to temporarily reduce system load without destroying the reader. Background threads will wait until resume() is called.

| void preprocess_batches () | inline, private |
Background thread function that preprocesses raw data into input/target pairs.
Takes raw data from raw_data_queue_, creates input/target pairs by shifting sequences, and places results in batch_queue_ for consumption.
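The hand-off between pipeline stages, together with the bounded wait that next_batch() documents, can be sketched with a mutex-protected queue and a condition variable. `TimedQueue` is a hypothetical class written for illustration; the library itself uses `std::queue` members guarded by `mutex_`, and its exact waiting logic is not shown in this reference.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical queue: producers push, consumers pop with a timeout.
template <typename T>
class TimedQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        cv_.notify_one();  // wake one waiting consumer
    }

    // Waits up to `timeout` for an item; returns std::nullopt on timeout.
    std::optional<T> pop(std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form guards against spurious wakeups.
        if (!cv_.wait_for(lock, timeout, [this] { return !items_.empty(); })) {
            return std::nullopt;
        }
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> items_;
};
```

In this sketch a reader thread would push raw data into one such queue, a preprocessing thread would pop from it and push input/target pairs into a second queue, and the consumer would treat a `std::nullopt` result as the timeout condition.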


| void read_from_disk () | inline, private |
Background thread function that reads data from the dataset file.
Continuously reads batches from the dataset file using the window buffer and pushes them to the raw_data_queue_ for preprocessing.


| void resume () | inline |
Resumes the background loading threads after being paused.
Wakes up the background threads and continues data loading operations.
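One common way to implement pause(), resume(), and a clean shutdown is to have the background thread wait on a condition variable while a pause flag is set. The class below is a hedged sketch of that pattern, not Mila's implementation: `PausableWorker` and its iteration counter are invented for illustration, and only the flag names `paused_` and `stop_` mirror the documented members.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker: a background thread that can be paused and resumed.
class PausableWorker {
public:
    PausableWorker() : thread_([this] { run(); }) {}

    ~PausableWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();  // wake the thread even if it is paused
        assert(thread_.joinable());
        thread_.join();
    }

    PausableWorker(const PausableWorker&) = delete;
    PausableWorker& operator=(const PausableWorker&) = delete;

    void pause() {
        std::lock_guard<std::mutex> lock(mutex_);
        paused_ = true;
    }

    void resume() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            paused_ = false;
        }
        cv_.notify_all();
    }

    long iterations() const { return iterations_.load(); }

private:
    void run() {
        while (true) {
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Sleep here while paused; leave the loop when stopped.
                cv_.wait(lock, [this] { return stop_ || !paused_; });
                if (stop_) return;
            }
            ++iterations_;  // stand-in for loading one batch
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> paused_{false};
    std::atomic<bool> stop_{false};
    std::atomic<long> iterations_{0};
    std::thread thread_;  // declared last so it starts after the other members
};
```

The flags are changed while holding the mutex even though they are atomic, which prevents a notification from being missed between the predicate check and the wait.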
