RuntimeError: Expected object of scalar type Half but got scalar type Float for argument #0 'result' in call to _th_mm_out

I am trying to use gradient compression (PowerSGD, from the epfml/powersgd repo on GitHub; paper: https://arxiv.org/abs/1905.13727) together with apex amp mixed precision at opt level "O2", but I get the following error:

ray::ImplicitFunc.step() (pid=61519, ip=10.0.1.224)
  File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 248, in run
    self._entrypoint()
  File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 316, in entrypoint
    self._status_reporter.get_checkpoint())
  File "/home/anaconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 575, in _trainable_func
    output = fn()
  File "/home/resnetTraceDDP.py", line 647, in train
  File "/home/resnetTraceDDP.py", line 400, in reduce
RuntimeError: Expected object of scalar type Half but got scalar type Float for argument #0 'result' in call to _th_mm_out
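
I think the mismatch can be reproduced in isolation like this (a minimal sketch, assuming apex O2 leaves param.grad in fp16 while the reducer's p/q buffers from torch.empty default to fp32; the exact error wording depends on the PyTorch version):

import torch

matrix = torch.randn(64, 128, device="cuda", dtype=torch.float16)  # fp16 gradient under O2
q = torch.randn(128, 1, device="cuda")   # fp32, like self.q_memory
p = torch.empty(64, 1, device="cuda")    # fp32, like self.p_memory
torch.matmul(matrix, q, out=p)           # raises a Half/Float dtype-mismatch RuntimeError on GPU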

My script (integrated into Ray Tune) is as follows. The error happens after scaled_loss.backward(), during the gradient exchange: line 400 in the traceback is the torch.matmul(matrix, q, out=p) call inside RankKReducer.reduce.

class TensorBuffer():
    """
    Packs multiple tensors into one flat buffer for efficient
    intra-worker communication.
    """
    def __init__(self, tensors):
        indices = [0]
        for tensor in tensors:
            new_end = indices[-1] + tensor.nelement()
            indices.append(new_end)

        self._start_idx = indices[:-1]
        self._end_idx = indices[1:]
        self._tensors = tensors

        self.buffer = torch.cat([t.view(-1) for t in tensors]) # copies
    
    def __getitem__(self, index):
        return self.buffer[self._start_idx[index] : self._end_idx[index]].view(*self._tensors[index].shape)

    def __len__(self):
        return len(self._tensors)

    def pack(self, tensors=None):
        # Optional. init already does this.
        if tensors is None:
            tensors = self._tensors
        for tensor, entry in zip(tensors, self):
            entry[:] = tensor

    def unpack(self, tensors):
        for tensor, entry in zip(tensors, self):
            tensor[:] = entry

    def nelement(self):
        return self.buffer.nelement()

    def element_size(self):
        return self.buffer.element_size()

    def bits(self):
        return 8 * self.nelement() * self.element_size()

    def all_reduce(self, async_op=False):
        return torch.distributed.all_reduce(self.buffer, async_op=async_op)
    
    def all_gather(self, async_op=False):
        n_workers = torch.distributed.get_world_size() if torch.distributed.is_available() else 1
        buffers = [torch.empty_like(self.buffer) for i in range(n_workers)]
        handle = all_gather(buffers, self.buffer, async_op=async_op)
        if async_op:
            return buffers, handle
        else:
            return buffers

class Reducer:
    def __init__(self, random_seed, device):
        self.rng = np.random.RandomState(random_seed)
        M = 1024 * 1024
        self.precalc_numbers = (
            torch.from_numpy(self.rng.randn(128 * M)).to(device).type(torch.float32)
        )
        if torch.distributed.is_available():
            self.n_workers = torch.distributed.get_world_size()
            self.rank = torch.distributed.get_rank()
        else:
            self.n_workers = 1
            self.rank = 0
        self.device = device

    def reduce(self, grad_in, grad_out, memory_out):
        """Return communicated bits"""
        raise NotImplementedError()

class RankKReducer(Reducer):
    def __init__(self, random_seed, device, n_power_iterations=0, reuse_query=False, rank=1):
        super().__init__(random_seed, device)
        assert n_power_iterations == 0
        self.rank = rank
        self.p_memory = None
        self.q_memory = None
        self.reuse_query = reuse_query

    def set_random(self, vector):
        torch.manual_seed(self.rng.randint(1_000_000_000))
        vector.data[:] = torch.randn(*vector.shape, device=self.device)
        # orthogonalize(vector)

    def reduce(self, grad_in, grad_out, memory_out):
        """
        Reduce gradients between the workers in place
        :param grad_in: dictionary -- send_buffers 
        :param grad_out: dictionary -- grads 
        :param memory_out: dictionary -- memories 
        """
        bits_communicated = 0

        # Split the tensors into rank-1 tensors that will be reduced uncompressed
        # and higher-rank tensors that are compressed
        rank1_tensors = [
            (tensor, out, mem)
            for tensor, out, mem in zip(grad_in, grad_out, memory_out)
            if tensor.ndimension() <= 1
        ]
        high_rank_tensors = [
            (tensor, out, mem)
            for tensor, out, mem in zip(grad_in, grad_out, memory_out)
            if tensor.ndimension() > 1
        ]

        # We are building a rank-1 approximation of every tensor
        # that can be interpreted as a matrix. Let the approximation be
        # M = p q^T
        # We are allocating consecutive memory for the p's and q's

        memory_is_uninitialized = self.p_memory is None

        p_total_size = 0
        q_total_size = 0
        for tensor, _, _ in high_rank_tensors:
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape
            rank = min(n, m, self.rank)
            p_total_size += n * rank
            q_total_size += m * rank

        if self.p_memory is None:
            self.p_memory = torch.empty(p_total_size, device=self.device)
            self.q_memory = torch.empty(q_total_size, device=self.device)

        # Find them again and make lists of pointers
        ps = []
        qs = []
        p_idx = 0
        q_idx = 0
        for tensor, _, _ in high_rank_tensors:
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape
            rank = min(n, m, self.rank)
            ps.append(self.p_memory[p_idx : p_idx + n * rank].view(n, rank))
            qs.append(self.q_memory[q_idx : q_idx + m * rank].view(m, rank))
            p_idx += n * rank
            q_idx += m * rank

        for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape

            if self.reuse_query and not memory_is_uninitialized:
                # orthogonalize(q)
                pass
            else:
                # Sample a query vector q
                self.set_random(q)

        for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
            matrix = tensor.view(tensor.shape[0], -1)
            torch.matmul(matrix, q, out=p) #?

        ts = datetime.now().timestamp()
        all_reduce(self.p_memory)
        ts = datetime.now().timestamp() - ts 

        bits_communicated += n_bits(self.p_memory)

        # Start communicating rank 1 tensors
        # TODO: check all times - all_reduce() time 
       
        rank1_tensor_list = TensorBuffer([tensor for (tensor, _, _) in rank1_tensors])

        ts_2 = datetime.now().timestamp()
        rank1_handle = rank1_tensor_list.all_reduce(async_op=True)
        ts_2 = datetime.now().timestamp() - ts_2

        bits_communicated += rank1_tensor_list.bits()

        for p in ps:
            orthogonalize(p)

        for p, q, (tensor, _, _) in zip(ps, qs, high_rank_tensors):
            matrix = tensor.view(tensor.shape[0], -1)
            torch.matmul(matrix.t(), p, out=q)
            
        ts_3 = datetime.now().timestamp()
        all_reduce(self.q_memory)
        ts_3 = datetime.now().timestamp() - ts_3 

        bits_communicated += n_bits(self.q_memory)
        self.q_memory.data[:] /= self.n_workers

        for p, q, (tensor, out, mem) in zip(ps, qs, high_rank_tensors):
            # Set the output gradient
            torch.matmul(p, q.t(), out=out.data[:])
            mem.data[:] = tensor - out 

        rank1_handle.wait()
        rank1_tensor_list.buffer /= self.n_workers
        rank1_tensor_list.unpack([out for (_, out, _) in rank1_tensors])

        return bits_communicated, ts+ts_2+ts_3

def ExactReducer(grad_in, grad_out, memory_out):
    n_workers = float(dist.get_world_size())
    for mem in memory_out:
        mem.zero_()
    list_in = grad_in
    list_out = grad_out

    if n_workers == 1: 
        for t_in, t_out in zip(list_in, list_out):
            t_out[:] = t_in
        return 0 

    buffer = TensorBuffer(list_in)
    ts = datetime.now().timestamp()
    buffer.all_reduce()
    ts = datetime.now().timestamp() - ts 

    buffer.buffer /= n_workers
    bits_communicated = buffer.bits()
    buffer.unpack(list_out)
    return bits_communicated, ts 


def inf_loop(data_loader):
    ''' wrapper function for endless data loader. '''
    for loader in repeat(data_loader):
        yield from loader

def test_accuracy(net, device, testloader, criterion):
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    return test_loss/(batch_idx+1), 100.*correct/total


def load_data(config, distributed=True, data_dir="./data"):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    train_dataset = datasets.CIFAR10(root=data_dir, train=True, transform=transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomCrop(32, 4),
            transforms.ToTensor(),
            normalize,
        ]), download=True) 

    if distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset)
    else:
        train_sampler = None
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=config["batch_size"], sampler=train_sampler,
        num_workers=8, pin_memory=True)

    # Guideline of setting the num. of workers: 
    # https://discuss.pytorch.org/t/guidelines-for-assigning-num-workers-to-dataloader/813/4
    val_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10(root=data_dir, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=128, shuffle=False,
        num_workers=8, pin_memory=True)

    return train_loader, val_loader

def all_reduce(*args, **kwargs):
    if torch.distributed.is_available() and torch.distributed.get_world_size() > 1:
        return torch.distributed.all_reduce(*args, **kwargs)

def set_random(random_seed, vector, device):
    rng = np.random.RandomState(random_seed)
    torch.manual_seed(rng.randint(1_000_000_000))
    vector.data[:] = torch.randn(*vector.shape, device=device)
    # orthogonalize(vector)

def orthogonalize(matrix, eps=torch.tensor(1e-8)):
    n, m = matrix.shape
    for i in range(m):
        # Normalize the i'th column
        col = matrix[:, i : i + 1]
        col /= torch.sqrt(torch.sum(col ** 2)) + eps
        # Project it on the rest and remove it
        if i + 1 < m:
            rest = matrix[:, i + 1 :]
            # rest -= torch.matmul(col.t(), rest) * col
            rest -= torch.sum(col * rest, dim=0) * col

def n_bits(tensor):
    return 8 * tensor.nelement() * tensor.element_size()

def l2norm(x):
    return torch.sqrt(torch.sum(x ** 2))
    
def train(config, checkpoint_dir=None, data_dir=None):
    # Read in the model related hyperparameters
    activation_function = config["activation"]
    pool = config["pool"]
    opt_level, rank = config["optrank"]
    iters = config["num_iters"]
    batch = config["batch_size"]
    mul = config["multiplier"]
    n = config["n"]
    ex_rate = config["rate"]
  
    # os.environ["CUDA_VISIBLE_DEVICES"]="0"
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = resnet(depth=n, activation=activation_function, pool=pool, 
                          inplanes=16, num_classes=10, mul=mul)
    # model = rn.ResNet18()
    model.to(device)

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), config["lr"],
                                momentum=config["momentum"],
                                weight_decay=config["weight_decay"])
    
    if opt_level != "O3":
        model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
    else:
        model, optimizer = amp.initialize(model, optimizer, opt_level="O3", keep_batchnorm_fp32=True)

    lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[iters*0.6, iters*0.8], last_epoch=-1)

    # Warm up and roll back 
    lr_rollback = False 
    warm_up_iters = 0
    if n >= 110 and config["lr"] == 0.1:
        warm_up_iters = 400
        lr_rollback=True

    train_loader, val_loader = load_data(data_dir=data_dir, config=config)
    model.train()
    ts = datetime.now().timestamp() 

    # Model parameters for gradient compression 
    # Note: for Rank-R PowerSGD, optimizer_memory = True, reducer_reuse_query = True 
    state = [parameter for parameter in model.parameters()]
    memories = [torch.zeros_like(param) for param in state]
    send_buffers = [torch.zeros_like(param) for param in state]
    bits = 0 
    exchange_time = 0 # gradient exchange total time 
    com_time = 0 # compression & decompression time; in the rank-k setting this equals total time minus all_reduce time
    ex_ts_acc = 0 
    com_ts_acc = 0
    iter_cnt = 0  

    if rank != 0: 
        reducer = RankKReducer(device=device, random_seed=seed, rank=rank)

    # Iteration-based training 
    for i, (input, target) in enumerate(inf_loop(train_loader)):
        if n >= 110 and config["lr"] == 0.1:
            if warm_up_iters > 0:
                for param_group in optimizer.param_groups:
                    param_group['lr'] = 0.01
                warm_up_iters -= 1
            else:
                if lr_rollback:
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = config['lr']
                lr_rollback=False

        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        input_var = torch.autograd.Variable(input)
        target_var = torch.autograd.Variable(target)

        # compute output 
        output = model(input_var)
        loss = criterion(output, target_var)

        optimizer.zero_grad()
        with amp.scale_loss(loss, optimizer) as scaled_loss:
            scaled_loss.backward() # calculate the gradients 
  
        # TODO: Gradient Exchange 
        # average_gradients(model)
        
        # in what frequency?
        if i >= ex_rate-1 and (i+1)%ex_rate == 0: 
            iter_cnt += 1 

            if rank != 0: 
                exchange_time = datetime.now().timestamp()
                grads = [param.grad for param in model.parameters()] # different from param.grad.data?
                for grad, memory, send_bfr in zip(grads, memories, send_buffers):
                    send_bfr.data[:] = grad + memory 
                bits, time = reducer.reduce(send_buffers, grads, memories)
                exchange_time = datetime.now().timestamp() - exchange_time
                com_time = exchange_time - time # compressed & decompressed time 
                # accumulated 
                ex_ts_acc += exchange_time
                com_ts_acc += com_time

            else:
                exchange_time = datetime.now().timestamp()
                grads = [param.grad for param in model.parameters()] # different from param.grad.data?
                for grad, memory, send_bfr in zip(grads, memories, send_buffers):
                    send_bfr.data[:] = grad + memory 
                bits, time = ExactReducer(send_buffers, grads, memories)
                exchange_time = datetime.now().timestamp() - exchange_time
                com_time = exchange_time - time 
                # accumulated 
                ex_ts_acc += exchange_time
                com_ts_acc += com_time
            
            if iter_cnt == (150 / ex_rate):
                ex = ex_ts_acc / iter_cnt
                com = com_ts_acc / iter_cnt 
                iter_cnt = 0 
                ex_ts_acc = 0
                com_ts_acc = 0
                
        optimizer.step()
        lr_scheduler.step() 

        # Every 150 iterations, report metrics   
        if i >= 149 and (i+1)%150 == 0:
            test_loss, accuracy = test_accuracy(model, device, val_loader, criterion)

            tune.report(
                time_per_150_iterations = datetime.now().timestamp()-ts,
                compression_t_avg = com, # compression & decompression time  
                grad_exchange_t_avg = ex, # gradient exchange time 
                test_accuracy = accuracy, 
                bits_communicated = bits,
                opt = opt_level,
                rank = rank 
            )
            ts = datetime.now().timestamp()
        model.train()
        
        if i+1 == iters:
            break 

def _iter():
    # in total 3 
    opt = ["O2"]
    rank = [7]
    for a in opt:
        for b in rank:
            if (a == "O0" and b != 0) or (a == "O2" and b != 7):
                continue 
            yield a, b

def main(max_num_epochs=200, gpus_per_trial=2):
    config = { 
        "multiplier": tune.grid_search([4]),
        "batch_size": tune.grid_search([256]), 
        "momentum": tune.grid_search([0.9]),
        "n": tune.grid_search([110]), 
        # Fixed 
        "lr": tune.grid_search([0.1]),  
        "weight_decay": tune.grid_search([1e-4]),
        "activation": tune.grid_search(["relu"]),
        "pool": tune.grid_search(["avg"]),
        "optrank": tune.grid_search(list(_iter())),  
        "rate": tune.grid_search([10]),
        "num_iters": tune.grid_search([78000]) 
    }

    data_dir = os.path.abspath("/home/data")
    distributed_train = DistributedTrainableCreator(
        partial(train, data_dir=data_dir),
        backend="nccl",
        num_gpus_per_worker=1,
        num_workers=2,
        num_cpus_per_worker=10,
        num_workers_per_host=1,
        timeout_s=300
    )
    result = tune.run(
        distributed_train,
        name="Final_train_DDP_Mixed_RESNET_1_21",
        config=config,
        local_dir=os.path.abspath("/home/result"))

if __name__ == '__main__':
    import argparse
    # ray.tune.ray_trial_executor.DEFAULT_GET_TIMEOUT = 300000000

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    parser.add_argument(
        "--ray-address",
        help="Address of Ray cluster for seamless distributed execution.")
    args, _ = parser.parse_known_args()

    if args.smoke_test:
        ray.init(local_mode=True)
        main(max_num_epochs=10, gpus_per_trial=0)
    else:
        ray.init(address=args.ray_address)
        main(max_num_epochs=100, gpus_per_trial=1) 

Anyone know how to fix this?

We recommend using native mixed-precision training via torch.cuda.amp.
Have a look at the docs for some examples.
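
Roughly, the native pattern looks like this (a minimal sketch reusing the names from your script above; with torch.cuda.amp the model parameters, and therefore param.grad, stay in fp32, so the reducer's fp32 buffers should match them):

import torch

scaler = torch.cuda.amp.GradScaler()

for input, target in train_loader:
    input, target = input.cuda(non_blocking=True), target.cuda(non_blocking=True)
    optimizer.zero_grad()
    with torch.cuda.amp.autocast():          # forward pass runs in mixed precision
        output = model(input)
        loss = criterion(output, target)
    scaler.scale(loss).backward()            # gradients are fp32, but scaled
    scaler.unscale_(optimizer)               # unscale param.grad before touching it
    # ... gradient exchange, e.g. reducer.reduce(send_buffers, grads, memories) ...
    scaler.step(optimizer)                   # skips the internal unscale since we already did it
    scaler.update()

Calling scaler.unscale_(optimizer) before the reducer lets you exchange unscaled gradients; scaler.step then skips its internal unscaling for that optimizer.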
