How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel?

Typical EncoderDecoderModel that works on a Pre-coded Dataset

The code snippet snippet as below is frequently used to train an EncoderDecoderModel from Huggingface’s transformer library

from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast

multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)


tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")

...

And a pre-processed/coded dataset can be used to train the model as such, when using the wmt14 dataset:

import datasets

train_data = datasets.load_dataset("wmt14", "de-en", split="train")
val_data = datasets.load_dataset("wmt14", "de-en", split="validation[:10%]")


from functools import partial

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2): 
    inputs = tokenizer([segment["en"] for segment in batch['translation']], 
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    outputs = tokenizer([segment["de"] for segment in batch['translation']], 
                       padding="max_length", truncation=True, max_length=encoder_max_length)


    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()

    # because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`. 
    # We have to make sure that the PAD token is ignored
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels] for labels in batch["labels"]]
    return batch


def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids", 
                         "decoder_attention_mask", "labels"]
    
    _process_data_to_model_inputs = partial(process_data_to_model_inputs, 
                                                encoder_max_length=encoder_max_length, 
                                                decoder_max_length=decoder_max_length, 
                                                batch_size=batch_size
                                           )
    dataset = dataset.map(_process_data_to_model_inputs, 
                           batched=True, 
                           batch_size=batch_size
                          )
    dataset.set_format(type="torch", columns=bert_wants_to_see)
    return dataset

train_data = munge_dataset_to_pacify_bert(train_data)
val_data = munge_dataset_to_pacify_bert(val_data)

Then the training can be done easily as such:

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments


# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)


# instantiate trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)

trainer.train()

A working example can be found on something like: Neural Plasticity - Bert2Bert on WMT14 | Kaggle

However, parallel data used to an EncoderDecoderModel usually exists as .txt or .tsv files, not a pre-coded dataset

Given a large .tsv file (e.g. 1 billion lines), e.g.

hello world\tHallo Welt
how are you?\twie gehts?
...\t...

Step 1: we can convert into the parquet / pyarrow format, one can do something like:

import vaex  # Using vaex 
import sys

filename = "train.en-de.tsv"

df = vaex.from_csv(filename, sep="\t", header=None, names=["src", "trg"], convert=True, chunk_size=50_000_000)

df.export(f"{filename}.parquet")

Step 2: Then we will can read it into a Pyarrow table to fit into the datasets.Dataset object and use the munge_dataset_to_pacify_bert() as shown above, e.g

from datasets import Dataset, load_from_disk
import pyarrow as pa

_ds = Dataset(pa.compute.drop_null(pa.parquet.read_table('train.en-de.tsv.parquet')
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')

_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')

train_data = munge_dataset_to_pacify_bert(_ds)

train_data.save_to_disk('train.en-de.tsv.parquet.hfdataset')

While the process above works well for small-ish dataset, e.g. 1-5 million lines of data, when the scale of the goes to 500 million to 1 billion, it seems like the last .save_to_disk() function is no where in sight.

Breaking down the steps in the munge_dataset_to_pacify_bert(), there are 2 sub-functions:

  • dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
  • dataset.set_format(type="torch", columns=bert_wants_to_see)

For the .map() process, it’s possible to scale in parallel threads by specifying by

dataset.map(_process_data_to_model_inputs, 
    batched=True, batch_size=batch_size, 
    num_proc=32  # num of parallel threads.
    )

And when I tried to process with

  • num_proc=32
  • batch_size=100

The .map() function finishes the processing of 500 million lines in 18 hours of compute time on Intel Xeon E5-2686 @ 2.3GHz with 32 processor cores, optimally.

But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files. Seemingly the last save_to_disk function has been running for more than 10+ hours and have not finished combining the temp files in parts to save the final HF Dataset to disk.


Given the above context, my questions in parts are:

Question (Part 1): When the mapping function ends and created the temp .arrow and tmp... files, is there a way to read these individually instead of try to save them into a final directory using the save_to_disk() function?


Question (Part 2): Why is the save_to_disk() function so slow after the mapping and how can the mapped processed data be saved in a faster manner?


Question (Part 3): Is there a way to avoid the .set_format() function after the .map() and make it part of the _process_data_to_model_inputs function?


Also asked on python - How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel? - Stack Overflow

Bumping the no. of hours the save_to_disk, 42 hours later, it’s still trying to save…

Also, asking on How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel? - 🤗Datasets - Hugging Face Forums