How to use torchdata to load tar files from S3 so that the datapipe returns `(sample, label)`

I’m trying to learn more about torchdata datapipes.

I’m trying to load a .tar file from S3 that contains audio samples and text labels as paired wav and json files. I hope to end up with a dataset that can be passed to `DataLoader(...)`.

So far, I have:

The code below takes an S3 directory and lists all files in the bucket, for example returning `s3://my_bucket/0.tar`:

dp_s3_urls = td.datapipes.iter.IterableWrapper(["s3://my_bucket/"])\
        .list_files_by_s3()\
        .shuffle()\
        .sharding_filter()

Load the tar file:

td.datapipes.iter.S3FileLoader(dp_s3_urls).load_from_tar()

which returns:

('s3://my_bucket/0.tar/data/1.json', StreamWrapper<s3:/my_bucket/0.tar/data/1.json,<ExFileObject name=None>>)
('s3://my_bucket/0.tar/data/1.wav', StreamWrapper<s3:/my_bucket/0.tar/data/1.wav,<ExFileObject name=None>>)

I’m stuck on loading this information into a tensor using TorchAudio etc.

If your tar file is structured in a way such that you always get “1.json” before “1.wav”, you can do dp.batch(2), which will give you [(s3_path_json, JsonFileStream), (s3_path_wav, WavFileStream)]. You can write your own map function and pass it into dp.map(fn) to handle those afterward as you see fit.
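A minimal sketch of such a map function, using in-memory `BytesIO` streams in place of the tar streams (`decode_pair`, the paths, and the sample bytes are all hypothetical stand-ins):

```python
import io
import json

# Hypothetical decoder for one batch of two (path, stream) tuples,
# assuming the JSON entry always comes before the WAV entry.
def decode_pair(batch):
    (json_path, json_stream), (wav_path, wav_stream) = batch
    label = json.loads(json_stream.read().decode("utf-8"))
    audio_bytes = wav_stream.read()  # raw WAV bytes; decode with torchaudio/soundfile
    return audio_bytes, label

# In-memory stand-ins for the streams load_from_tar() would yield:
batch = [
    ("s3://my_bucket/0.tar/data/1.json", io.BytesIO(b'{"text": "hello"}')),
    ("s3://my_bucket/0.tar/data/1.wav", io.BytesIO(b"RIFF....WAVE")),
]
audio, label = decode_pair(batch)
```

You would pass `decode_pair` into `dp.map(...)` after `dp.batch(2)`.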

If the order is not guaranteed, you can use dp.demux(2, _fn) to separate them into JSON and .wav streams (and process them separately if you like), then use zip_with_iter to join them back together.
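In plain Python, the demux-and-rejoin logic looks roughly like this: the classifier function is what you would pass to `demux`, and the key function is what `zip_with_iter` would match on (the stream placeholders are hypothetical):

```python
import os

# Route JSON entries to instance 0 and WAV entries to instance 1,
# as a classifier_fn for dp.demux(2, classifier_fn) would.
def classifier_fn(item):
    path, _stream = item
    return 0 if path.endswith(".json") else 1

# Match items that share the same file stem, e.g. "1.json" <-> "1.wav".
def key_fn(item):
    return os.path.basename(item[0]).split(".")[0]

items = [
    ("s3://my_bucket/0.tar/data/1.wav", "wav-stream-1"),
    ("s3://my_bucket/0.tar/data/1.json", "json-stream-1"),
]
json_items = [i for i in items if classifier_fn(i) == 0]
wav_items = [i for i in items if classifier_fn(i) == 1]

# Pair entries with the same stem, as zip_with_iter would:
by_key = {key_fn(i): i for i in wav_items}
pairs = [(j, by_key[key_fn(j)]) for j in json_items]
```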

Thank you for the help @nivek.

Here is the code for anyone interested:

import io
import json
import torchdata
import soundfile

def to_samples(data):
    # a = (path, stream) for the wav file, t = (path, stream) for the json label
    a, t = data
    return soundfile.read(io.BytesIO(a[1].read())), json.loads(t[1].read().decode('utf-8'))

dp_s3_urls = torchdata.datapipes.iter.IterableWrapper(["s3://m_bucket/"])\
        .list_files_by_s3()\
        .shuffle()\
        .sharding_filter()

datapipe = torchdata.datapipes.iter.S3FileLoader(dp_s3_urls)\
        .load_from_tar() \
        .batch(2) \
        .map(to_samples)

for i in datapipe:
    print(i)

In this example:

dp_s3_urls takes an S3 directory, finds all files in it, and outputs a shuffled list of tar file URLs (e.g. [“s3://m_bucket/0.tar”, “s3://m_bucket/1.tar”, …, “s3://m_bucket/n.tar”]).

datapipe takes these URLs and loads them into memory; the batch function, as mentioned by @nivek, combines the (sample, label) pairs. You could also use the following to achieve the same grouping:

datapipe.groupby(lambda x: os.path.basename(x[0]).split(".")[0], group_size=2, guaranteed_group_size=2)

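The groupby key function groups entries by their file stem, so the json and wav for the same sample land in one group. A quick pure-Python check of that key, using paths from the example output above:

```python
import os

# Key used by groupby: file stem, e.g. "1.json" and "1.wav" both map to "1".
key = lambda x: os.path.basename(x[0]).split(".")[0]

json_entry = ("s3://my_bucket/0.tar/data/1.json", None)
wav_entry = ("s3://my_bucket/0.tar/data/1.wav", None)
```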

to_samples takes the streamed pair, reads the data, and loads the audio using soundfile. The JSON arrives as a byte string, so it is decoded as UTF-8 and then parsed with the json library.
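The same read-from-memory pattern works without soundfile, e.g. with the stdlib wave module. A sketch, where the one-second silent clip is just synthesized test data standing in for the bytes read from the tar stream:

```python
import io
import wave

# Build a tiny in-memory WAV file (mono, 16-bit, 8 kHz, 8000 silent frames).
buf = io.BytesIO()
with wave.open(buf, "wb") as w:
    w.setnchannels(1)
    w.setsampwidth(2)
    w.setframerate(8000)
    w.writeframes(b"\x00\x00" * 8000)

# Read it back from memory, mirroring soundfile.read(io.BytesIO(stream.read())).
buf.seek(0)
with wave.open(buf, "rb") as w:
    frames = w.readframes(w.getnframes())
    rate = w.getframerate()
```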

As mentioned in this post, you can get:

ValueError: curlCode: 77, Problem with the SSL CA cert (path? access rights?)

This was fixed by changing from S3FileLoader(...) to the fsspec-based datapipes (list_files_by_fsspec / open_files_by_fsspec):

from torch.utils.data import DataLoader

def to_samples(data):
    a, t = data
    return soundfile.read(io.BytesIO(a[1].read())), json.loads(t[1].read().decode('utf-8'))

datapipe = torchdata.datapipes.iter.IterableWrapper(["s3://m_bucket/"])\
        .list_files_by_fsspec()\
        .shuffle()\
        .sharding_filter()\
        .open_files_by_fsspec(mode='rb')\
        .load_from_tar() \
        .batch(2) \
        .map(to_samples)

dataloader = DataLoader(datapipe, batch_size=1, num_workers=6)

for i in dataloader:
    print(i)

Thanks for the solution. I have a problem when using io.BytesIO(data[0][1]): I get TypeError: a bytes-like object is required, not ‘StreamWrapper’. Do you have any idea about that? Thanks!

Do you have a minimal code you could share?