I’m trying to build a sequential pipeline of multiple worker processes sharing a CUDA tensor through queues. Everything seems to work fine for a sequence of 2 workers. However, if I add a third one I get:
RuntimeError: invalid device pointer: 0x7efe3f400000 at /pytorch/aten/src/THC/THCCachingAllocator.cpp:301
When using device=‘cpu’ everything works fine.
import os import torch from torch.multiprocessing import set_start_method, Queue, spawn try: set_start_method('spawn') except RuntimeError: pass device = 'cuda' def reader(i, out_q): while True: try: if not out_q.full(): data = torch.ones((3, 256, 256)).float().to(device).share_memory_() out_q.put(data) print('reader: added frame') except Exception as e: print('Reader ERROR:', e) def worker1(i, in_q, out_q): while True: try: if not out_q.full(): data = in_q.get() out_q.put(data) print('worker 1: passed frame') except Exception as e: print('Worker 1 ERROR:', e) def worker2(i, in_q): while True: try: data = in_q.get() print('worker 2: received frame') except Exception as e: print('Worker 2 ERROR:', e) def run(): in_q = Queue(maxsize=1) out_q = Queue(maxsize=1) r = spawn(reader, args=(in_q, ), join=False) w1 = spawn(worker1, args=(in_q, out_q), join=False) w2 = spawn(worker2, args=(out_q,), join=False) while True: pass if __name__ == '__main__': os.environ["CUDA_VISIBLE_DEVICES"] = "0" run()
reader: added frame reader: added frame worker 1: passed frame Traceback (most recent call last): File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 213, in reduce_tensor (device, handle, storage_size_bytes, storage_offset_bytes) = storage._share_cuda_() RuntimeError: invalid device pointer: 0x7efe3f400000 at /pytorch/aten/src/THC/THCCachingAllocator.cpp:301