Hi, I am running an experiment that mimics a classic federated learning setup on CIFAR10 with ResNet18 for image classification. I partition the training set into disjoint shards, one per client. In each epoch I compute the full local gradient of each selected client (by deep-copying the global model and calling loss.backward() on every mini-batch after scaling the loss), and then I apply a compressor to that gradient. The compressed gradients from the clients are averaged and applied to each parameter in a standard gradient descent step. However, the model fails to learn: training loss and accuracy do not improve at all, even when I disable the compressor entirely (i.e., I simply average the raw gradients).
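To be concrete about the scaling: with reduction='mean', accumulating the gradients of loss * (batch_size / N) over all mini-batches should equal a single backward pass on the mean loss over the client's whole shard. Here is a toy check of that equivalence (illustrative model and sizes, not my real setup):

import torch
import torch.nn as nn

# Toy check of the scaling assumption (illustrative model/sizes, not my real setup):
# accumulated gradients of loss * (batch_size / N) over mini-batches should match
# one backward pass on the mean loss over the whole dataset.
torch.manual_seed(0)
model = nn.Linear(5, 3)
criterion = nn.CrossEntropyLoss(reduction='mean')
X, y = torch.randn(8, 5), torch.randint(0, 3, (8,))

model.zero_grad()
for xb, yb in zip(X.split(4), y.split(4)):
    (criterion(model(xb), yb) * (xb.size(0) / len(X))).backward()
accumulated = model.weight.grad.clone()

model.zero_grad()
criterion(model(X), y).backward()
print(torch.allclose(accumulated, model.weight.grad, atol=1e-6))  # expect True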
Here is my code:
import copy
import os
import random

import torch
import torch.nn as nn

def get_gradient(model, client_loader, criterion, device, configs):
    model.to(device)
    model.zero_grad()
    model.train()
    for inputs, targets in client_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        # Scale the mean batch loss so that accumulating over all batches
        # yields the gradient of the mean loss over the full client dataset.
        batch_size = inputs.size(0)
        scaled_loss = loss * (batch_size / len(client_loader.dataset))
        scaled_loss.backward()
    # Collect the accumulated gradients after iterating over the whole shard.
    gradients = []
    for param in model.parameters():
        if param.grad is not None:
            gradients.append(param.grad.clone().detach())
        else:
            gradients.append(None)
    return gradients
def flatten_gradients(grad_list):
    flat = [g.view(-1) for g in grad_list if g is not None]
    return torch.cat(flat)
def unflatten_gradient(flat_tensor, param_shapes):
    grads = []
    offset = 0
    for shape in param_shapes:
        if shape is None:
            grads.append(None)
        else:
            numel = torch.tensor(shape).prod().item()
            grads.append(flat_tensor[offset:offset + numel].view(shape))
            offset += numel
    return grads
def rand_k_global_compressor(grad_list, k, param_shapes):
    flat_grad = flatten_gradients(grad_list)
    numel = flat_grad.numel()
    if k >= numel:
        compressed_flat = flat_grad
    else:
        indices = torch.randperm(numel, device=flat_grad.device)[:k]
        compressed_flat = torch.zeros_like(flat_grad)
        compressed_flat[indices] = flat_grad[indices] * (numel / k)
    compressed_grad_list = unflatten_gradient(compressed_flat, param_shapes)
    return compressed_grad_list
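For what it's worth, my understanding is that rescaling the kept entries by numel / k makes the compressor unbiased, so averaging many compressed copies of the same gradient should approach the original. A quick toy check of that expectation (illustrative sizes):

# Toy check that rand-k with the numel/k rescaling is unbiased (illustrative sizes):
# the average over many independent compressions should approach the original vector.
g = torch.randn(1000)
avg = torch.zeros_like(g)
trials = 5000
for _ in range(trials):
    avg += rand_k_global_compressor([g], k=100, param_shapes=[g.shape])[0]
avg /= trials
print((avg - g).norm() / g.norm())  # expect a small relative error, shrinking with more trials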
def train_FedSGD(global_model, criterion, clients, train_loader, test_loader, device, configs, epoch):
    # =================== Client Partial Participation ===================
    num_clients = len(clients)
    num_selected_clients = max(1, int(configs["participation_rate"] * num_clients))
    selected_client_indices = random.sample(range(num_clients), num_selected_clients)
    # =================== Get Local Gradients ===================
    client_gradients_list = []
    param_shapes = [p.shape if p.requires_grad else None for p in global_model.parameters()]
    for idx in selected_client_indices:
        local_model = copy.deepcopy(global_model)
        client_loader = clients[idx]
        client_gradient = get_gradient(local_model, client_loader, criterion, device, configs)
        if configs["compression"]:
            client_gradient = rand_k_global_compressor(client_gradient, configs["rand-k"], param_shapes)
        client_gradients_list.append(client_gradient)
        del local_model
    # =================== Aggregating Gradients ===================
    averaged_grads = []
    for grads_per_param in zip(*client_gradients_list):
        if grads_per_param[0] is None:
            averaged_grads.append(None)
        else:
            stacked = torch.stack(grads_per_param)
            averaged_grads.append(torch.mean(stacked, dim=0))
    # =================== Applying Gradients to the Model ===================
    with torch.no_grad():
        for param, grad in zip(global_model.parameters(), averaged_grads):
            if grad is not None:
                param -= configs["lr"] * grad
    # =================== Evaluating Training Loss and Test Accuracy ===================
    train_loss = evaluate_train_loss(global_model, train_loader, device)
    test_acc = evaluate_test_acc(global_model, test_loader, device)
    # =================== Optionally Save the Global Model ===================
    save_dir = configs.get("checkpoint_dir", "checkpoints")
    os.makedirs(save_dir, exist_ok=True)
    if (configs["epoch_checkpoint"] >= 1 and epoch % configs["epoch_checkpoint"] == 0) or epoch == configs["epochs"]:
        compression_tag = f"randk{configs['rand-k']}" if configs["compression"] else "fullgrad"
        checkpoint_path = os.path.join(
            save_dir,
            f"FedSGD_{compression_tag}_lr{configs['lr']}_epoch{epoch}.pt"
        )
        torch.save(global_model.state_dict(), checkpoint_path)
    return train_loss, test_acc
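To rule out a degenerate server-side step, I am also thinking of adding a quick diagnostic right after the parameter update (a sketch, not in my current run):

# Sketch of a diagnostic to place right after the update above:
# print the norm of the averaged gradient to check that the applied step is non-trivial.
grad_norm = torch.sqrt(sum((g ** 2).sum() for g in averaged_grads if g is not None))
print(f"[Epoch {epoch}] avg grad norm: {grad_norm.item():.4e} | lr: {configs['lr']}")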
This is the entry point where I call the functions above:
if __name__ == "__main__":
    configs = get_configurations()
    set_seed(configs["seed"])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    global_model = get_model(model_name=configs["model_name"], is_pretrained=configs["is_pretrained"])
    global_model.to(device)
    global_model.train()
    client_loaders_list = get_client_loaders(configs["num_clients"], configs["batch_size"])
    test_loader = get_test_loader(configs["dataset_name"], configs["batch_size"])
    train_loader = get_train_loader(configs["dataset_name"], configs["batch_size"])
    criterion = nn.CrossEntropyLoss(reduction='mean')
    if configs["meta_method"] == "FedAvg":
        train_loss_hist, test_acc_hist = [], []
        for epoch in range(1, configs["epochs"] + 1):
            train_loss, test_acc = train_FedAvg(global_model, criterion, client_loaders_list, train_loader,
                                                test_loader, device, configs, epoch)
            print(f"[Epoch {epoch}] | train_loss: {train_loss:.4f} | test_acc: {test_acc['accuracy']:.2f}")
            train_loss_hist.append(train_loss)
            test_acc_hist.append(test_acc)
    elif configs["meta_method"] == "FedSGD-Randk":
        train_loss_hist, test_acc_hist = [], []
        for epoch in range(1, configs["epochs"] + 1):
            train_loss, test_acc = train_FedSGD(global_model, criterion, client_loaders_list, train_loader,
                                                test_loader, device, configs, epoch)
            print(f"[Epoch {epoch}] | train_loss: {train_loss:.4f} | test_acc: {test_acc['accuracy']:.2f}")
            train_loss_hist.append(train_loss)
            test_acc_hist.append(test_acc)
I have gone through the code at least ten times and also tried replacing the manual GD step with torch.optim.SGD and optimizer.step(), but the model still does not learn. Could anyone please help me locate the main issue? For completeness, the optimizer-based variant I tried looked roughly like this (reconstructed sketch, using the same names as in the code above):
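# Roughly what the optimizer-based variant looked like (reconstructed sketch):
# assign the averaged gradients to .grad and let plain SGD (no momentum) take the step.
optimizer = torch.optim.SGD(global_model.parameters(), lr=configs["lr"])
optimizer.zero_grad()
for param, grad in zip(global_model.parameters(), averaged_grads):
    if grad is not None:
        param.grad = grad.clone()
optimizer.step()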