I am trying to prefetch multiple serialized objects, including but not limited to tensors, using multiprocessing.
The objects are serialized with pickle. To avoid tensors being deserialized onto the GPU, I use a custom unpickler, CPU_Unpickler,
that sets the map_location to 'cpu':
```python
import io
import pickle
from typing import Any, Iterator, List

import torch


def data_gen(files: List[bytes]) -> Iterator[Any]:
    for file in files:
        yield get(file)


def get(obj: bytes) -> Any:
    # pickle.Unpickler expects a file-like object, so wrap the raw bytes
    unpickler = CPU_Unpickler(io.BytesIO(obj))
    return unpickler.load()


class CPU_Unpickler(pickle.Unpickler):
    """Unpickler that forces torch storages onto the CPU."""

    def find_class(self, module, name):
        if module == "torch.storage" and name == "_load_from_bytes":
            def closing_bytes(b: bytes):
                with io.BytesIO(b) as f:
                    return torch.load(f, map_location="cpu")
            return closing_bytes
        return super().find_class(module, name)
```
```python
with ParallelGenerator(data_gen(files), max_lookahead=1) as g:
    for data in g:
        ...  # process each prefetched result
```
Here ParallelGenerator is a multiprocessing wrapper (like the one in the multiprocessing-generator package on PyPI) that prefetches from the generator data_gen
in a background process, so that whatever happens in the for loop iterating over the generator does not have to wait for the results.
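As I understand it, the wrapper works roughly like this (my own simplified sketch, not the package's actual code): a child process runs the generator and pushes each result through a multiprocessing.Queue, which means every yielded object is pickled a second time just to cross the process boundary:

```python
import multiprocessing as mp

_END = None  # sentinel; assumes the generator never yields None


def _feeder(make_gen, q):
    # Runs in the child process: every yielded object is re-serialized
    # by the queue to cross the process boundary.
    for item in make_gen():
        q.put(item)
    q.put(_END)


def prefetch(make_gen, max_lookahead=1):
    # "fork" context for brevity; "spawn" would need the usual __main__ guard
    ctx = mp.get_context("fork")
    q = ctx.Queue(maxsize=max_lookahead)
    ctx.Process(target=_feeder, args=(make_gen, q), daemon=True).start()
    while (item := q.get()) is not _END:
        yield item
```

If that mental model is right, the tensors I deserialize onto the CPU get pickled again by the queue, and that second serialization goes through torch's shared-memory machinery.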
Unfortunately this does not work and produces the following cryptic error:
```
File ".../lib/python3.11/multiprocessing/queues.py", line 244, in _feed
File ".../lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
File ".../python3.11/site-packages/torch/multiprocessing/reductions.py", line 568, in reduce_storage
File ".../python3.11/site-packages/torch/storage.py", line 304, in wrapper
File ".../python3.11/site-packages/torch/storage.py", line 374, in _share_fd_cpu_
RuntimeError: unable to open shared memory object </torch_128493_2623044343_478> in read-write mode: Too many open files (24)
```
I am quite confident this happens in the result = unpickler.load() call, and, judging from the error messages, I am guessing it is related to torch.load(f, map_location="cpu").
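Since the traceback ends in _share_fd_cpu_, I also wondered whether torch's file-descriptor-based shared-memory strategy is what exhausts the open-file limit. One thing I have seen suggested (untested for my case) is switching the sharing strategy before any tensors cross a process boundary:

```python
import torch.multiprocessing

# Use file-system-backed shared memory instead of one fd per shared storage
torch.multiprocessing.set_sharing_strategy("file_system")
```

I am not sure whether this is the right fix or just papers over the underlying problem.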
Does anyone have a clue why this might happen, or know a better way to prefetch data in batches when the work is heavily IO-bound, as it is here?
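For comparison, one alternative I am considering is a thread-based prefetcher (sketch below, not tested against my real data): since threads share memory, the prefetched objects are never re-pickled, which would sidestep torch's shared-memory machinery entirely, and for IO-bound work the GIL should not matter much:

```python
import queue
import threading
from typing import Any, Iterable, Iterator


def threaded_prefetch(items: Iterable[Any], max_lookahead: int = 1) -> Iterator[Any]:
    """Prefetch from an iterable in a background thread.

    Yielded objects are handed over by reference, never serialized,
    so CPU tensors stay ordinary in-process objects.
    """
    q: queue.Queue = queue.Queue(maxsize=max_lookahead)
    sentinel = object()  # unique end-of-stream marker

    def feeder() -> None:
        for item in items:
            q.put(item)  # blocks when the lookahead buffer is full
        q.put(sentinel)

    threading.Thread(target=feeder, daemon=True).start()
    while (item := q.get()) is not sentinel:
        yield item
```

Would swapping the process-based wrapper for something like this be the sensible approach here, or does it defeat the purpose of prefetching?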