API alternative for torch.distributed.launch?

For some reason it is hard (and ugly) to start my own module using torch.distributed.launch. It is inside an application therefore I would need to concatenate an execution cmd line string to do it.

Is there a way that I can call torch.distributed.launch using an API way? (i.e. I then expose my module as a simple function and feed my function as a callback to torch’s distributed module)

Thanks!

As part of torch 1.9.0 we are introducing torch.distributed.run to replace torch.distributed.launch definition is here (pytorch/run.py at master · pytorch/pytorch · GitHub). This eventually calls into a function called elastic_launch (pytorch/api.py at master · pytorch/pytorch · GitHub) which seems to be what you are looking for. For example, you can import it and use it like elastic_launch(LaunchConfig, your_module_function)(your_module_args). Does this satisfy your use case?

1 Like

Thanks! Let me try that out!

Thanks Howard.

Compiled and installed from source, and trying out the elastic_launch API.

One question is, is there a definitive list of every thing needed in environments for this elastic_launch? when using the script, there are bunch of things such as RANK, master_addr to make everything ready. However the elastic_launch’s config only has a subset of these variables.

Haven’t fully traced the code yet, just would like to double check first instead of trail and error finding them.

right now, I am able to get things kicked off but apparently some configurations is missing.

(pid=601762)     "message": "EtcdException: Could not get the list of servers, maybe you provided the wrong host(s) to connect to?",
(pid=601762)     "extraInfo": {
(pid=601762)       "py_callstack": "Traceback (most recent call last):\n  File \"/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connection.py\", line 170, in _new_conn\n    (self._dns_host, self.port), self.timeout, **extra_kw\n  File \"/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/util/connection.py\", line 96, in create_connection\n

and

Traceback (most recent call last):
  File "train_ray_local.py", line 169, in <module>
    ray.get([client.train.remote(), client2.train.remote()])
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/ray/worker.py", line 1481, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(EtcdException): ray::Network.train() (pid=601762, ip=10.231.13.71)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/util/connection.py", line 96, in create_connection
    raise err
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/util/connection.py", line 86, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

ray::Network.train() (pid=601762, ip=10.231.13.71)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connectionpool.py", line 394, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connection.py", line 234, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/http/client.py", line 1277, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/http/client.py", line 1323, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/http/client.py", line 1272, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/http/client.py", line 1032, in _send_output
    self.send(msg)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/http/client.py", line 972, in send
    self.connect()
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connection.py", line 200, in connect
    conn = self._new_conn()
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/site-packages/urllib3/connection.py", line 182, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f6a609edb50>: Failed to establish a new connection: [Errno 111] Connection refused

Hi, I should have provided more resources in my previous response, my apologies. Here is the reading about torch.distributed.run and the arguments it supports (Elastic Launch — PyTorch master documentation). It is a superset of the arguments of torch.distributed.launch but also includes fault tolerance provided by TorchElastic Torch Distributed Elastic — PyTorch master documentation.

Regarding LaunchConfig and elastic_launch, they are not yet public APIs and are subject to change. By default it uses an etcd backend which is a third-party distributed key-value store which is why I believe you are hitting your errors. However, if you don’t want third party dependencies, you can use an internally built backend for rendezvous. Here is an example for you:

test.py

from torch.distributed.launcher.api import LaunchConfig, elastic_launch
import os

def my_process(args):
    env_variables = [
        "RANK",
        "WORLD_SIZE",
        "MASTER_ADDR",
        "MASTER_PORT",
        "TORCHELASTIC_MAX_RESTARTS",
        # etc...
    ]
    for env_var in env_variables:
        print(f"{env_var}: {os.environ.get(env_var)}")
    print(f"here are my own args: {args}")

if __name__ == "__main__":
    config = LaunchConfig(min_nodes=1, max_nodes=1, nproc_per_node=2, rdzv_endpoint="localhost:0", rdzv_backend="c10d")
    outputs = elastic_launch(config, my_process)("my args")

Then just run python test.py

The output looks something like (along with some torchelastic logging and warnings):

RANK: 1
WORLD_SIZE: 2
MASTER_ADDR: <my machine>
MASTER_PORT: 51217
TORCHELASTIC_MAX_RESTARTS: 3
here are my own args: my args
RANK: 0
WORLD_SIZE: 2
MASTER_ADDR: <my machine>
MASTER_PORT: 51217
TORCHELASTIC_MAX_RESTARTS: 3
here are my own args: my args

Awesome example! clear and simple. Thanks for it Howard! :slight_smile:

One follow up question, what is the best way to double check the distributed training actually happened. Just want to make sure I am not cheated by some lucky succeeded run of mis-configuration :slight_smile: I read this page (Distributed communication package - torch.distributed — PyTorch 1.8.1 documentation) but not sure what would be the definitive way to tell.

One way might be printing out the forward output on all rank every N iterations, and see if the loss is different and resides on different GPUs. You can also check nvidia-smi to see if they indeed keep multiple GPUs busy.

I tried this example, it works greatly even on multiple machines. But I am trying to understand how these two parameters work, and what the mechanism is:

rdzv_endpoint="localhost:0", rdzv_backend="c10d"

Let us say I have two machines, now I used above two parameters to both machines. How does that work? localhost:0 seems really non-typical. Under the hood, how is etcd involved in this process?

Thanks a lot!

would you please point me to the exact place where all-reduce happens? maybe I can just print out message to confirm that all-reduce happened?

For a distributed setting you should provide the hostname of one of your machines:

rdzv_endpoint="node0.example.com" rdzv_backend="c10d"

The example that Howard gave was meant to be run on a single machine with two worker processes. localhost:0 simply means use a random port on the local machine for the coordination of the nodes. Since you only have one node (that runs two workers) specifying localhost as an endpoint worked in that example. However if you want to run your training on more than one machine (a.k.a. node) the endpoint should be a hostname/FQDN that is reachable by all machines.

Thanks Can!

OK. looks like it works now. To run on multiple machines, i assume we dont’ need to start an etcd instance first. Just simply set rdzv_endpoint=“IP_ADDRESS” rdzv_backend=“c10d”. And it works. Looks like c10d backend takes care of everything for us.

Looking at the output, I am not sure if parallel part actually happened

(pid=15988, ip=10.231.21.63) {"name": "torchelastic.worker.status.SUCCEEDED", "source": "WORKER", "timestamp": 0, "metadata": {"run_id": "95509630884468593764425877023036505025", **"global_rank": 0**, "group_rank": 0, "worker_id": "16109", "role": "default_role", "hostname": "n231-021-063.novalocal", "state": "SUCCEEDED", "total_run_time": 75, "rdzv_backend": "c10d", "raw_error": null, "metadata": "{\"group_world_size\": 1, \"entry_point\": \"sage_main_routine\", \"local_rank\": [0], \"role_rank\": [0], \"role_world_size\": [1]}", "agent_restarts": 0}}
(pid=952664) {"name": "torchelastic.worker.status.SUCCEEDED", "source": "WORKER", "timestamp": 0, "metadata": {"run_id": "176607332379713185119918774283739049986", **"global_rank": 0**, "group_rank": 0, "worker_id": "952994", "role": "default_role", "hostname": "n231-013-071.novalocal", "state": "SUCCEEDED", "total_run_time": 76, "rdzv_backend": "c10d", "raw_error": null, "metadata": "{\"group_world_size\": 1, \"entry_point\": \"sage_main_routine\", \"local_rank\": [0], \"role_rank\": [0], \"role_world_size\": [1]}", "agent_restarts": 0}}

After it finishes, I got the output above. Both says it used group_rank as 0, my setup for the run is like this, each node has 0 or 1 as rank respectively:

    os.environ["MASTER_ADDR"] = "10.231.13.71"
    os.environ["MASTER_PORT"] = "2345"
    os.environ["WORLD_SIZE"] = "2"
    os.environ["RANK"] = str(rank)

Maybe I still missed something in the os environment var setting? Are these four enough?

Thanks Can!

You are welcome!

OK. looks like it works now. To run on multiple machines, i assume we dont’ need to start an etcd instance first. Just simply set rdzv_endpoint=“IP_ADDRESS” rdzv_backend=“c10d”. And it works. Looks like c10d backend takes care of everything for us.

That is right. The new c10d rendezvous backend does not depend on any 3rd party software. You don’t need to start (or even install) etcd.

From the output it looks like your machines run in silo. Each has a world size of 1 and rank 0.

Since you are using our new launcher API you should not set any environment variables in your training script. Our launcher sets all four variables you listed before executing it.

I suggest checking out our “Distributed Elastic” docs for v1.9 RC here which also lists all the environment variables set for you by the launcher API.

Let me know if you still have trouble after reading the docs.

Cheers.

Finished reading the doc. Now I am a bit confused.

This “Distributed Elastic” page describes how to use a script to start the training. But for my case I was seeking for an API version of this functionality. Based on the earlier response from Howard, it is ready, just apply function elastic_launch() on my entry point function. Are you suggesting this function elastic_launch() is actually not ready for use?

Based on what I tried so far, the function way works pretty well in bringing up the processes on multiple machine (thank you guys!) But somehow I need a good example to learn how to set things up properly.

my current configuration:

sage_main_routine():
    os.environ["MASTER_ADDR"] = "10.231.131.7"
    os.environ["MASTER_PORT"] = "2346"
    os.environ["WORLD_SIZE"] = "2"
    os.environ["RANK"] = str(rank)
    os.environ["GROUP_RANK"] = str(rank)

In the main script: 
        config = LaunchConfig(
            min_nodes=1,
            max_nodes=2,
            nproc_per_node=1,
            rdzv_endpoint="10.231.131.7",
            rdzv_backend="c10d",
        )

        outputs = elastic_launch(config, sage.sage_main_routine)(
            Path(FOLDER), args, self.rank
        )

UPDATE: have the 2-node being silo is my accidentally turn off Parallel module. Now I am having errors related to tcp connection timeouts. Either way I think bringing up the processes works great, it is just need some help on how to set the correct set of variables.

As Howard mentioned the elastic_launch function is not part of our public API (yet); therefore, we do not have docs for it, but reading and understanding how the script works and what environment variables it sets, should be helpful since it is a thin wrapper on top of elastic_launch.

Your sage_main_routine should be actually “reversed” like this:

sage_main_routine(foo):
    # These environment variables are set by `elastic_launch`.
    master_addr = os.environ["MASTER_ADDR"]
    master_port = os.environ["MASTER_PORT"]
    world_size = os.environ["WORLD_SIZE"]
    rank = os.environ["RANK"]

    store = TCPStore(master_addr, master_port, world_size, is_master=rank== 0)

    # Rest of the script...

In the main script: 
        config = LaunchConfig(
            min_nodes=1,
            max_nodes=2,
            nproc_per_node=1,
            rdzv_endpoint="10.231.131.7",
            rdzv_backend="c10d",
        )

        foo = "my dummy value"

        outputs = elastic_launch(config, sage.sage_main_routine)(
           foo, # Your custom script arguments
        )
1 Like

Gotcha! Thanks for the tips!

After reading more into how things are implemented, I think now I know what is going on.

We are integrating pytorch into a system where we spawn multiple processes, and the training happens within one of the sub-processes on one single machine. pytorch package imports get leaked into multiple processes including the parent, thus for each time “Using backend: pytorch” is shown, there is one c10d/rendezvous setup/registration happening automatically. Therefore the distributed modules got confused and in consequence connection issues.

What is the easy way to tell a process when we spawning it, saying “ok, you are (or not) the one to initialize distributed modules”? I guess I can hack into pytorch for now, but I am not sure about side-effects doing so. If there is already a mechanism, that would be awesome.

Now I guess I get the reason why this module was originally provided only as a script.

Thank you so much everyone!

Spawning/forking without an exec in general is always tricky and can introduce subtle bugs (like you had in this case) We do not have a mechanism to detect double initializations and it is unlikely that we can come up with a solution that works for everyone considering the size of our user base.

My only suggestion would be to rethink how you import and initialize your Python modules. Ideally only a single subprocess should import the relevant packages from PyTorch while the rest performs other auxiliary tasks.

Now I guess I get the reason why this module was originally provided only as a script.

That is correct. The internals of how the launcher script works is still under development. So I strongly suggest using the script if possible. We do not guarantee any backwards compatibility for our API since it is not public yet.

Thank you so much everyone!

You are welcome! Good luck with your experiments and let us know if you need any other help!

1 Like

Thanks Can! Really appreciate all the information!

My last question, I want to make sure the “parallel training” is indeed happening, would you please share a few code pointers where processes do the parameters sharing with others. I’d like to print out the parameter (maybe before and after as well) to make sure I have things rolling correctly.

My last question, I want to make sure the “parallel training” is indeed happening, would you please share a few code pointers where processes do the parameters sharing with others.

For clarification, the architecture is multiple nodes and multiple processes within a node, with one process doing the training? Also, are you using a DDP model?

I am using two machines, each machine calls elastic_launch() routine once.

And yes, inside my model definition, I am wrapping my model into a DDP. Two machines runs the same exact training routine.

My last question, I want to make sure the “parallel training” is indeed happening, would you please share a few code pointers where processes do the parameters sharing with others.

DDP tutorial Getting Started with Distributed Data Parallel — PyTorch Tutorials 2.1.1+cu121 documentation.

" DDP uses collective communications in the torch.distributed package to synchronize gradients and buffers. More specifically, DDP registers an autograd hook for each parameter given by model.parameters() and the hook will fire when the corresponding gradient is computed in the backward pass. Then DDP uses that signal to trigger gradient synchronization across processes. Please refer to DDP design note for more details."

You can check before and after .backward().

1 Like