I’m trying to build a sequential pipeline of multiple worker processes sharing a CUDA tensor through queues. Everything seems to work fine for a sequence of 2 workers. However, if I add a third one I get:
RuntimeError: invalid device pointer: 0x7efe3f400000 at /pytorch/aten/src/THC/THCCachingAllocator.cpp:301
When using device=‘cpu’ everything works fine.
Example code:
import os
import torch
from torch.multiprocessing import set_start_method, Queue, spawn
try:
set_start_method('spawn')
except RuntimeError:
pass
device = 'cuda'
def reader(i, out_q):
while True:
try:
if not out_q.full():
data = torch.ones((3, 256, 256)).float().to(device).share_memory_()
out_q.put(data)
print('reader: added frame')
except Exception as e:
print('Reader ERROR:', e)
def worker1(i, in_q, out_q):
while True:
try:
if not out_q.full():
data = in_q.get()
out_q.put(data)
print('worker 1: passed frame')
except Exception as e:
print('Worker 1 ERROR:', e)
def worker2(i, in_q):
while True:
try:
data = in_q.get()
print('worker 2: received frame')
except Exception as e:
print('Worker 2 ERROR:', e)
def run():
in_q = Queue(maxsize=1)
out_q = Queue(maxsize=1)
r = spawn(reader, args=(in_q, ), join=False)
w1 = spawn(worker1, args=(in_q, out_q), join=False)
w2 = spawn(worker2, args=(out_q,), join=False)
while True:
pass
if __name__ == '__main__':
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
run()
Full output:
reader: added frame
reader: added frame
worker 1: passed frame
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 213, in reduce_tensor
(device, handle, storage_size_bytes, storage_offset_bytes) = storage._share_cuda_()
RuntimeError: invalid device pointer: 0x7efe3f400000 at /pytorch/aten/src/THC/THCCachingAllocator.cpp:301