Multiprocessing help

I’m trying to run some graph construction on a batch of data in PyTorch, but I’m facing some issues. First, I have an overall class that holds all sorts of tensors I need. These are all created on the GPU by default via torch.set_default_tensor_type('torch.cuda.FloatTensor'). Another part of my code loads a large dataset onto the CPU, and from this large set I sample batches.
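
For context, the setup looks roughly like this (a minimal sketch; the class name Env, the attribute names, and the use of torch.load are placeholders for how my code is actually organised):

    import torch

    # everything created without an explicit device ends up on the GPU
    torch.set_default_tensor_type('torch.cuda.FloatTensor')

    class Env:
        def __init__(self, data_path):
            # the full dataset is loaded onto the CPU (torch.load is just illustrative)
            self.all_problems = torch.load(data_path, map_location='cpu')
            self.problems = None  # current batch of shape (batch, n, 2), also on the CPU
            # ... plus various other tensors that live on the GPU

        def sample_batch(self, batch_size):
            idx = torch.randint(0, self.all_problems.size(0), (batch_size,), device='cpu')
            self.problems = self.all_problems[idx]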

Suppose we have a tensor problems of shape (batch, n, 2), and I want to build some graphs from it using SciPy. The problem is that SciPy runs on the CPU and only handles one instance at a time. For example, I have the following:

    # imports used by the methods below (they are part of a larger class)
    from multiprocessing import Pool  # I have also tried torch.multiprocessing here

    import torch
    import torch_geometric as pyg
    from scipy.spatial import KDTree, Delaunay

    def _parallel_do_graphs(self, probs):
        n_nodes = probs.size(0)
        adj = torch.zeros((n_nodes, n_nodes), device='cpu')

        adj += self._get_knn_graph(probs)
        adj += self._get_dt_graph(probs)

        return adj

    def _get_knn_graph(self, probs):
        num_nodes = probs.size(0)
        dense_adj = torch.zeros((num_nodes, num_nodes), device='cpu')
        np_probs = probs.cpu().numpy()

        kdt = KDTree(np_probs)
        _, idx = kdt.query(np_probs, k=self.env_params['knn'], workers=8)
        idx = torch.as_tensor(idx, dtype=torch.long, device='cpu')
        # mark the k nearest neighbours of each node in the dense adjacency matrix
        dense_adj = dense_adj.scatter(dim=-1, index=idx, src=torch.ones(idx.size(), device='cpu'))

        return dense_adj


    def _get_dt_graph(self, probs):
        num_nodes = probs.size(0)
        dense_adj = torch.zeros((num_nodes, num_nodes), device='cpu')
        probs = probs.cpu().numpy()  # Delaunay expects a NumPy array

        dt = Delaunay(probs)
        simplices = dt.simplices
        # mark the three edges of each Delaunay triangle (in one direction only)
        dense_adj[simplices[:, 0], simplices[:, 1]] = 1
        dense_adj[simplices[:, 1], simplices[:, 2]] = 1
        dense_adj[simplices[:, 0], simplices[:, 2]] = 1

        return dense_adj

    def make_graphs(self, add_edge_type=True, add_edge_dist=True, flip=False):
        assert self.problems is not None
        batch, num_nodes, _ = self.problems.size()
        self.problem_edges = torch.zeros((batch, num_nodes, num_nodes), device='cpu')

        # sequential approach
        # for i in range(batch):
        #     adj = torch.zeros((num_nodes, num_nodes), device='cpu')
        #     # call a graph function - can call multiple and add them together
        #     adj += self._get_knn_graph(self.problems[i])
        #     adj += self._get_dt_graph(self.problems[i])
        #     self.problem_edges[i] = adj

        # parallel approach
        processes = 128

        params = [self.problems[i] for i in range(batch)]
        pool = Pool(processes=processes)

        # imap returns an iterator, so collect the results before stacking
        all_graphs = list(pool.imap(self._parallel_do_graphs, params))

        pool.close()
        pool.join()

        self.problem_edges = torch.stack(all_graphs, dim=0)

        # remove all self loops, clamp back to 1, then add back self loops as value 2
        self.problem_edges = self.problem_edges * (1.0 - torch.eye(num_nodes, device='cpu'))
        self.problem_edges = self.problem_edges.clamp(min=0.0, max=1.0)

        # construct the edges and the attributes
        if add_edge_dist:
            # add a small epsilon so that zero-distance edges (e.g. the self-loops added via the
            # identity matrix) survive dense_to_sparse; it is subtracted again below so their
            # distance feature ends up as 0
            dist = torch.cdist(self.problems.cpu(), self.problems.cpu()) * self.problem_edges
            dist = dist + ((self.problem_edges + torch.eye(num_nodes, device='cpu')[None, :, :].repeat(batch, 1, 1)) * 1e-5)
            _, self.problem_edge_dist = pyg.utils.dense_to_sparse(dist.to(self.problems.device))
            self.problem_edge_dist -= 1e-5
            self.problem_edge_dist = self.problem_edge_dist.view(-1, 1)

        # add self-loops back into the dense adjacency with edge type 2
        self.problem_edges = self.problem_edges + (torch.eye(num_nodes, device='cpu')[None, :, :].repeat(batch, 1, 1) * 2.0)

        if add_edge_type:
            self.problem_edges, self.problem_edge_type = pyg.utils.dense_to_sparse(self.problem_edges.to(self.problems.device))

        if flip:
            # flip the edges for bipartite message passing s.t. row 0 are variable nodes and row 1 are factor nodes
            self.problem_edges = self.problem_edges.flip([0])

        return

Note that in the sequential approach, I simply loop through my batch of data, apply the kNN-graph and Delaunay-triangulation functions, and write the result into the main matrix. In the parallel approach, I want to distribute this work across multiple workers, since the sequential approach is really slow.

However, torch complains that I need to use the spawn start method instead. I find that using spawn is extremely slow and also causes a lot of tensor replication. On the other hand, when I try plain multiprocessing (i.e. the default fork start method), it complains that CUDA tensors cannot be passed to the subprocesses that way. I’m very confused why this is, because self.problems is sampled from a larger set that lives on the CPU, and I’m only operating on self.problems, which is on the CPU at this point. However, I do have other tensors in this object that are on the GPU. Is that why torch is complaining? I’m really puzzled as to why it keeps telling me I can’t use the original fork start method. Or does it have to do with the fact that torch.set_default_tensor_type() is being used here?
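
For reference, the spawn variant I tried looks roughly like this (simplified; the Env name, the file path, and the batch size are placeholders matching the sketch further up):

    import torch.multiprocessing as mp

    if __name__ == '__main__':
        mp.set_start_method('spawn', force=True)

        env = Env('data.pt')    # the class holding both the GPU tensors and self.problems
        env.sample_batch(512)   # self.problems is on the CPU at this point
        env.make_graphs()       # the Pool is created inside here, as shown above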