Dataset creation from live data feed


I am trying to create an end-to-end OCR pipeline of models that works in the “standard” way: a first model receives an image, detects the text lines and outputs their coordinates; a second model takes the images plus the line coordinates, crops the lines out and does the text recognition. My question is how to build the data pipeline for the second model so as to keep latency as low as possible and GPU load high, while still accepting new files continuously.

Since this stage is rather CPU-heavy (things like non-max suppression and image rotation do take time), I need multiprocessing. I have tried the following setup for __getitem__ (pseudocode):

  • if there is a crop in the buffer, return it
  • otherwise, look for images to crop from:
    • if you find one (using file locks to avoid conflicts with other workers), crop all the lines in that image, return the first one and add the rest to the buffer
    • otherwise sleep 10ms and check again
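To make the steps above concrete, here is a minimal sketch of that buffered __getitem__, under some loudly stated assumptions: it is a plain Python class (in practice it would subclass torch.utils.data.Dataset, and each DataLoader worker would hold its own copy and therefore its own buffer); `crop_lines` is a hypothetical stand-in for NMS + rotation + cropping, faked here by treating each line of a text file as one crop; and the “file lock” is a lock file created atomically with os.O_CREAT | os.O_EXCL.

```python
import collections
import glob
import os
import time


def crop_lines(image_path):
    # HYPOTHETICAL stand-in for "run NMS, rotate and crop every detected line".
    # For this sketch, each non-empty line of a text file plays one crop.
    with open(image_path) as f:
        return [line.rstrip("\n") for line in f if line.strip()]


class LiveCropDataset:
    """Buffered __getitem__ as described above (sketch, not a real API).

    In practice this would subclass torch.utils.data.Dataset; each
    DataLoader worker holds its own copy, hence its own buffer.
    """

    def __init__(self, inbox, pattern="*.txt", poll_s=0.01):
        self.inbox = inbox          # directory where new files arrive
        self.pattern = pattern      # which files count as "images"
        self.poll_s = poll_s        # 10 ms between polls
        self.buffer = collections.deque()

    def _try_claim(self, path):
        # O_CREAT | O_EXCL fails if the lock file already exists, so
        # exactly one worker can claim a given file.
        try:
            fd = os.open(path + ".lock", os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            return False
        os.close(fd)
        return True

    def __getitem__(self, index):  # index is ignored: the feed is live
        while True:
            if self.buffer:                      # 1) serve buffered crops
                return self.buffer.popleft()
            pattern = os.path.join(self.inbox, self.pattern)
            for path in sorted(glob.glob(pattern)):
                if not self._try_claim(path):    # another worker has it
                    continue
                crops = crop_lines(path)
                os.remove(path)  # lock file is kept so late globbers skip it
                if crops:                        # 2) return first, buffer rest
                    self.buffer.extend(crops[1:])
                    return crops[0]
            time.sleep(self.poll_s)              # 3) nothing yet: poll again
```

Note that a copy whose buffer is empty and which finds no unclaimed file simply stays in the sleep loop, even while other copies still hold buffered crops. Lock files are left in the inbox and would need separate cleanup.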

But this creates two issues for me:

  • The main one is that if I have, say, 4 workers and one of them has no file left to process, the DataLoader will dutifully wait for that worker to return, without requesting data from the other workers. So at any given time there will be three partially processed files left until a new one arrives. I could partially fix this by having all workers add their crops to a “common” buffer saved on disk, but (1) going through the disk is very suboptimal and (2) it would only reduce the problem from having “3 files waiting” to having “3 lines waiting”, which still means at least one file is left unprocessed…
  • The second problem, similar but easier to solve with some hacking, arises with batching. Even if the above is solved, at some point I will have an incomplete batch of lines and no further file to process. In that case it should be more efficient to pad to a full batch and discard the outputs for the padding, but I suspect that to do so I would have to disable automatic batching and batch by hand inside the inference code (i.e. loop until I either get N images while at least one worker is busy, or pad once all workers are waiting or a short timeout expires). Is there a cleaner way?
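The hand-rolled batching described in the second point could look roughly like this minimal sketch. It assumes the workers push individual crops into a shared queue (the standard-library queue.Queue here; multiprocessing.Queue.get also accepts a timeout and raises queue.Empty). `collect_batch` and `pad_item` are hypothetical names, and only the timeout condition is implemented, not detection of whether all workers are idle.

```python
import queue
import time


def collect_batch(q, batch_size, pad_item, timeout_s=0.05):
    """Gather up to batch_size items from q, padding if the feed stalls.

    Returns (batch, n_real): the first n_real entries are real crops,
    the rest are copies of pad_item whose outputs should be discarded.
    """
    batch = []
    deadline = time.monotonic() + timeout_s   # one deadline for the whole batch
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break                              # timed out: pad what we have
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break                              # nothing arrived in time
    n_real = len(batch)
    batch.extend([pad_item] * (batch_size - n_real))
    return batch, n_real
```

When n_real is 0 the batch is pure padding, so the caller would probably skip inference for it rather than run the model.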

So, long story short: is there a way for a dataset worker to tell the DataLoader “skip me, go to the next worker”? For example, a specific exception that could be raised and caught inside the DataLoader?

Thanks in advance for any hint!