Thank you for the help @nivek.
Here is the code for anyone interested:
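import io
import json
import torchdata
import soundfile

def to_sample(data):
    # data is a batch of two (path, stream) pairs: the audio file, then its JSON labels
    a, t = data
    return soundfile.read(io.BytesIO(a[1].read())), json.loads(t[1].read().decode('utf-8'))

# List the files in the bucket, shuffle them, and shard them across workers
dp_s3_urls = torchdata.datapipes.iter.IterableWrapper(["s3://m_bucket/"])\
    .list_files_by_s3()\
    .shuffle()\
    .sharding_filter()
# Load each tar, pair consecutive members, and decode each pair
datapipe = torchdata.datapipes.iter.S3FileLoader(dp_s3_urls)\
    .load_from_tar()\
    .batch(2)\
    .map(to_sample)

# Prints the shuffled URLs; iterate over datapipe instead to get decoded samples
for i in dp_s3_urls:
    print(i)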
In this example:
dp_s3_urls takes an S3 directory, finds all files in it, and outputs a shuffled list of S3 file URLs (e.g. ["s3://m_bucket/0.tar", "s3://m_bucket/1.tar", …, "s3://m_bucket/n.tar"]).
datapipe takes these URLs and loads the tar files into memory. The batch function, as mentioned by @nivek, combines each (sample, labels) pair. You could also use the following in place of .batch(2) to group your (sample, labels) pairs by file name (this needs import os):
datapipe.groupby(lambda x: os.path.basename(x[0]).split(".")[0], group_size=2, guaranteed_group_size=2)
to_sample takes the streamed data, reads the bytes, and loads the audio using soundfile. The JSON arrives as a byte string, so it is decoded as UTF-8 and then parsed with the json library.
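To make the pairing and decoding steps concrete, here is a minimal sketch that uses only the standard library, so it runs without S3 or torchdata. It is not the pipeline above: an in-memory tar stands in for the bucket, a plain dict stands in for groupby, and the wave module stands in for soundfile. The helpers make_tar, read_members, stem, and group_by_stem are hypothetical names made up for this illustration.

```python
import io
import json
import os
import tarfile
import wave


def make_tar(samples):
    """Build an in-memory tar holding <stem>.wav and <stem>.json per sample."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, label in samples.items():
            wav_buf = io.BytesIO()
            with wave.open(wav_buf, "wb") as w:
                w.setnchannels(1)
                w.setsampwidth(2)
                w.setframerate(16000)
                w.writeframes(b"\x00\x00" * 4)  # four silent 16-bit frames
            files = (
                (f"{name}.wav", wav_buf.getvalue()),
                (f"{name}.json", json.dumps(label).encode("utf-8")),
            )
            for file_name, payload in files:
                info = tarfile.TarInfo(file_name)
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)
    return buf


def read_members(tar_bytes):
    """Return (path, stream) pairs, the same shape to_sample receives."""
    with tarfile.open(fileobj=tar_bytes, mode="r") as tar:
        return [(m.name, io.BytesIO(tar.extractfile(m).read())) for m in tar]


def stem(item):
    # Same key as the groupby lambda: "0.wav" and "0.json" both map to "0"
    return os.path.basename(item[0]).split(".")[0]


def group_by_stem(members):
    groups = {}
    for item in members:
        groups.setdefault(stem(item), []).append(item)
    return list(groups.values())


def to_sample(data):
    # Assumes the audio member comes before its JSON member in each group
    a, t = data
    with wave.open(io.BytesIO(a[1].read()), "rb") as w:
        audio = (w.getnframes(), w.getframerate())  # stand-in for soundfile.read
    labels = json.loads(t[1].read().decode("utf-8"))
    return audio, labels


samples = {"0": {"text": "hello"}, "1": {"text": "world"}}
results = [to_sample(g) for g in group_by_stem(read_members(make_tar(samples)))]
```

Each entry in results pairs the audio information with the labels parsed from the JSON file that shares its name. Grouping by name, rather than batching consecutive members, should keep the pairs correct even if the tar does not store each audio file right next to its JSON.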
As mentioned in this post, you can get:
ValueError: curlCode: 77, Problem with the SSL CA cert (path? access rights?)
This was fixed by changing from S3FileLoader(...) to FSSpecFileLoader:
import io
import json
import torchdata
import soundfile
from torch.utils.data import DataLoader

def to_sample(data):
    # data is a batch of two (path, stream) pairs: the audio file, then its JSON labels
    a, t = data
    return soundfile.read(io.BytesIO(a[1].read())), json.loads(t[1].read().decode('utf-8'))

# The fsspec datapipes need fsspec and s3fs installed to handle s3:// URLs
datapipe = torchdata.datapipes.iter.IterableWrapper(["s3://m_bucket/"])\
    .list_files_by_fsspec()\
    .shuffle()\
    .sharding_filter()\
    .open_files_by_fsspec(mode='rb')\
    .load_from_tar()\
    .batch(2)\
    .map(to_sample)

dataloader = DataLoader(datapipe, batch_size=1, num_workers=6)
for i in dataloader:
    print(i)
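For anyone wondering why sharding_filter sits in the pipeline: with num_workers=6, every worker runs its own copy of the datapipe, and without sharding each one would read every tar file. As far as I understand it, sharding_filter keeps every n-th element per worker in a round-robin fashion. The sketch below illustrates that idea in plain Python, assuming every worker sees the URLs in the same order; shard is a hypothetical helper, not a torchdata function.

```python
def shard(items, num_workers, worker_id):
    """Keep every num_workers-th item, starting at index worker_id."""
    return [x for i, x in enumerate(items) if i % num_workers == worker_id]


# Hypothetical listing of 12 tar files split across 6 workers
urls = [f"s3://m_bucket/{i}.tar" for i in range(12)]
shards = [shard(urls, 6, worker_id) for worker_id in range(6)]
```

Here each worker ends up with two of the twelve URLs and no URL is read twice, which is why the filter is placed before the files are opened.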