Cannot use RandomDataset call in spawned calls...?

I am learning from the FSDP tutorial here, but it uses a dataset that I cannot download (the download is restricted in my environment). I am sick and tired of poorly written tutorials like this that build their examples on datasets I cannot download.

https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html

I tried to remedy this by substituting the MNIST dataset with a completely randomized dataset (whole code pasted below), but I must be doing something wrong with train_size, which I set to 30,000 for the randomized data creation:

WORLD_SIZE = torch.cuda.device_count()
mp.spawn(fsdp_main,
    args=(WORLD_SIZE, args),
    nprocs=WORLD_SIZE,
    join=True)

whole code:

import os
import argparse
import functools
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

from torch.optim.lr_scheduler import StepLR
import torch.distributed as dist
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import (
    CPUOffload,
    BackwardPrefetch,
)
from torch.distributed.fsdp.wrap import (
    size_based_auto_wrap_policy,
    enable_wrap,
    wrap,
)

CONFIG_USE_RANDOM_DATA=1

# Parameters and DataLoaders

input_size = 1000
output_size = 10
batch_size = 1000
data_size = 60000
train_size = 30000
valid_size = 20000
test_size = 10000
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):
    model.train()
    ddp_loss = torch.zeros(2).to(rank)
    if sampler:
        sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target, reduction='sum')
        loss.backward()
        optimizer.step()
        ddp_loss[0] += loss.item()
        ddp_loss[1] += len(data)

    dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
    if rank == 0:
        print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))

def test(model, rank, world_size, test_loader):
    model.eval()
    correct = 0
    ddp_loss = torch.zeros(3).to(rank)
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(rank), target.to(rank)
            output = model(data)
            ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()
            ddp_loss[2] += len(data)

    dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)

    if rank == 0:
        test_loss = ddp_loss[0] / ddp_loss[2]
        print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, int(ddp_loss[1]), int(ddp_loss[2]),
            100. * ddp_loss[1] / ddp_loss[2]))

def fsdp_main(rank, world_size, args):
    setup(rank, world_size)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    if not CONFIG_USE_RANDOM_DATA:
        train_kwargs = {'batch_size': args.batch_size}
        test_kwargs = {'batch_size': args.test_batch_size}
        cuda_kwargs = {'num_workers': 2,
                       'pin_memory': True,
                       'shuffle': False}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)
    else:
        dataset1 = datasets.MNIST('../data', train=True, download=True,
                                  transform=transform)
        dataset2 = datasets.MNIST('../data', train=False,
                                  transform=transform)
        sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True)
        sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size)

        train_kwargs = {'batch_size': args.batch_size, 'sampler': sampler1}
        test_kwargs = {'batch_size': args.test_batch_size, 'sampler': sampler2}
        cuda_kwargs = {'num_workers': 2,
                       'pin_memory': True,
                       'shuffle': False}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    if CONFIG_USE_RANDOM_DATA:
        train_loader = torch.utils.data.DataLoader(dataset=RandomDataset(input_size, train_size), **train_kwargs)
        test_loader = torch.utils.data.DataLoader(dataset=RandomDataset(input_size, test_size), **test_kwargs)
    else:
        train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
        test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    my_auto_wrap_policy = functools.partial(
        size_based_auto_wrap_policy, min_num_params=100
    )
    torch.cuda.set_device(rank)

    init_start_event = torch.cuda.Event(enable_timing=True)
    init_end_event = torch.cuda.Event(enable_timing=True)

    model = Net().to(rank)

    model = FSDP(model)

    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    init_start_event.record()
    for epoch in range(1, args.epochs + 1):
        train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
        test(model, rank, world_size, test_loader)
        scheduler.step()

    init_end_event.record()

    if rank == 0:
        print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec")
        print(f"{model}")

    if args.save_model:
        # use a barrier to make sure training is done on all ranks
        dist.barrier()
        states = model.state_dict()
        if rank == 0:
            torch.save(states, "mnist_cnn.pt")

    cleanup()

#2.5 Finally, parse the arguments and set the main function

if __name__ == '__main__':
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')
    args = parser.parse_args()

    torch.manual_seed(args.seed)

    WORLD_SIZE = torch.cuda.device_count()
    mp.spawn(fsdp_main,
        args=(WORLD_SIZE, args),
        nprocs=WORLD_SIZE,
        join=True)

Executing gives the following error:

[root@gt-pla-u25-08 3-fsdp]# python3 ex1-fsdp.py
W0528 06:38:51.695650 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54881 via signal SIGTERM
W0528 06:38:51.696214 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54882 via signal SIGTERM
W0528 06:38:51.697674 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54883 via signal SIGTERM
W0528 06:38:51.699138 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54884 via signal SIGTERM
W0528 06:38:51.701952 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54886 via signal SIGTERM
W0528 06:38:51.702436 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54887 via signal SIGTERM
W0528 06:38:51.705028 140375835883328 torch/multiprocessing/spawn.py:145] Terminating process 54888 via signal SIGTERM
Traceback (most recent call last):
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 225, in <module>
mp.spawn(fsdp_main,
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 281, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method="spawn")
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 237, in start_processes
while not context.join():
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 188, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:

-- Process 4 terminated with the following error:
Traceback (most recent call last):
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 75, in _wrap
fn(i, *args)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 180, in fsdp_main
train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 87, in train
for batch_idx, (data, target) in enumerate(train_loader):
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 631, in __next__
data = self._next_data()
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 1346, in _next_data
return self._process_data(data)
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 1372, in _process_data
data.reraise()
File "/usr/local/lib64/python3.9/site-packages/torch/_utils.py", line 705, in reraise
raise exception
IndexError: Caught IndexError in DataLoader worker process 0.
Original Traceback (most recent call last):
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/fetch.py", line 51, in fetch
data = [self.dataset[idx] for idx in possibly_batched_index]
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/_utils/fetch.py", line 51, in <listcomp>
data = [self.dataset[idx] for idx in possibly_batched_index]
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/…/classes.py", line 13, in __getitem__
return self.data[index]
IndexError: index 41548 is out of bounds for dimension 0 with size 30000

Increasing train_size to 60000 gave the following error:

[root@gt-pla-u25-08 3-fsdp]# python3 ex1-fsdp.py
W0528 06:41:33.675591 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59623 via signal SIGTERM
W0528 06:41:33.675977 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59624 via signal SIGTERM
W0528 06:41:33.676160 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59626 via signal SIGTERM
W0528 06:41:33.677502 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59627 via signal SIGTERM
W0528 06:41:33.679066 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59628 via signal SIGTERM
W0528 06:41:33.681031 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59629 via signal SIGTERM
W0528 06:41:33.683310 139771223697216 torch/multiprocessing/spawn.py:145] Terminating process 59630 via signal SIGTERM
Traceback (most recent call last):
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 225, in <module>
mp.spawn(fsdp_main,
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 281, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method="spawn")
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 237, in start_processes
while not context.join():
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 188, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:

-- Process 2 terminated with the following error:
Traceback (most recent call last):
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/spawn.py", line 75, in _wrap
fn(i, *args)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 180, in fsdp_main
train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 87, in train
for batch_idx, (data, target) in enumerate(train_loader):
ValueError: too many values to unpack (expected 2)

It seems your custom Dataset returns a single tensor from __getitem__ (return self.data[index]), so either assign the data only or additionally return a target.
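
For illustration, a minimal sketch of both options (the class names here are made up, not the tutorial's code):

import torch
from torch.utils.data import Dataset, DataLoader

# Option 1: __getitem__ returns a single tensor, so each batch is one value.
class DataOnlyDataset(Dataset):
    def __init__(self, length, size):
        self.data = torch.randn(length, size)
    def __getitem__(self, index):
        return self.data[index]
    def __len__(self):
        return len(self.data)

# Option 2: __getitem__ returns a (data, target) pair, so the tutorial's
# `for data, target in loader:` unpacking works.
class DataTargetDataset(Dataset):
    def __init__(self, length, size, num_classes=10):
        self.data = torch.randn(length, size)
        self.target = torch.randint(0, num_classes, (length,))
    def __getitem__(self, index):
        return self.data[index], self.target[index]
    def __len__(self):
        return len(self.data)

for data in DataLoader(DataOnlyDataset(8, 4), batch_size=4):
    pass          # assign the data only
for data, target in DataLoader(DataTargetDataset(8, 4), batch_size=4):
    pass          # or unpack both values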

Contributions are more than welcome in case you want to improve the tutorials. :slight_smile:

Actually, the title is a misnomer. Initially RandomDataset was unrecognized and I was scratching my head over where it was supposed to be imported from. It turns out another example somewhere on their site had the class defined, so I picked up that code, but it still does not work because that other code example uses it differently:

for data in rand_loader:

after defining it as:

class RandomDataset(Dataset):

    def __init__(self, size, length):
        print("GG: RandomDataset.__init__(size=", size, "length: ", length, ")")
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        print("GG: RandomDataset.__getitem__(index=", index, ")")
        return self.data[index]

    def __len__(self):
        print("GG: RandomDataset.__len__() returning self.len: ", self.len)
        return self.len
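
That other example builds the loader and iterates it with a single value per batch, roughly like this (a sketch reconstructed from memory, not copied from the tutorial):

import torch
from torch.utils.data import DataLoader

# assumes the RandomDataset class defined above; the sizes here are
# small placeholder values, not the ones from my script
input_size, batch_size, data_size = 1000, 100, 6000

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size, shuffle=True)

for data in rand_loader:          # each batch is a single tensor, no target to unpack
    print("batch shape:", data.shape)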

In this example, the training loop is:
for batch_idx, (data, target) in enumerate(train_loader):

So I made the following modification for this example:

#for batch_idx, (data, target) in enumerate(train_loader):
for data, target in train_loader:

since batch_idx does not seem to be used anywhere anyway.
I also defined RandomDataset1, a slightly modified version:

class RandomDataset1(Dataset):

    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)
        self.label = torch.randn(length, 1)

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return self.len

which includes a label and returns both the data and the label.
After running it, I get a similar error, so I am still looking into what went wrong…

GG: train entered: rank: 1 : world_size: 2 , train_loader: <torch.utils.data.dataloader.DataLoader object at 0x7f25c54c2cd0> , epoch: 1
W0529 05:57:28.003877 140141330396992 torch/multiprocessing/spawn.py:145] Terminating process 147 via signal SIGTERM
Traceback (most recent call last):
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 205, in <module>
mp.spawn(fsdp_main,
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 281, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method="spawn")
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 237, in start_processes
while not context.join():
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 188, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:

-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 75, in _wrap
fn(i, *args)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 159, in fsdp_main
train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
File "/root/extdir/gg/git/codelab/gpu/ml/pytorch/distributed/tutorials/3-fsdp/ex1-fsdp.py", line 63, in train
for data, target in train_loader:
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 631, in __next__
data = self._next_data()
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1346, in _next_data
return self._process_data(data)
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1372, in _process_data
data.reraise()
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/_utils.py", line 705, in reraise
raise exception
IndexError: Caught IndexError in DataLoader worker process 0.
Original Traceback (most recent call last):
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/_utils/fetch.py", line 51, in fetch
data = [self.dataset[idx] for idx in possibly_batched_index]
File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/utils/data/_utils/fetch.py", line 51, in <listcomp>
data = [self.dataset[idx] for idx in possibly_batched_index]
File "…/classes.py", line 55, in __getitem__
return self.data[index], self.label[index]
IndexError: index 35845 is out of bounds for dimension 0 with size 30000

The index used in __getitem__ is invalid, so instead of defining self.len manually, compute it from the actually used data:

def __len__(self):
    return len(self.data)
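
Applied to your RandomDataset1, a rough sketch of that change:

import torch
from torch.utils.data import Dataset

class RandomDataset1(Dataset):

    def __init__(self, size, length):
        self.data = torch.randn(length, size)
        self.label = torch.randn(length, 1)

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        # derive the length from the stored data instead of a hand-set counter
        return len(self.data)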

Nice catch! It seems to be working now. I will try to investigate why self.len worked in the other example but not here. It might have to do with the actual sharding.

I am getting a shape error now, but that should be taken care of by creating the random dataset with the same shape as MNIST.

return self._conv_forward(input, self.weight, self.bias)

File "/home/miniconda3/envs/root-test/lib/python3.9/site-packages/torch/nn/modules/conv.py", line 456, in _conv_forward
return F.conv2d(input, weight, bias, self.stride,
RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [64, 10]
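
For reference, here is a rough sketch (my own naming, not from the tutorial) of the MNIST-shaped random dataset I have in mind: 1x28x28 float inputs and integer class labels, which is what F.nll_loss expects:

import torch
from torch.utils.data import Dataset

class RandomMNISTDataset(Dataset):
    """Random stand-in for MNIST: 1x28x28 float images and integer labels 0-9."""

    def __init__(self, length, num_classes=10):
        self.data = torch.randn(length, 1, 28, 28)              # same shape as MNIST images
        self.label = torch.randint(0, num_classes, (length,))   # class indices for nll_loss

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.data)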