I am trying to use gradient compression (PowerSGD: https://github.com/epfml/powersgd, "Practical low-rank gradient compression for distributed optimization", https://arxiv.org/abs/1905.13727) together with Apex AMP mixed precision "O2", but I get the following error:
ray::ImplicitFunc.step() (pid=61519, ip=10.0.1.224)
File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 248, in run
self._entrypoint()
File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 316, in entrypoint
self._status_reporter.get_checkpoint())
File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 575, in _trainable_func
output = fn()
File "/home/resnetTraceDDP.py", line 647, in train
File "/home/resnetTraceDDP.py", line 400, in reduce
RuntimeError: Expected object of scalar type Half but got scalar type Float for argument #0 'result' in call to _th_mm_out
My script (integrated into Ray Tune) is as follows. The error happens after scaled_loss.backward(), during the all_reduce in RankKReducer.reduce(); line 400 of my script is the torch.matmul(matrix, q, out=p) call.
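If it helps, I think this is just a dtype mismatch: under "O2" the gradients stay in torch.float16, while torch.empty() / torch.randn() default to torch.float32, so p_memory / q_memory don't match the gradient matrices. A minimal sketch of what I believe line 400 boils down to (shapes and names are made up for illustration; assumes a CUDA device):

import torch

# Under amp "O2", param.grad (and hence `matrix`) is torch.float16 ...
matrix = torch.randn(4, 8, dtype=torch.float16, device="cuda")
# ... but the reducer's workspace is allocated with torch.empty() /
# torch.randn() without an explicit dtype, so q and p end up float32.
q = torch.randn(8, 2, device="cuda")
p = torch.empty(4, 2, device="cuda")
# Same shape of call as my line 400; raises the dtype-mismatch RuntimeError.
torch.matmul(matrix, q, out=p)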
class TensorBuffer():
"""
Packs multiple tensors into one flat buffer for efficient
intra-worker communication.
"""
def __init__(self, tensors):
indices = [0]
for tensor in tensors:
new_end = indices[-1] + tensor.nelement()
indices.append(new_end)
self._start_idx = indices[:-1]
self._end_idx = indices[1:]
self._tensors = tensors
self.buffer = torch.cat([t.view(-1) for t in tensors]) # copies
def __getitem__(self, index):
return self.buffer[self._start_idx[index] : self._end_idx[index]].view(*self._tensors[index].shape)
def __len__(self):
return len(self._tensors)
def pack(self, tensors=None):
# Optional. init already does this.
if tensors is None:
tensors = self._tensors
for tensor, entry in zip(tensors, self):
entry[:] = tensor
def unpack(self, tensors):
for tensor, entry in zip(tensors, self):
tensor[:] = entry
def nelement(self):
return self.buffer.nelement()
def element_size(self):
return self.buffer.element_size()
def bits(self):
return 8 * self.nelement() * self.element_size()
def all_reduce(self, async_op=False):
return torch.distributed.all_reduce(self.buffer, async_op=async_op)
def all_gather(self, async_op=False):
n_workers = torch.distributed.get_world_size() if torch.distributed.is_available() else 1
buffers = [torch.empty_like(self.buffer) for i in range(n_workers)]
handle = all_gather(buffers, self.buffer, async_op=async_op)
if async_op:
return buffers, handle
else:
return buffers
class Reducer:
def __init__(self, random_seed, device):
self.rng = np.random.RandomState(random_seed)
M = 1024 * 1024
self.precalc_numbers = (
torch.from_numpy(self.rng.randn(128 * M)).to(device).type(torch.float32)
)
if torch.distributed.is_available():
self.n_workers = torch.distributed.get_world_size()
self.rank = torch.distributed.get_rank()
else:
self.n_workers = 1
self.rank = 0
self.device = device
def reduce(self, grad_in, grad_out, memory_out):
"""Return communicated bits"""
raise NotImplementedError()
class RankKReducer(Reducer):
def __init__(self, random_seed, device, n_power_iterations=0, reuse_query=False, rank=1):
super().__init__(random_seed, device)
assert n_power_iterations == 0
self.rank = rank
self.p_memory = None
self.q_memory = None
self.reuse_query = reuse_query
def set_random(self, vector):
torch.manual_seed(self.rng.randint(1_000_000_000))
vector.data[:] = torch.randn(*vector.shape, device=self.device)
# orthogonalize(vector)
def reduce(self, grad_in, grad_out, memory_out):
"""
Reduce gradients between the workers in place
:param grad_in: dictionary -- send_buffers
:param grad_out: dictionary -- grads
:param memory_out: dictionary -- memories
"""
bits_communicated = 0
# Split the tensors into rank-1 tensors that will be reduced uncompressed
# and rank > 1 tensors that are compressed
rank1_tensors = [
(tensor, out, mem)
for tensor, out, mem in zip(grad_in, grad_out, memory_out)
if tensor.ndimension() <= 1
]
high_rank_tensors = [
(tensor, out, mem)
for tensor, out, mem in zip(grad_in, grad_out, memory_out)
if tensor.ndimension() > 1
]
# We are building a rank-1 approximation of every tensor
# that can be interpreted as a matrix. Let the approximation be
# M = p q^T
# We are allocating consecutive memory for the p's and q's
memory_is_uninitialized = self.p_memory is None
p_total_size = 0
q_total_size = 0
for tensor, _, _ in high_rank_tensors:
matrix = tensor.view(tensor.shape[0], -1)
n, m = matrix.shape
rank = min(n, m, self.rank)
p_total_size += n * rank
q_total_size += m * rank
if self.p_memory is None:
self.p_memory = torch.empty(p_total_size, device=self.device)
self.q_memory = torch.empty(q_total_size, device=self.device)
# Find them again and make lists of pointers
ps = []
qs = []
p_idx = 0
q_idx = 0
for tensor, _, _ in high_rank_tensors:
matrix = tensor.view(tensor.shape[0], -1)
n, m = matrix.shape
rank = min(n, m, self.rank)
ps.append(self.p_memory[p_idx : p_idx + n * rank].view(n, rank))
qs.append(self.q_memory[q_idx : q_idx + m * rank].view(m, rank))
p_idx += n * rank
q_idx += m * rank
for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
matrix = tensor.view(tensor.shape[0], -1)
n, m = matrix.shape
if self.reuse_query and not memory_is_uninitialized:
# orthogonalize(q)
pass
else:
# Sample a query vector q
self.set_random(q)
for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
matrix = tensor.view(tensor.shape[0], -1)
torch.matmul(matrix, q, out=p) # line 400: fails under O2 (matrix is Half, q/p are Float)
ts = datetime.now().timestamp()
all_reduce(self.p_memory)
ts = datetime.now().timestamp() - ts
bits_communicated += n_bits(self.p_memory)
# Start communicating rank 1 tensors
# TODO: check all times - all_reduce() time
rank1_tensor_list = TensorBuffer([tensor for (tensor, _, _) in rank1_tensors])
ts_2 = datetime.now().timestamp()
rank1_handle = rank1_tensor_list.all_reduce(async_op=True)
ts_2 = datetime.now().timestamp() - ts_2
bits_communicated += rank1_tensor_list.bits()
for p in ps:
orthogonalize(p)
for p, q, (tensor, _, _) in zip(ps, qs, high_rank_tensors):
matrix = tensor.view(tensor.shape[0], -1)
torch.matmul(matrix.t(), p, out=q)
ts_3 = datetime.now().timestamp()
all_reduce(self.q_memory)
ts_3 = datetime.now().timestamp() - ts_3
bits_communicated += n_bits(self.q_memory)
self.q_memory.data[:] /= self.n_workers
for p, q, (tensor, out, mem) in zip(ps, qs, high_rank_tensors):
# Set the output gradient
torch.matmul(p, q.t(), out=out.data[:])
mem.data[:] = tensor - out
rank1_handle.wait()
rank1_tensor_list.buffer /= self.n_workers
rank1_tensor_list.unpack([out for (_, out, _) in rank1_tensors])
return bits_communicated, ts+ts_2+ts_3
def ExactReducer(grad_in, grad_out, memory_out):
n_workers = float(dist.get_world_size())
for mem in memory_out:
mem.zero_()
list_in = grad_in
list_out = grad_out
if n_workers == 1:
for t_in, t_out in zip(list_in, list_out):
t_out[:] = t_in
return 0
buffer = TensorBuffer(list_in)
ts = datetime.now().timestamp()
buffer.all_reduce()
ts = datetime.now().timestamp() - ts
buffer.buffer /= n_workers
bits_communicated = buffer.bits()
buffer.unpack(list_out)
return bits_communicated, ts
def inf_loop(data_loader):
''' wrapper function for endless data loader. '''
for loader in repeat(data_loader):
yield from loader
def test_accuracy(net, device, testloader, criterion):
net.eval()
test_loss = 0
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (inputs, targets) in enumerate(testloader):
inputs, targets = inputs.to(device), targets.to(device)
outputs = net(inputs)
loss = criterion(outputs, targets)
test_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
return test_loss/(batch_idx+1), 100.*correct/total
def load_data(config, distributed=True, data_dir="./data"):
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
train_dataset = datasets.CIFAR10(root=data_dir, train=True, transform=transforms.Compose([
transforms.RandomHorizontalFlip(),
transforms.RandomCrop(32, 4),
transforms.ToTensor(),
normalize,
]), download=True)
if distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset)
else:
train_sampler = None
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=config["batch_size"], sampler=train_sampler,
num_workers=8, pin_memory=True)
# Guideline of setting the num. of workers:
# https://discuss.pytorch.org/t/guidelines-for-assigning-num-workers-to-dataloader/813/4
val_loader = torch.utils.data.DataLoader(
datasets.CIFAR10(root=data_dir, train=False, transform=transforms.Compose([
transforms.ToTensor(),
normalize,
])),
batch_size=128, shuffle=False,
num_workers=8, pin_memory=True)
return train_loader, val_loader
def all_reduce(*args, **kwargs):
if torch.distributed.is_available() and torch.distributed.get_world_size() > 1:
return torch.distributed.all_reduce(*args, **kwargs)
def set_random(random_seed, vector, device):
rng = np.random.RandomState(random_seed)
torch.manual_seed(rng.randint(1_000_000_000))
vector.data[:] = torch.randn(*vector.shape, device=device)
# orthogonalize(vector)
def orthogonalize(matrix, eps=torch.tensor(1e-8)):
n, m = matrix.shape
for i in range(m):
# Normalize the i'th column
col = matrix[:, i : i + 1]
col /= torch.sqrt(torch.sum(col ** 2)) + eps
# Project it on the rest and remove it
if i + 1 < m:
rest = matrix[:, i + 1 :]
# rest -= torch.matmul(col.t(), rest) * col
rest -= torch.sum(col * rest, dim=0) * col
def n_bits(tensor):
return 8 * tensor.nelement() * tensor.element_size()
def l2norm(x):
return torch.sqrt(torch.sum(x ** 2))
def train(config, checkpoint_dir=None, data_dir=None):
# Read in the model related hyperparameters
activation_function = config["activation"]
pool = config["pool"]
opt_level, rank = config["optrank"]
iters = config["num_iters"]
batch = config["batch_size"]
mul = config["multiplier"]
n = config["n"]
ex_rate = config["rate"]
# os.environ["CUDA_VISIBLE_DEVICES"]="0"
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = resnet(depth=n, activation=activation_function, pool=pool,
inplanes=16, num_classes=10, mul=mul)
# model = rn.ResNet18()
model.to(device)
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), config["lr"],
momentum=config["momentum"],
weight_decay=config["weight_decay"])
if opt_level != "O3":
model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
else:
model, optimizer = amp.initialize(model, optimizer, opt_level="O3", keep_batchnorm_fp32=True)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[iters*0.6, iters*0.8], last_epoch=-1)
# Warm up and roll back
lr_rollback = False
warm_up_iters = 0
if n >= 110 and config["lr"] == 0.1:
warm_up_iters = 400
lr_rollback=True
train_loader, val_loader = load_data(data_dir=data_dir, config=config)
model.train()
ts = datetime.now().timestamp()
# Model parameters for gradient compression
# Note: for Rank-R PowerSGD, optimizer_memory = True, reducer_reuse_query = True
state = [parameter for parameter in model.parameters()]
memories = [torch.zeros_like(param) for param in state]
send_buffers = [torch.zeros_like(param) for param in state]
bits = 0
exchange_time = 0 # gradient exchange total time
com_time = 0 # compression & decompression time; in the rank-k setting this equals total time minus all_reduce time
ex_ts_acc = 0
com_ts_acc = 0
iter_cnt = 0
if rank != 0:
reducer = RankKReducer(device=device, random_seed=seed, rank=rank)
# Iteration-based training
for i, (input, target) in enumerate(inf_loop(train_loader)):
if n >= 110 and config["lr"] == 0.1:
if warm_up_iters > 0:
for param_group in optimizer.param_groups:
param_group['lr'] = 0.01
warm_up_iters -= 1
else:
if lr_rollback:
for param_group in optimizer.param_groups:
param_group['lr'] = config['lr']
lr_rollback=False
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
input_var = torch.autograd.Variable(input)
target_var = torch.autograd.Variable(target)
# compute output
output = model(input_var)
loss = criterion(output, target_var)
optimizer.zero_grad()
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward() # calculate the gradients
# TODO: Gradient Exchange
# average_gradients(model)
# at what frequency?
if i >= ex_rate-1 and (i+1)%ex_rate == 0:
iter_cnt += 1
if rank != 0:
exchange_time = datetime.now().timestamp()
grads = [param.grad for param in model.parameters()] # different from param.grad.data?
for grad, memory, send_bfr in zip(grads, memories, send_buffers):
send_bfr.data[:] = grad + memory
bits, time = reducer.reduce(send_buffers, grads, memories)
exchange_time = datetime.now().timestamp() - exchange_time
com_time = exchange_time - time # compressed & decompressed time
# accumulated
ex_ts_acc += exchange_time
com_ts_acc += com_time
else:
exchange_time = datetime.now().timestamp()
grads = [param.grad for param in model.parameters()] # different from param.grad.data?
for grad, memory, send_bfr in zip(grads, memories, send_buffers):
send_bfr.data[:] = grad + memory
bits, time = ExactReducer(send_buffers, grads, memories)
exchange_time = datetime.now().timestamp() - exchange_time
com_time = exchange_time - time
# accumulated
ex_ts_acc += exchange_time
com_ts_acc += com_time
if iter_cnt == (150 / ex_rate):
ex = ex_ts_acc / iter_cnt
com = com_ts_acc / iter_cnt
iter_cnt = 0
ex_ts_acc = 0
com_ts_acc = 0
optimizer.step()
lr_scheduler.step()
# Every 150 iterations, report metrics
if i >= 149 and (i+1)%150 == 0:
test_loss, accuracy = test_accuracy(model, device, val_loader, criterion)
tune.report(
time_per_150_iterations = datetime.now().timestamp()-ts,
compression_t_avg = com, # compression & decompression time
grad_exchange_t_avg = ex, # gradient exchange time
test_accuracy = accuracy,
bits_communicated = bits,
opt = opt_level,
rank = rank
)
ts = datetime.now().timestamp()
model.train()
if i+1 == iters:
break
def _iter():
# in total 3
opt = ["O2"]
rank = [7]
for a in opt:
for b in rank:
if (a == "O0" and b != 0) or (a == "O2" and b != 7):
continue
yield a, b
def main(max_num_epochs=200, gpus_per_trial=2):
config = {
"multiplier": tune.grid_search([4]),
"batch_size": tune.grid_search([256]),
"momentum": tune.grid_search([0.9]),
"n": tune.grid_search([110]),
# Fixed
"lr": tune.grid_search([0.1]),
"weight_decay": tune.grid_search([1e-4]),
"activation": tune.grid_search(["relu"]),
"pool": tune.grid_search(["avg"]),
"optrank": tune.grid_search(list(_iter())),
"rate": tune.grid_search([10]),
"num_iters": tune.grid_search([78000])
}
data_dir = os.path.abspath("/home/data")
distributed_train = DistributedTrainableCreator(
partial(train, data_dir=data_dir),
backend="nccl",
num_gpus_per_worker=1,
num_workers=2,
num_cpus_per_worker=10,
num_workers_per_host=1,
timeout_s=300
)
result = tune.run(
distributed_train,
name="Final_train_DDP_Mixed_RESNET_1_21",
config=config,
local_dir=os.path.abspath("/home/result"))
if __name__ == '__main__':
import argparse
# ray.tune.ray_trial_executor.DEFAULT_GET_TIMEOUT = 300000000
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--ray-address",
help="Address of Ray cluster for seamless distributed execution.")
args, _ = parser.parse_known_args()
if args.smoke_test:
ray.init(local_mode=True)
main(max_num_epochs=10, gpus_per_trial=0)
else:
ray.init(address=args.ray_address)
main(max_num_epochs=100, gpus_per_trial=1)
Does anyone know how to fix this?