I’m trying to apply some graph drawing on a batch of data in torch, but I’m facing some issues. Firstly, I have an overall class that contains all sorts of tensors I need. These are all created by default on the GPU via
torch.set_default_tensor_type('torch.cuda.FloatTensor'). A part of my code loads a large set of data into the cpu, and from this large set, I am sampling batches.
Suppose we have a tensor of size
problems = (batch, n, 2) , and I wish to draw some graphs using SciPy on them. The problem is, SciPy’s implementation is on the CPU and it also applies only on one instance at a time. For example, I have the following:
def _parallel_do_graphs(self, probs): n_nodes = probs.size(0) adj = torch.zeros((n_nodes, n_nodes), device='cpu') adj += self._get_knn_graph(probs) adj += self._get_dt_graph(probs) return adj def _get_knn_graph(self, probs): num_nodes = probs.size(0) dense_adj = torch.zeros((num_nodes, num_nodes), device='cpu') np_probs = probs.cpu().numpy() kdt = KDTree(np_probs) _, idx = kdt.query(np_probs, k=self.env_params['knn'], workers=8) idx = torch.LongTensor(idx, device='cpu') dense_adj = dense_adj.scatter(dim=-1, index=idx, src=torch.ones(idx.size(), device='cpu')) return dense_adj def _get_dt_graph(self, probs): num_nodes = probs.size(0) dense_adj = torch.zeros((num_nodes, num_nodes), device='cpu') probs = probs.cpu() dt = Delaunay(probs) simplices = dt.simplices dense_adj[simplices[:, 0], simplices[:, 1]] = 1 dense_adj[simplices[:, 1], simplices[:, 2]] = 1 dense_adj[simplices[:, 0], simplices[:, 2]] = 1 return dense_adj def make_graphs(self, add_edge_type=True, add_edge_dist=True, flip=False): assert self.problems is not None batch, num_nodes, _ = self.problems.size() self.problem_edges = torch.zeros((batch, num_nodes, num_nodes), device='cpu') # sequential approach # for i in range(batch): # adj = torch.zeros((num_nodes, num_nodes), device='cpu') # # call a graph function - can call multiple and add them together # adj += self._get_knn_graph(self.problems[i]) # adj += self._get_dt_graph(self.problems[i]) # self.problem_edges[i] = adj # parallel approach processes = 128 params = [(self.problems[i]) for i in range(batch)] pool = Pool(processes=processes) all_graph = pool.imap(func=self._parallel_do_graphs, iterable=params) pool.close() self.problem_edges = torch.stack(all_graph, dim=0) # remove all self loops, clamp back to 1, then add back self loops as value 2 self.problem_edges = self.problem_edges * (1.0 - torch.eye(num_nodes, device='cpu')) self.problem_edges = self.problem_edges.clamp(min=0.0, max=1.0) # construct the edges and the attributes if add_edge_dist: # add a small value so that the self-loop is taken and remove it, its feature value is 0 dist = torch.cdist(self.problems.cpu(), self.problems.cpu()) * self.problem_edges dist = dist + ((self.problem_edges + torch.eye(num_nodes, device='cpu')[None, :, :].repeat(batch, 1, 1)) * 1e-5) _, self.problem_edge_dist = pyg.utils.dense_to_sparse(dist.to(self.problems.device)) self.problem_edge_dist -= 1e-5 self.problem_edge_dist = self.problem_edge_dist.view(-1, 1) # add self-loops and change its type to 2 self.problem_edges = self.problem_edges + (torch.eye(num_nodes, device='cpu')[None, :, :].repeat(batch, 1, 1) * 2.0) if add_edge_type: self.problem_edges, self.problem_edge_type = pyg.utils.dense_to_sparse(self.problem_edges.to(self.problems.device)) if flip: # flip the edges for bipartite message passing s.t. row 0 are variable nodes and row 1 are factor nodes self.problem_edges = self.problem_edges.flip() return
Note that in the sequential approach, I am simply looping through my batch of data, applying the kNN graph and Triangulation graph functions, then updating the main matrix. In the parallel approach, I hope to distribute this task to multiple workers as the sequential approach is really slow.
However, torch is complaining to me that I need to use the
spawn start process instead. I find that using this is extremely slow and also causes a lot of tensor replication. On the other hand, when I try to apply the normal multiprocessing here, it complains that the CUDA tensors cannot be replicated in such a way. I’m very confused why this is so because,
self.problems is sampled from a larger set, of which it is in CPU. I’m also only operating on
self.problems, which is on the CPU at the moment. However, I do have other tensors in this object class that are on the GPU. Is that why torch is complaining to me about this? Because I’m really puzzled as to why it keeps telling me I can’t use the original
fork method instead. Or does it have to do with the fact that
torch.set_default_tensor_type() is being used here?