There are many discussions and posts saying that torch can't use a custom stream for NCCL communication, since ProcessGroupNCCL maintains its own internal stream pool for NCCL.
GitHub issue (opened 17 Sep 2024; labels: oncall: distributed, module: cuda):
### 🐛 Describe the bug
I found that specifying a CUDA stream using a with-context does not work as expected.
```python
with torch.cuda.stream(my_stream):
    dist.all_reduce(source)
```
The full test script is provided below. I executed it with `NCCL_DEBUG=INFO NCCL_DEBUG_SUBSYS=ALL OMP_NUM_THREADS=1 torchrun --nnodes 1 --nproc_per_node 2 test.py`.
```python
import os
import torch
import torch.distributed as dist

def run(rank, size):
    device = torch.device(f'cuda:{rank}')
    torch.cuda.set_device(device)
    # create cuda streams
    my_stream = torch.cuda.Stream(device)
    if rank == 0:
        print("[0] NCCL USER stream: ", my_stream)
    # source/output tensors
    source = torch.ones(10, dtype=torch.float64, device=device)
    # execute allreduce with my_stream
    with torch.cuda.stream(my_stream):
        dist.all_reduce(source)
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = int(os.environ['WORLD_SIZE'])
    rank = int(os.environ['LOCAL_RANK'])
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    run(rank, world_size)
```
I expected that the all_reduce operation would be executed on my_stream, but it seems that it was actually performed on a different CUDA stream. Below is a snippet of the output:
> [0] NCCL USER stream: <torch.cuda.Stream device=cuda:0 cuda_stream=**0x55f31fee0fd0**>
snail01:2307143:2307143 [0] NCCL INFO AllReduce: opCount 0 sendbuff 0x7f6c97600000 recvbuff 0x7f6c97600000 count 10 datatype 8 op 0 root 0 comm 0x55f32103c600 [nranks=2] stream **0x55f31fe42810**
My question is:
Q: Is there any way to explicitly specify the CUDA stream for collective operations?
Thank you in advance!
### Versions
```
Collecting environment information...
PyTorch version: 2.4.0a0
Is debug build: False
CUDA used to build PyTorch: 12.4
ROCM used to build PyTorch: N/A
OS: Ubuntu 20.04.6 LTS (x86_64)
GCC version: (Ubuntu 9.4.0-1ubuntu1~20.04.2) 9.4.0
Clang version: Could not collect
CMake version: version 3.22.3
Libc version: glibc-2.31
Python version: 3.10.8 (main, Nov 15 2022, 23:41:17) [GCC 7.5.0] (64-bit runtime)
Python platform: Linux-5.4.0-193-generic-x86_64-with-glibc2.31
Is CUDA available: True
CUDA runtime version: 12.4.131
CUDA_MODULE_LOADING set to: LAZY
GPU models and configuration:
GPU 0: NVIDIA A100-PCIE-40GB
GPU 1: NVIDIA A100-PCIE-40GB
GPU 2: Tesla V100-PCIE-16GB
GPU 3: Tesla V100S-PCIE-32GB
Nvidia driver version: 560.35.03
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True
CPU:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 46 bits physical, 48 bits virtual
CPU(s): 56
On-line CPU(s) list: 0-55
Thread(s) per core: 2
Core(s) per socket: 14
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 79
Model name: Intel(R) Xeon(R) CPU E5-2660 v4 @ 2.00GHz
Stepping: 1
CPU MHz: 1201.536
CPU max MHz: 3200.0000
CPU min MHz: 1200.0000
BogoMIPS: 3999.71
Virtualization: VT-x
L1d cache: 896 KiB
L1i cache: 896 KiB
L2 cache: 7 MiB
L3 cache: 70 MiB
NUMA node0 CPU(s): 0-13,28-41
NUMA node1 CPU(s): 14-27,42-55
Vulnerability Gather data sampling: Not affected
Vulnerability Itlb multihit: KVM: Mitigation: Split huge pages
Vulnerability L1tf: Mitigation; PTE Inversion; VMX conditional cache flushes, SMT vulnerable
Vulnerability Mds: Mitigation; Clear CPU buffers; SMT vulnerable
Vulnerability Meltdown: Mitigation; PTI
Vulnerability Mmio stale data: Mitigation; Clear CPU buffers; SMT vulnerable
Vulnerability Retbleed: Not affected
Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp
Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2: Mitigation; Retpolines; IBPB conditional; IBRS_FW; STIBP conditional; RSB filling; PBRSB-eIBRS Not affected; BHI Not affected
Vulnerability Srbds: Not affected
Vulnerability Tsx async abort: Mitigation; Clear CPU buffers; SMT vulnerable
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb cat_l3 cdp_l3 invpcid_single pti intel_ppin ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm rdt_a rdseed adx smap intel_pt xsaveopt cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm ida arat pln pts md_clear flush_l1d
Versions of relevant libraries:
[pip3] mypy-extensions==0.4.3
[pip3] numpy==1.26.4
[pip3] torch==2.4.0a0+gitunknown
[pip3] torchvision==0.15.0a0+0dceac0
[pip3] triton==3.0.0
[conda] Could not collect
```
GitHub issue (opened 5 Oct 2024, closed 1 Nov 2024; labels: oncall: distributed, module: nccl):
### 🚀 The feature, motivation and pitch
Currently, I don't see any way to run an NCCL collective on a custom stream.
This is a big limitation, and leading frameworks like vLLM have completely ditched the PyTorch distributed API and use their own NCCL wrappers, like here: https://github.com/vllm-project/vllm/blob/a95354a36ee65523a499b3eb42f70a4a0ea4322d/vllm/distributed/device_communicators/pynccl.py#L104
Is it possible to run the NCCL collectives on the stream that is set by the user context in PyTorch?
This would be a great addition.
This is the internal pool mentioned above, declared in ProcessGroupNCCL:
```cpp
// The CUDA streams used by NCCL kernels
std::unordered_map<std::string, at::cuda::CUDAStream> ncclStreams_;
```
When we write code like the following, it doesn't really use comm_stream for the communication; comm_stream only acts as a dummy compute stream, since NCCL makes the communication wait on the current stream to ensure the input data is ready.
```python
a = some_compute()
with torch.cuda.stream(comm_stream):
    dist.all_gather(output, a)
```
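For reference, the fuller pattern around such a comm stream, with the ordering made explicit, looks roughly like this (a minimal sketch; the matmul and `output_list` setup stand in for `some_compute()` and the real outputs, and whether the collective itself actually lands on `comm_stream` is exactly the question below):
```python
import torch
import torch.distributed as dist

# Assumes init_process_group("nccl") and set_device have already run,
# as in the script from the first issue above.
comm_stream = torch.cuda.Stream()

x = torch.randn(1024, 1024, device="cuda")
a = x @ x                                              # compute on the current stream
output_list = [torch.empty_like(a) for _ in range(dist.get_world_size())]

comm_stream.wait_stream(torch.cuda.current_stream())   # input `a` must be ready first
with torch.cuda.stream(comm_stream):
    dist.all_gather(output_list, a)
    a.record_stream(comm_stream)                       # keep `a` alive for the comm stream

# Before consuming output_list back on the current stream:
torch.cuda.current_stream().wait_stream(comm_stream)
```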
But when I dive into ProcessGroupNCCL, I find that torch chooses the NCCL stream based on whether the op is async. If it is, torch uses the internal stream from ncclStreams_, which is a map from device key to CUDA stream; otherwise, it simply uses the current CUDA stream:
```cpp
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<Work> ProcessGroupNCCL::collective(
    std::vector<at::Tensor>& inputs,
    std::vector<at::Tensor>& outputs,
    Fn fn,
    PreProcess pre,
    PostProcess post,
    OpType opType,
    bool asyncOp,
    const char* profilingTitle,
    bool nanCheck) {
  // ....
  // in asyncOp=false [default] mode, we use currentStream as ncclStream
  // otherwise, we use separate ncclStream and let it sync on currentStream
  auto ncclStream = asyncOp ? ncclStreams_.at(key)
                            : at::cuda::getCurrentCUDAStream(device.index());
  // ...
```
The two statements above seem somewhat contradictory, and I’m very curious which one is correct. Can torch actually customize the stream that NCCL will use?
**H-Huang (Howard Huang)** replied on September 22, 2025, 8:23pm:
The ProcessGroupNCCL code is what is correct. When you do:
```python
with torch.cuda.stream(comm_stream):
    dist.all_gather(output, a)
```
the all_gather will happen on comm_stream. If no stream is specified and the op is async, then it will use the internal stream.
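In code, those two cases look roughly like this (a sketch assuming an initialized NCCL process group and a recent build with the asyncOp branch quoted earlier; `t` and `comm_stream` are illustrative names):
```python
import torch
import torch.distributed as dist

t = torch.ones(10, device="cuda")
comm_stream = torch.cuda.Stream()

# Case 1: stream specified, async_op=False (the default) -> the collective is
# enqueued on the current stream, i.e. comm_stream inside the context.
comm_stream.wait_stream(torch.cuda.current_stream())  # make sure `t` is ready
with torch.cuda.stream(comm_stream):
    dist.all_reduce(t)

# Case 2: no stream specified, async_op=True -> the collective goes to
# ProcessGroupNCCL's internal stream; work.wait() makes the current stream
# wait for the collective before later kernels read `t`.
work = dist.all_reduce(t, async_op=True)
work.wait()
```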
Thanks! So torch has already supported custom NCCL streams to some extent.
I find that only the latest torch version can use a user-specified NCCL stream. In torch 2.7, ProcessGroupNCCL just uses its internal stream:
```cpp
// Used many times below, so we stash the unordered_map lookup
auto ncclStream = ncclStreams_.at(key);
```
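One way to see which case you are on, following the approach from the first issue, is to compare the user stream's raw handle against the `stream 0x...` field that NCCL prints with `NCCL_DEBUG=INFO NCCL_DEBUG_SUBSYS=ALL` (a sketch; assumes the same torchrun setup as the script above):
```python
import torch
import torch.distributed as dist

# Assumes init_process_group("nccl") and set_device have already run.
t = torch.ones(10, device="cuda")
comm_stream = torch.cuda.Stream()
print("user stream handle:", hex(comm_stream.cuda_stream))  # raw cudaStream_t pointer

with torch.cuda.stream(comm_stream):
    dist.all_reduce(t)
# On torch 2.7 and older, the NCCL INFO line should show a different stream
# pointer (the internal ncclStreams_ entry); on builds with the asyncOp
# branch above, it should match the handle printed here.
```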