I want to use torch::cuda::nccl::reduce(inputs, output, root, op, streams, user_comms); to perform a reduce; I pass the inputs, the output, and the root.
I looked at the code in torch/csrc/cuda/nccl.cpp and did not see any cudaStreamSynchronize(), but the NCCL examples call cudaStreamSynchronize() after ncclReduce(). If no synchronization is needed here, how can I be sure the reduce has actually finished? (A minimal sketch of my caller is below the pasted source.) Here is the function from nccl.cpp:
void reduce(
    const std::vector<at::Tensor>& inputs,
    at::Tensor& output,
    int32_t root,
    int32_t op,
    const stream_list& streams,
    const comm_list& user_comms) {
#ifdef USE_NCCL
  using namespace torch::cuda::nccl::detail;
  TORCH_CHECK(
      root >= 0 && static_cast<size_t>(root) < inputs.size(), "invalid root");

  check_inputs(inputs, output, root, 1, 1);
  const auto len = inputs.size();

  auto data_type = to_nccl_data_type(inputs[0]);

  const auto count = inputs[0].numel();
  auto comms_ref = user_comms.empty() ? get_communicators(inputs)
                                      : ArrayRef<ncclComm_t>(user_comms);

  AutoNcclGroup nccl_group_guard;
  at::cuda::OptionalCUDAGuard device_guard;
  for (const auto i : c10::irange(len)) {
    int device = inputs[i].device().index();
    device_guard.set_index(device);
    // Default to the current stream
    const auto stream = (streams.empty() || !streams[i])
        ? at::cuda::getCurrentCUDAStream(device).stream()
        : streams[i]->stream();

    ncclComm_t comm = comms_ref[i];
    NCCL_CHECK(ncclReduce(
        inputs[i].data_ptr(),
        static_cast<std::remove_cv_t<decltype(i)>>(root) == i
            ? output.data_ptr()
            : nullptr,
        count,
        data_type,
        to_nccl_red_op(op),
        root,
        to_nccl_comm(comm),
        stream));
  }
#else
  AT_ERROR("PyTorch built without NCCL support");
#endif
}
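
For context, here is a minimal sketch of how I call it and where I would have expected the synchronize to go. Everything in it is my own guess (the two-GPU setup, the tensor shapes, using 0 for the op, and the explicit cudaStreamSynchronize at the end), not anything taken from nccl.cpp:

// Minimal sketch of my caller, assuming two GPUs and libtorch built with NCCL.
#include <ATen/cuda/CUDAContext.h>
#include <torch/csrc/cuda/nccl.h>
#include <torch/torch.h>

#include <cuda_runtime.h>

#include <vector>

int main() {
  // One input tensor per device; the reduced result should land in `output`
  // on the root device (GPU 0 here).
  std::vector<at::Tensor> inputs = {
      torch::ones({1024}, torch::Device(torch::kCUDA, 0)),
      torch::ones({1024}, torch::Device(torch::kCUDA, 1))};
  at::Tensor output = torch::empty({1024}, torch::Device(torch::kCUDA, 0));

  const int32_t root = 0;
  const int32_t op = 0; // 0 is the default and seems to map to ncclSum

  // This enqueues ncclReduce on each device's current stream and returns to
  // the host immediately; I don't see any host-side blocking inside it.
  torch::cuda::nccl::reduce(inputs, output, root, op);

  // Is this explicit synchronize on the root device's current stream the
  // intended way to know that `output` is ready to read?
  cudaStreamSynchronize(
      at::cuda::getCurrentCUDAStream(/*device_index=*/0).stream());

  return 0;
}

So my question boils down to: does the caller have to synchronize the (current) stream like this, or does PyTorch guarantee ordering some other way?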