I am learning the FSDP example here but they used example that are not downloadable (has download restiction). I am sick and tired of poorly written tutorial like this whereas they take examples of undownloadable dataset.
https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html
I tried to remedy by substiting the mnist dataset by writing completely randomized dataset creation (whole code pasted below) but I must be doing something with train_size which I set to 30,000 for randomized data creation:
WORLD_SIZE = torch.cuda.device_count()
mp.spawn(fsdp_main,
args=(WORLD_SIZE, args),
nprocs=WORLD_SIZE,
join=True)
whole code:
import os
import argparse
import functools
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optimfrom torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoaderfrom torch.optim.lr_scheduler import StepLR
import torch.distributed as dist
import torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import (
CPUOffload,
BackwardPrefetch,
)
from torch.distributed.fsdp.wrap import (
size_based_auto_wrap_policy,
enable_wrap,
wrap,
)CONFIG_USE_RANDOM_DATA=1
Parameters and DataLoaders
input_size = 1000
output_size = 10
batch_size = 1000
data_size = 60000
train_size = 30000
valid_size = 20000
tset_size = 10000
def setup(rank, world_size):
os.environ[‘MASTER_ADDR’] = ‘localhost’
os.environ[‘MASTER_PORT’] = ‘12355’# initialize the process group dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()class Net(nn.Module):
def init(self):
super(Net, self).init()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = self.dropout1(x) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.dropout2(x) x = self.fc2(x) output = F.log_softmax(x, dim=1) return output
def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):
model.train()
ddp_loss = torch.zeros(2).to(rank)
if sampler:
sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(rank), target.to(rank)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target, reduction=‘sum’)
loss.backward()
optimizer.step()
ddp_loss[0] += loss.item()
ddp_loss[1] += len(data)dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) if rank == 0: print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))
def test(model, rank, world_size, test_loader):
model.eval()
correct = 0
ddp_loss = torch.zeros(3).to(rank)
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(rank), target.to(rank)
output = model(data)
ddp_loss[0] += F.nll_loss(output, target, reduction=‘sum’).item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()
ddp_loss[2] += len(data)dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) if rank == 0: test_loss = ddp_loss[0] / ddp_loss[2] print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( test_loss, int(ddp_loss[1]), int(ddp_loss[2]), 100. * ddp_loss[1] / ddp_loss[2]))
def fsdp_main(rank, world_size, args):
setup(rank, world_size)transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ]) if not CONFIG_USE_RANDOM_DATA: train_kwargs = {'batch_size': args.batch_size} test_kwargs = {'batch_size': args.test_batch_size} cuda_kwargs = {'num_workers': 2, 'pin_memory': True, 'shuffle': False} train_kwargs.update(cuda_kwargs) test_kwargs.update(cuda_kwargs) else: dataset1 = datasets.MNIST('../data', train=True, download=True, transform=transform) dataset2 = datasets.MNIST('../data', train=False, sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) train_kwargs = {'batch_size': args.batch_size, 'sampler': sampler1} test_kwargs = {'batch_size': args.test_batch_size, 'sampler': sampler2} cuda_kwargs = {'num_workers': 2, 'pin_memory': True, 'shuffle': False} train_kwargs.update(cuda_kwargs) test_kwargs.update(cuda_kwargs) if CONFIG_USE_RANDOM_DATA: train_loader = torch.utils.data.DataLoader(dataset=RandomDataset(input_size, train_size), **train_kwargs) test_loader = torch.utul.data.DataLoader(dataset=RandomDataset(input_size, test_size), **test_kwargs) else: train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) my_auto_wrap_policy = functools.partial( size_based_auto_wrap_policy, min_num_params=100 ) torch.cuda.set_device(rank) init_start_event = torch.cuda.Event(enable_timing=True) init_end_event = torch.cuda.Event(enable_timing=True) model = Net().to(rank) model = FSDP(model) optimizer = optim.Adadelta(model.parameters(), lr=args.lr) scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma) init_start_event.record() for epoch in range(1, args.epochs + 1): train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) test(model, rank, world_size, test_loader) scheduler.step() init_end_event.record() if rank == 0: print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") print(f"{model}") if args.save_model: # use a barrier to make sure training is done on all ranks dist.barrier() states = model.state_dict() if rank == 0: torch.save(states, "mnist_cnn.pt") cleanup()
#2.5 Finally, parse the arguments and set the main function
if name == ‘main’:
# Training settings
parser = argparse.ArgumentParser(description=‘PyTorch MNIST Example’)
parser.add_argument(‘–batch-size’, type=int, default=64, metavar=‘N’,
help=‘input batch size for training (default: 64)’)
parser.add_argument(‘–test-batch-size’, type=int, default=1000, metavar=‘N’,
help=‘input batch size for testing (default: 1000)’)
parser.add_argument(‘–epochs’, type=int, default=10, metavar=‘N’,
help=‘number of epochs to train (default: 14)’)
parser.add_argument(‘–lr’, type=float, default=1.0, metavar=‘LR’,
help=‘learning rate (default: 1.0)’)
parser.add_argument(‘–gamma’, type=float, default=0.7, metavar=‘M’,
help=‘Learning rate step gamma (default: 0.7)’)
parser.add_argument(‘–no-cuda’, action=‘store_true’, default=False,
help=‘disables CUDA training’)
parser.add_argument(‘–seed’, type=int, default=1, metavar=‘S’,
help=‘random seed (default: 1)’)
parser.add_argument(‘–save-model’, action=‘store_true’, default=False,
help=‘For Saving the current Model’)
args = parser.parse_args()torch.manual_seed(args.seed) WORLD_SIZE = torch.cuda.device_count() mp.spawn(fsdp_main, args=(WORLD_SIZE, args), nprocs=WORLD_SIZE, join=True)
Executing gives following error:
[root@gt-pla-u25-08 3-fsdp]# python3 ex1-fsdp.py
W0528 06:38:51.695650 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54881 via signal SIGTERM
W0528 06:38:51.696214 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54882 via signal SIGTERM
W0528 06:38:51.697674 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54883 via signal SIGTERM
W0528 06:38:51.699138 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54884 via signal SIGTERM
W0528 06:38:51.701952 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54886 via signal SIGTERM
W0528 06:38:51.702436 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54887 via signal SIGTERM
W0528 06:38:51.705028 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54888 via signal SIGTERM
Traceback (most recent call last):
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 225, in
mp.spawn(fsdp_main,
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 281, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method=“spawn”)
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 237, in start_processes
while not context.join():
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 188, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:– Process 4 terminated with the following error:
Traceback (most recent call last):
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 75, in _wrap
fn(i, *args)
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 180, in fsdp_main
train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 87, in train
for batch_idx, (data, target) in enumerate(train_loader):
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py”, line 631, in next
data = self._next_data()
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py”, line 1346, in _next_data
return self._process_data(data)
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py”, line 1372, in _process_data
data.reraise()
File “/usr/local/lib64/python3.9/site-packages/torch/_utils.py”, line 705, in reraise
raise exception
IndexError: Caught IndexError in DataLoader worker process 0.
Original Traceback (most recent call last):
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/worker.py”, line 308, in _worker_loop
data = fetcher.fetch(index) # type: ignore[possibly-undefined]
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/fetch.py”, line 51, in fetch
data = [self.dataset[idx] for idx in possibly_batched_index]
File “/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/fetch.py”, line 51, in
data = [self.dataset[idx] for idx in possibly_batched_index]
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/…/classes.py”, line 13, in getitem
return self.data[index]
IndexError: index 41548 is out of bounds for dimension 0 with size 30000
Increating train_size to 60000 gave following error:
[root@gt-pla-u25-08 3-fsdp]# python3 ex1-fsdp.py
W0528 06:41:33.675591 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59623 via signal SIGTERM
W0528 06:41:33.675977 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59624 via signal SIGTERM
W0528 06:41:33.676160 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59626 via signal SIGTERM
W0528 06:41:33.677502 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59627 via signal SIGTERM
W0528 06:41:33.679066 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59628 via signal SIGTERM
W0528 06:41:33.681031 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59629 via signal SIGTERM
W0528 06:41:33.683310 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59630 via signal SIGTERM
Traceback (most recent call last):
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 225, in
mp.spawn(fsdp_main,
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 281, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method=“spawn”)
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 237, in start_processes
while not context.join():
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 188, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:– Process 2 terminated with the following error:
Traceback (most recent call last):
File “/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py”, line 75, in _wrap
fn(i, *args)
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 180, in fsdp_main
train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
File “/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py”, line 87, in train
for batch_idx, (data, target) in enumerate(train_loader):
ValueError: too many values to unpack (expected 2)