Description
I am implementing nanoGPT with PyTorch Lightning. My goal is to load a large memory-mapped OpenWebText dataset (16 GB) through a PyTorch Dataset and a PyTorch Lightning data module for training in a multi-GPU (8) setting. However, in that setting the loading process hangs and never completes. Strangely, loading works fine for single-GPU training and when the file is small.
I would appreciate guidance and suggestions on how to resolve this loading issue, particularly when dealing with large memory-mapped datasets.
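For context, a multi-GPU run with this data module looks roughly like the sketch below; the NanoGPT LightningModule, paths, batch sizes, and precision are placeholders (the actual training script is not included here), and PackedDataModule / PackedDataModuleConfig are defined under Implementation below. The hang only appears when devices is greater than 1.

from lightning import Trainer

# Placeholder values for illustration; the real training script is not shown in this issue
datamodule = PackedDataModule(
    PackedDataModuleConfig(
        data_dir="data/openwebtext",  # directory containing train.bin / val.bin
        block_size=1024,
        train_batch_size=12,
        val_batch_size=12,
        num_workers=4,
    )
)
model = NanoGPT()  # hypothetical LightningModule wrapping the nanoGPT model

trainer = Trainer(
    accelerator="gpu",
    devices=8,  # loading hangs here; devices=1 works fine
    strategy="ddp",
    precision="bf16-mixed",
    max_steps=10_000,
)
trainer.fit(model, datamodule=datamodule)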
Environment
The training script runs in a SLURM-managed environment.
Setup
module load openmpi
module load cuda/12.1
export MASTER_ADDR=`hostname`
export MASTER_PORT=12802
export NCCL_PROTO=simple
export FI_EFA_FORK_SAFE=1
export FI_LOG_LEVEL=1
export FI_EFA_USE_DEVICE_RDMA=1
export NCCL_DEBUG=info
export PYTHONFAULTHANDLER=1
export CUDA_LAUNCH_BLOCKING=0
export OMPI_MCA_mtl_base_verbose=1
export FI_EFA_ENABLE_SHM_TRANSFER=0
export FI_PROVIDER=efa
export FI_EFA_TX_MIN_CREDITS=64
export NCCL_TREE_THRESHOLD=0
Python Packages
datasets==2.13.1
einops==0.6.1
jsonargparse==4.21.1
lightning==2.0.2
numpy==1.24.3
setuptools==67.7.2
torch==2.0.1
torchinfo==1.8.0
tqdm==4.65.0
transformers==4.29.2
pydantic==1.10.8
tensorboard==2.13.0
Implementation
Data Preparation Script
import os
from dataclasses import dataclass
from typing import Optional, Union

import numpy as np
from datasets import load_dataset
from jsonargparse import CLI
from lightning import seed_everything
from tqdm.auto import tqdm
from transformers import AutoTokenizer


@dataclass
class Config:
    dataset_name_or_path: str
    tokenizer_name_or_path: str
    output_dir: str
    streaming: bool = False
    val_split: Optional[Union[int, float]] = None
    seed: int = 0
    num_proc: int = 4
    batch_size: int = 1_000


def cli_main(config: Config) -> None:
    # Setup: Reproducibility & Output directories
    seed_everything(config.seed)
    os.makedirs(config.output_dir, exist_ok=True)

    # Data loading & preparation
    dataset = load_dataset(config.dataset_name_or_path, streaming=config.streaming)

    # Split training dataset into training & validation sets
    if config.val_split and "val" not in dataset.keys():
        split_dataset = dataset["train"].train_test_split(
            test_size=config.val_split, seed=config.seed, shuffle=True
        )
        dataset["train"] = split_dataset.pop("train")
        dataset["val"] = split_dataset.pop("test")

    # Run tokenization
    tokenizer = AutoTokenizer.from_pretrained(config.tokenizer_name_or_path)

    # For packed datasets the eos token must be provided
    if tokenizer.eos_token is None:
        tokenizer.eos_token = tokenizer.pad_token

    def add_eos_batched(ids):
        # Append the eos token to every sequence that does not already end with it
        return [
            x + [tokenizer.eos_token_id] if x[-1] != tokenizer.eos_token_id else x
            for x in ids
        ]

    def tokenize_examples(examples):
        ids = tokenizer(examples["text"]).input_ids
        # Optionally append eos token
        ids = add_eos_batched(ids)
        length = [len(x) for x in ids]
        return {"ids": ids, "len": length}

    dataset = dataset.map(
        tokenize_examples,
        remove_columns=dataset["train"].column_names,
        desc="Running Tokenization",
        batched=True,
        num_proc=config.num_proc,
    )

    # Save each tokenized split as a single flat uint16 stream of token ids
    for split_name, split in dataset.items():
        filepath = os.path.join(config.output_dir, f"{split_name}.bin")
        np_split = np.memmap(
            filepath, dtype=np.uint16, mode="w+", shape=(sum(split["len"]),)
        )
        num_samples = len(split["len"])
        num_batches = int(np.ceil(num_samples / config.batch_size))
        desc = f"Saving {split_name} dataset. Num samples: {num_samples}. Num batches: {num_batches}"
        offset = 0
        for batch_idx in tqdm(range(num_batches), desc=desc):
            # Concatenate the token ids of one contiguous shard and write them at the current offset
            batch = split.shard(
                num_shards=num_batches, index=batch_idx, contiguous=True
            ).with_format("numpy")
            batch = np.concatenate(batch["ids"])
            np_split[offset : offset + len(batch)] = batch
            offset += len(batch)
        np_split.flush()


if __name__ == "__main__":
    config = CLI(Config, as_positional=False)
    cli_main(config)
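After the script finishes, the resulting .bin files can be sanity-checked by memory-mapping them back and decoding a few tokens. A minimal sketch, assuming a GPT-2 tokenizer and an output directory of data/openwebtext (both placeholders):

import numpy as np
from transformers import AutoTokenizer

tokens = np.memmap("data/openwebtext/train.bin", dtype=np.uint16, mode="r")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# At 2 bytes per uint16 token, a 16 GB train.bin corresponds to roughly 8 billion tokens
print(f"Total tokens: {len(tokens):,}")
print(tokenizer.decode(tokens[:64].tolist()))  # decoded text of the first 64 tokens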
DataLoading
import os
from dataclasses import dataclass
from typing import Optional

import numpy as np
import torch
from lightning import LightningDataModule
from torch.utils.data import DataLoader, Dataset


class PackedDataset(Dataset):
    def __init__(
        self, filepath: str, block_size: int = 1024, filemode: str = "r"
    ) -> None:
        super().__init__()
        self.filepath = filepath
        self.block_size = block_size
        # Memory-map the flat uint16 token stream produced by the preparation script
        self.data = np.memmap(filepath, dtype=np.uint16, mode=filemode)

    def __len__(self) -> int:
        # Every index that still leaves room for a full block is a valid sample
        return len(self.data) - self.block_size

    def __getitem__(self, index: int) -> torch.Tensor:
        # Return a window of block_size token ids starting at `index`
        return torch.as_tensor(
            self.data[index : index + self.block_size].astype(np.int64)
        )


@dataclass
class PackedDataModuleConfig:
    data_dir: str
    block_size: int
    filemode: str = "r"
    train_batch_size: Optional[int] = None
    val_batch_size: Optional[int] = None
    test_batch_size: Optional[int] = None
    num_workers: int = 4
    pin_memory: bool = False
    persistent_workers: bool = False


class PackedDataModule(LightningDataModule):
    def __init__(self, config: PackedDataModuleConfig) -> None:
        super().__init__()
        self.config = config
        self.splits = os.listdir(self.config.data_dir)

    def setup(self, stage: Optional[str] = None) -> None:
        # Only create a dataset if a batch size is configured and the split file exists
        if self.config.train_batch_size and "train.bin" in self.splits:
            self.train_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "train.bin"),
                self.config.block_size,
                self.config.filemode,
            )
        if self.config.val_batch_size and "val.bin" in self.splits:
            self.val_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "val.bin"),
                self.config.block_size,
                self.config.filemode,
            )
        if self.config.test_batch_size and "test.bin" in self.splits:
            self.test_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "test.bin"),
                self.config.block_size,
                self.config.filemode,
            )

    def train_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "train_dataset"):
            return self._create_dataloader(
                self.train_dataset, self.config.train_batch_size
            )

    def val_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "val_dataset"):
            return self._create_dataloader(self.val_dataset, self.config.val_batch_size)

    def test_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "test_dataset"):
            return self._create_dataloader(
                self.test_dataset, self.config.test_batch_size
            )

    def _create_dataloader(self, dataset: Dataset, batch_size: int) -> DataLoader:
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,  # Enabling shuffling causes the process to hang for large files
            num_workers=self.config.num_workers,
            pin_memory=self.config.pin_memory,
            persistent_workers=self.config.persistent_workers,
        )
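For what it's worth, a quick single-process check along these lines can be used to confirm that the dataset and a multi-worker DataLoader read fine on their own (the path and batch size are placeholders); this mirrors the single-GPU case, which works.

from torch.utils.data import DataLoader

ds = PackedDataset("data/openwebtext/val.bin", block_size=1024)
print(len(ds))  # number of starting positions: len(data) - block_size
print(ds[0].shape, ds[0].dtype)  # torch.Size([1024]) torch.int64

loader = DataLoader(ds, batch_size=12, num_workers=4, shuffle=False)
batch = next(iter(loader))
print(batch.shape)  # torch.Size([12, 1024])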