I am currently exploring the PyTorch RPC framework.
I thought RPC was similar to promises and the event loop in JavaScript, such that everything runs in order, but then I found that a function called through rpc_async runs simultaneously with other code on the callee side. The following code gives an example.
The question is, how does pytorch implement rpc to make it parallel? Does it create threads at the C++ level?
# This code is intended to test the async RPC call between two CUDA devices:
# it shows what happens on the callee when the caller sends it a large tensor.
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc
import torch
import time
import os
import asyncio
import threading
# Rendezvous endpoint for the RPC group. It is set at import time so that
# every process running this module sees the same address and port.
port_num = 29508
os.environ.update({
    'MASTER_ADDR': 'localhost',
    'MASTER_PORT': str(port_num),
})
def main():
    """Start worker1 and worker2 in separate processes and wait for both."""
    procs = []
    # Create and start each process in turn (worker1 first), then join them
    # in the same order.
    for target in (worker1, worker2):
        proc = mp.Process(target=target)
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
# Latest loop index reached by worker1(); worker1_recv() prints it, which shows
# how far the loop had progressed when the RPC was handled.
_w1_compute_num = 0
def worker1():
    """Run the callee side: join the RPC group as "worker1" and keep busy.

    After initialising RPC on cuda:0, the function runs a counting loop that
    stands in for a long computation. It publishes the current index in the
    module-level ``_w1_compute_num`` and prints memory and thread statistics
    every 500 iterations. It then shuts RPC down gracefully.
    """
    global _w1_compute_num

    # torch.set_num_threads(1)
    backend_opts = rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        device_maps={"worker2": {0: 1}},
        devices=["cuda:0"],
    )
    rpc.init_rpc("worker1", rank=0, world_size=2,
                 rpc_backend_options=backend_opts)
    print("worker1 finish init, start sleep")

    # This loop simulates any time-consuming computation.
    for step in range(10000):
        _w1_compute_num = step
        if step % 500 != 0:
            continue
        allocated_gb = torch.cuda.memory_allocated("cuda:0")/1024/1024/1024
        print("torch.cuda.memory_allocated: %fGB" % allocated_gb)
        print("worker1: torch threads count: ", str(torch.get_num_threads()))
        print("worker1: threads count: ", str(len(threading.enumerate())))

    print("worker1 finish sleep, start wait")
    print("worker1 finish wait, will shutdown")
    rpc.shutdown(graceful=True)
    print("worker1 finish shutdown")
def worker1_recv(large_t):
    """Handle the RPC sent to worker1: report state, then square the tensor.

    Prints the current value of ``_w1_compute_num``, the CUDA memory allocated
    on cuda:0 before and after the computation, and the shape and device of
    the received tensor.

    Args:
        large_t: The tensor passed by the caller.
    """
    def _report_memory():
        # Same figure and format both before and after the pow() call.
        allocated_gb = torch.cuda.memory_allocated("cuda:0")/1024/1024/1024
        print("torch.cuda.memory_allocated: %fGB" % allocated_gb)

    print("worker1 received", str(_w1_compute_num))
    _report_memory()
    for attribute in (large_t.shape, large_t.device):
        print(attribute)
    # The result is kept in a local so it is still alive when memory is
    # reported below.
    squared = large_t.pow(2)
    print("worker1 pow finish")
    _report_memory()
def worker2():
    """Run the caller side: join the RPC group as "worker2" and call worker1.

    Creates a bfloat16 tensor on cuda:1, sends it to ``worker1_recv`` on
    worker1 with ``rpc_async``, waits for the returned future, sleeps three
    seconds and then shuts RPC down gracefully.
    """
    # torch.set_num_threads(1)
    backend_opts = rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        device_maps={"worker1": {1: 0}},
        devices=["cuda:1"],
    )
    rpc.init_rpc("worker2", rank=1, world_size=2,
                 rpc_backend_options=backend_opts)
    print("worker2 finish init")

    payload = torch.randn(2, 32, 100, 100, device="cuda:1", dtype=torch.bfloat16)

    print("worker2 calling worker1")
    pending = rpc.rpc_async("worker1", worker1_recv, args=(payload,))
    print("worker2 finish call worker1")
    pending.wait()
    print("worker2 finish send, will shutdown")

    time.sleep(3)
    rpc.shutdown(graceful=True)
    print("worker2 finish shutdown")
def kill_port(p_port):
    """Kill the process that ``lsof`` reports first for the given port, if any.

    Runs ``lsof -i:<p_port>`` and takes the PID column of the first result
    row. The PID (possibly an empty string) is printed. When a PID was found,
    ``kill`` is invoked on it, followed by a one-second pause, presumably to
    give the process time to exit.

    Args:
        p_port: Port number to look up.
    """
    # The context manager closes the pipe once the output has been read.
    with os.popen(f"lsof -i:{p_port} | awk 'NR==2 {{print $2}}'") as process_file:
        pid_to_kill = process_file.read().strip()
    print(pid_to_kill)
    if pid_to_kill:
        os.system(f"kill {pid_to_kill}")
        time.sleep(1)
if __name__ == "__main__":
    # Kill whatever process lsof reports on the rendezvous port, pause
    # briefly, then launch both workers.
    kill_port(port_num)
    time.sleep(1)
    main()