How to run LLM inference with multiple GPUs

Hi,

I’m trying to run inference only (no training) with LLMs (Llama 3.2 1B Instruct & Llama 3.2 3B Instruct) on a multi-GPU server.

I’ve tried using PyTorch DDP (DistributedDataParallel), but it keeps hitting an error at the final step where the per-rank results are gathered.
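To be concrete, what I ultimately want at the end of the run is to collect every rank's result list on rank 0, roughly like this sketch (just an illustration using all_gather_object; gather_results and local_results are placeholder names, not variables from my script):

import torch.distributed as dist

def gather_results(local_results, rank, world_size):
    # each rank contributes its own list of result dicts
    # (with the NCCL backend, torch.cuda.set_device(rank) must have been called first,
    #  since object collectives serialize via tensors on the current CUDA device)
    gathered = [None] * world_size
    dist.all_gather_object(gathered, local_results)
    if rank == 0:
        # flatten the per-rank lists into a single list
        return [item for per_rank in gathered for item in per_rank]
    return None

In my actual script I instead write one CSV per rank and try to merge them in main() after spawn() returns, and that final merge is what keeps failing.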

Below is my code.

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoTokenizer, AutoModelForCausalLM, logging as transformers_logging
import pandas as pd
import warnings
import nltk
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from bert_score import score as bert_score
from torch.utils.data import Dataset, DataLoader, DistributedSampler
from tqdm import tqdm
from torch.cuda.amp import autocast

torch.cuda.empty_cache()

nltk.download('wordnet')

warnings.filterwarnings("ignore", category=UserWarning, message=".*Some weights of RobertaModel were not initialized.*")
warnings.filterwarnings("ignore", category=UserWarning, message=".*The hypothesis contains 0 counts of.*")
transformers_logging.set_verbosity_error()


os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"

# DDP init
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

# DDP cleanup
def cleanup():
    dist.destroy_process_group()

# dataset class
class EvaluationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        return {
            'findings': row['findings'],
            'impression': row['impression'],
            'model_output_1b': row.get('model_output_1b', None),
            'model_output_3b': row.get('model_output_3b', None),
            'idx': idx 
        }

# model, data load
def load_model_and_data(rank, world_size, data_path, batch_size=8):
    setup(rank, world_size)
    
    # init model & tokenizer
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-3B-Instruct")
    model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-3B-Instruct").to(rank)
    model = DDP(model, device_ids=[rank])

    # pad token
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # ROUGE scorer init
    rouge_scorer_instance = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    # data load
    print(f"[Rank {rank}] Loading data...")
    data = pd.read_csv(data_path)
    dataset = EvaluationDataset(data)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

    # eval
    print(f"[Rank {rank}] Starting evaluation...")
    smoothing = SmoothingFunction().method1
    results = []

    for batch in tqdm(dataloader, desc=f"Evaluating on rank {rank}"):
        findings = batch['findings']
        reference = batch['impression']
        idx = batch['idx'] 
        
        for model_type in ['1b', '3b']:
            prediction = batch[f'model_output_{model_type}']
            
         
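            # the DataLoader collates each field into a list of strings; collapse to a single string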
            if isinstance(reference, list):
                reference = " ".join(reference)
            if isinstance(prediction, list):
                prediction = " ".join(prediction)
            
  
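            # reference-based metrics: ROUGE, BLEU, METEOR, BERTScore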
            rouge_scores = rouge_scorer_instance.score(reference, prediction)
            bleu = sentence_bleu([reference.split()], prediction.split(), smoothing_function=smoothing)
            meteor = meteor_score([reference.split()], prediction.split())
            _, _, bert_f1 = bert_score([prediction], [reference], lang="en")
            
 
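            # LLM-as-a-judge prompts: ask the model to rate each summary against its findings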
            prompts = [f"""
            Given the findings: "{findings}"
            
            Evaluate the following summary based on these findings and rate each aspect on an integer scale of 1 to 10:
            
            Summary: "{summary}"
            """ for findings, summary in zip(findings, [prediction] * len(findings))]
            
            with autocast():
                inputs = tokenizer(prompts, return_tensors="pt", padding=True, truncation=True).to(rank)
                outputs = model.module.generate(inputs.input_ids, max_new_tokens=50, pad_token_id=tokenizer.eos_token_id)
                output_texts = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
            
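            # parse the four 1-10 ratings out of the generated text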
            relevance, coherence, consistency, fluency = None, None, None, None
            for output_text in output_texts:
                try:
                    relevance = int(output_text.split("Relevance=")[1].split("\n")[0].strip())
                    coherence = int(output_text.split("Coherence=")[1].split("\n")[0].strip())
                    consistency = int(output_text.split("Consistency=")[1].split("\n")[0].strip())
                    fluency = int(output_text.split("Fluency=")[1].split("\n")[0].strip())
                except (IndexError, ValueError):
                    pass

            results.append({
                'idx': idx[0].item(),
                'findings': findings[0],
                'impression': reference,
                'model': model_type,
                f'ROUGE-1_{model_type}': rouge_scores['rouge1'].fmeasure,
                f'ROUGE-2_{model_type}': rouge_scores['rouge2'].fmeasure,
                f'ROUGE-L_{model_type}': rouge_scores['rougeL'].fmeasure,
                f'BLEU_{model_type}': bleu,
                f'METEOR_{model_type}': meteor,
                f'BERTScore_{model_type}': bert_f1.mean().item(),
                f'Relevance_{model_type}': relevance,
                f'Coherence_{model_type}': coherence,
                f'Consistency_{model_type}': consistency,
                f'Fluency_{model_type}': fluency
            })


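    # each rank writes its own partial CSV; main() merges them after spawn() returns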
    rank_results_path = f'rank_{rank}_results.csv'
    pd.DataFrame(results).to_csv(rank_results_path, index=False)
    print(f"[Rank {rank}] Intermediate results saved to '{rank_results_path}'")

    cleanup()
    print(f"[Rank {rank}] Cleanup completed. Process finished.")


def main():
    world_size = 4 
    data_path = 'bla.csv'
    batch_size = 8 
    torch.multiprocessing.spawn(load_model_and_data, args=(world_size, data_path, batch_size), nprocs=world_size, join=True)

 
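    # final merge of the per-rank results; this end-step "gathering" is where it fails for me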
    if dist.is_initialized() and dist.get_rank() == 0:
        all_rank_files = [f'rank_{rank}_results.csv' for rank in range(world_size)]
        combined_df = pd.concat([pd.read_csv(file) for file in all_rank_files])

        original_data = pd.read_csv(data_path)
        final_df = pd.merge(original_data, combined_df, on='idx', how='left')

        final_df.to_csv('evaluation_with_all_metrics.csv', index=False)
        print("All ranks have completed. Final results saved to 'evaluation_with_all_metrics.csv'")
        

        for file in all_rank_files:
            os.remove(file)
            print(f"Deleted intermediate file '{file}'")

if __name__ == "__main__":
    print("Starting distributed evaluation...")
    main()
    print("Distributed evaluation completed.")

Any advice on how this final gathering/merging of per-rank results should be done with DDP (or whether DDP is even the right tool for an inference-only workload) would be appreciated.

cc: @kwen2501 @lessw2020