Multiprocessing using torch.multiprocessing

I’m trying to get something working similar to Keras’ fit_generator method. Basically, I have a (very) large data file of mini-batches, and I want my CPU to grab mini-batches and populate a queue in parallel with my GPU pulling mini-batches off the queue and training on them. By having the CPU work in parallel with the GPU (as opposed to having the CPU grab a batch and the GPU wait on it before training on that batch), I should be able to cut my training time roughly in half. I have benchmarked how long the CPU takes to grab a mini-batch, and it is comparable to how long my GPU takes to train on one mini-batch, so overlapping the two should work fine.

I haven’t found a built-in method in PyTorch to do this; if there is one, please let me know. I did take a look at the DataLoader in torch.utils.data; however, my understanding is that its multiple workers simply load the data faster, and have nothing to do with making the loading run in parallel with the training. Is that a correct understanding?
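
For reference, here is roughly what I mean by the built-in route: a minimal sketch of wrapping the HDF5 file in a Dataset and handing it to a DataLoader with background workers. The wrapper class and the num_workers value are placeholders I made up for illustration, not code I’m actually running:

    import h5py
    import torch
    from torch.utils.data import Dataset, DataLoader

    class H5Dataset(Dataset):
        """Placeholder wrapper around the X/Y arrays in the HDF5 file."""
        def __init__(self, path):
            self.path = path
            with h5py.File(path, 'r') as f:
                self.length = f['X'].shape[0]

        def __len__(self):
            return self.length

        def __getitem__(self, idx):
            # Open the file per access so each worker process gets its own handle
            with h5py.File(self.path, 'r') as f:
                x = torch.Tensor(f['X'][idx])
                y = torch.Tensor(f['Y'][idx])
            return x, y

    # num_workers > 0 loads batches in separate worker processes
    # (PATH is the same HDF5 file path used in the code below)
    loader = DataLoader(H5Dataset(PATH), batch_size=32, num_workers=2)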

So I have tried to use the torch.multiprocessing module to do what I want, but I’m not able to complete training: I always get some sort of error right before training finishes. Basically, I have set up my code with two functions, a loader and a trainer, like so:

    def data_gen(que,PATH,epochs,steps_per_epoch,batch_size=32):
        for epoch in range(epochs):
            for j in range(steps_per_epoch):
                # Re-open the HDF5 file each step and slice out one mini-batch
                with h5py.File(PATH,'r') as f:
                    X = f['X'][j*batch_size:(j+1)*batch_size]
                    Y = f['Y'][j*batch_size:(j+1)*batch_size]

                    X = autograd.Variable(torch.Tensor(X).resize_(batch_size,256,25).cpu())
                    Y = autograd.Variable(torch.Tensor(Y).cpu())

                    # Hand the CPU-side batch to the trainer process
                    que.put((X,Y))
        # Sentinel so the trainer knows there is nothing left to read
        que.put(None)
        que.close()
        return

    def train_network(que,net,optimizer,epochs,steps):
        print('Training for %s epochs...' % epochs)
        for epoch in range(epochs):
            for step in range(steps):
                data = que.get()
                if data is None:
                    # Sentinel reached; note this only breaks the inner step loop
                    break
                net.zero_grad()
                net.hid = net.init_hid()
                inp,labels = data
                inp    = inp.cuda()
                labels = labels.cuda()
                out,hid = net(inp)
                loss = F.binary_cross_entropy(out,labels)
                loss.backward()
                optimizer.step()
        return

And then I run the two processes in parallel like so:

    if __name__ == '__main__':
        tmp.set_start_method('spawn')  # 'spawn' is needed when subprocesses use CUDA
        que = tmp.Queue(maxsize=10)    # bounded queue: the loader gets at most 10 batches ahead
        loader = tmp.Process(target=data_gen, args=(que,PATH,epochs,steps), kwargs={'batch_size':batch_size})
        loader.start()
        trainer = tmp.Process(target=train_network, args=(que,net,optimizer,epochs,steps))
        trainer.start()
        loader.join()
        trainer.join()
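
For completeness, these snippets assume roughly the following imports and placeholder definitions. The file name, the numbers, and the MyNetwork class below are just illustrative stand-ins, not my real setup:

    import h5py
    import torch
    import torch.nn.functional as F
    from torch import autograd
    import torch.multiprocessing as tmp            # 'tmp' is the torch.multiprocessing alias used above

    PATH = 'data.h5'                               # placeholder path to the HDF5 file
    epochs, steps, batch_size = 10, 100, 32        # illustrative values only
    net = MyNetwork().cuda()                       # hypothetical model exposing init_hid()
    optimizer = torch.optim.Adam(net.parameters())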

I have the queue put a None value at the end so I can break out of the loop in case my training loop somehow tries to run on more mini-batches than I have, though I’m 99% confident the loader and the trainer run for an equal number of total steps. The code runs, and it does appear to speed up the training process (I’ve been prototyping on a small subset of the data, so it’s sometimes hard to tell how much speed-up I’m getting), but at the end of training I always get an error:

    Process Process-2:
    Traceback (most recent call last):
      File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 307, in train_network
        data = que.get()
      File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
        return _ForkingPickler.loads(res)
      File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
        fd = df.detach()
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 57, in detach
        with _resource_sharer.get_connection(self._id) as conn:
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 87, in get_connection
        c = Client(address, authkey=process.current_process().authkey)
      File "/usr/lib/python3.6/multiprocessing/connection.py", line 487, in Client
        c = SocketClient(address)
      File "/usr/lib/python3.6/multiprocessing/connection.py", line 614, in SocketClient
        s.connect(address)
    FileNotFoundError: [Errno 2] No such file or directory

or, if I move the que.close() statement around a bit, I sometimes get an error like this:

    Process Process-2:
    Traceback (most recent call last):
      File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 306, in train_network
        data = que.get()
      File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
        return _ForkingPickler.loads(res)
      File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
        fd = df.detach()
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 58, in detach
        return reduction.recv_handle(conn)
      File "/usr/lib/python3.6/multiprocessing/reduction.py", line 182, in recv_handle
        return recvfds(s, 1)[0]
      File "/usr/lib/python3.6/multiprocessing/reduction.py", line 153, in recvfds
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    ConnectionResetError: [Errno 104] Connection reset by peer

I don’t know where I’m going wrong. Admittedly, I am a novice at multiprocessing, so it’s hard for me to debug what went wrong exactly. Any help would be appreciated, thanks!

EDIT: I have done a bit more debugging, and the point at which one of these errors is thrown is linearly proportional to the queue size: the larger the queue, the earlier the error appears. I think the issue is that the loader shuts the queue down after it finishes putting all the items on it, and the trainer is subsequently unable to read from it, though I don’t know why that would happen. I saw suggestions online that the fix is to add “trainer.join()” and “loader.join()” after starting the processes, but I already do that and the result is unchanged. I’ve also tried deleting the “que.close()” statement in case that was the issue (even though the multiprocessing documentation says close() won’t actually shut the queue down until all buffered items have been flushed), and that didn’t help either.

I’ve found a workaround to this problem: forcing the loader to sleep for 2 seconds after it finishes loading everything. This gives the trainer time to finish training before the loader comes back and shuts everything down. I can’t explain why the behavior is this way, though.
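
Concretely, the workaround just adds a sleep at the tail of the loader so the producer process stays alive while the trainer drains the queue. This is only a sketch of the end of data_gen (the loading loop itself is unchanged), not a proper fix:

    import time

    def data_gen(que, PATH, epochs, steps_per_epoch, batch_size=32):
        # ... same loading loop as above ...
        que.put(None)
        # Keep the loader process alive briefly so the trainer can finish
        # pulling the remaining batches off the queue before this process exits
        time.sleep(2)
        return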

@Enumaris why didn’t you just use DDP (DistributedDataParallel — PyTorch 1.7.1 documentation)?