Figuring out how the autograd engine doesn't block while initializing threads

Hello, I am new to PyTorch and have been trying to figure out more about what makes it work behind the scenes.

I keep getting stuck at what looks to me like a permanently blocked wait call, although it clearly is not, since everything runs fine. I will go through what I think is happening line by line. I apologize in advance if this type of question is not allowed, but I have been staring at the source code for a couple of hours over the past two nights without much luck.

https://github.com/pytorch/pytorch/blob/master/torch/csrc/autograd/engine.cpp

auto Engine::execute(const edge_list& roots,
                     const variable_list& inputs,
                     bool keep_graph,
                     bool create_graph,
                     const edge_list& outputs) -> variable_list {
  std::call_once(start_threads_flag, &Engine::start_threads, this);
  • The first line of Engine::execute calls Engine::start_threads through std::call_once, so the worker threads are only started on the first call
auto Engine::start_threads() -> void {
  int num_devices = at::getNumGPUs();
  // One for CPU, plus one for every GPU device
  int num_threads = num_devices + 1;
  ready_queues = std::vector<std::shared_ptr<ReadyQueue>>(num_threads);
  for (auto& queue : ready_queues)
    queue.reset(new ReadyQueue());
  for (int i = 0; i < num_threads; ++i) {
    std::thread t(&Engine::thread_init, this, i - 1);
    t.detach();
  }
}
  • Engine::start_threads counts the available GPUs and creates a vector of ReadyQueue pointers with one entry per GPU plus one for the CPU (so one ReadyQueue per worker thread)
  • We then reset each pointer to point to a new ReadyQueue
  • Next, one detached thread per queue is started, each running Engine::thread_init with device index i - 1 (so -1 for the CPU thread)
// TODO: Engine is not written in a way that it can deal with anything that's
// not CUDA.
auto Engine::thread_init(int device) -> void {
  THInferNumThreads();
  // NB: We MUST NOT construct the guard for device -1,
  // as in some settings we compile with cuda, but
  // have lazy stubs for CUDA functionality (so actually
  // attempting to setup a guard(-1) will cause an
  // error, because it will still query cudaGetDevice).
  at::OptionalDeviceGuard guard;
  if (device != -1) {
    if (at::hasCUDA()) {
      guard.reset_device(at::Device(at::DeviceType::CUDA, device));
    }
    if (at::hasHIP()) {
      guard.reset_device(at::Device(at::DeviceType::HIP, device));
    }
  }
  worker_device = device;
  thread_main(nullptr);
}
  • worker_device is a thread-local variable; each thread sets it to its own device index (-1 for the CPU thread, then 0, 1, ... for the GPUs)
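
A minimal standalone sketch of that indexing, not the engine's code: worker_id and spawn_workers are made-up names, and the threads are joined rather than detached so the result can be inspected. It only shows that a thread_local variable has a separate copy per thread and that the i - 1 offset gives -1 to the first thread.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical analogue of worker_device: every thread gets its own copy,
// initialized to -100 until that thread assigns to it.
thread_local int worker_id = -100;

// Spawns num_devices + 1 threads and records the id each thread assigned
// to its own thread-local copy.
std::vector<int> spawn_workers(int num_devices) {
  int num_threads = num_devices + 1;  // one extra thread for the CPU
  std::vector<int> seen(num_threads, 0);
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&seen, i] {
      worker_id = i - 1;    // -1 for the CPU thread, 0..N-1 for devices
      seen[i] = worker_id;  // each thread writes only its own slot
    });
  }
  for (auto& t : threads) t.join();
  return seen;
}
```

Calling spawn_workers(2) from the main thread leaves the main thread's own worker_id untouched, since the assignments happened on the workers' copies.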

Calling thread_main with a nullptr is where I start to run into trouble.

auto Engine::thread_main(GraphTask *graph_task) -> void {
  auto queue = ready_queues[worker_device + 1];
  // Why the test on graph_task->outstanding_tasks?  See
  // Note [Reentrant backwards]
  while (!graph_task || graph_task->outstanding_tasks > 0) {
    FunctionTask task = queue->pop();
    if (task.fn && !task.base->has_error.load()) {
      GradMode::set_enabled(task.base->grad_mode);
      try {
        evaluate_function(task);
      } catch (std::exception& e) {
        thread_on_exception(task, e);
      }
    }
    // Notify downstream about the completion of tasks depending
    // on both where the task was executed, and who owned the overall
    // graph (in case of reentrant execution.)  See Note [Reentrant backwards].
    auto base_owner = task.base->owner;
    // Task from a non-worker thread. Easy case.
    if (base_owner == NO_DEVICE) {
      if (--task.base->outstanding_tasks == 0) {
        std::lock_guard<std::mutex> lock(task.base->mutex);
        task.base->not_done.notify_all();
      }
    } else {
      // If it's a task initiated from this thread, decrease the counter, but
      // don't do anything - loop condition will do all checks for us next.
      if (base_owner == worker_device) {
        --task.base->outstanding_tasks;
      // Otherwise send a dummy function task to the owning thread just to
      // ensure that it's not sleeping. If it has work, it might see that
      // graph_task->outstanding_tasks == 0 before it gets to the task, but
      // it's a no-op anyway.
      } else if (base_owner != worker_device) {
        if (--task.base->outstanding_tasks == 0) {
          // Synchronize outstanding_tasks with queue mutex
          std::atomic_thread_fence(std::memory_order_release);
          ready_queue(base_owner).push(FunctionTask(task.base, nullptr, InputBuffer(0)));
        }
      }
    }
  }
}
  • First, we get a ReadyQueue pointer from our ready_queues vector
  • Next, we enter the while loop as !graph_task is !nullptr which is true
  • We then pop a task off our ReadyQueue
auto ReadyQueue::pop() -> FunctionTask {
  std::unique_lock<std::mutex> lock(mutex);
  not_empty.wait(lock, [this]{ return !heap.empty(); });
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
  auto task = std::move(const_cast<FunctionTask&>(heap.top())); heap.pop();
  return task;
}
  • We first lock our mutex
  • Now we wait and block until the heap (our ReadyQueue) is not empty

My issue is that the ReadyQueue is empty at this point, so it looks to me like this wait will never unblock and we will be waiting forever for the queue to become non-empty. How does the wait get released so that FunctionTasks can be added to our queue?

Thanks!

EDIT: This can be removed. I needed to brush up on some of my C++ concurrency :slight_smile: I think I've got it now.

Hi,

The point is that thread_init runs in the newly created thread, so it executes asynchronously from the main thread, which is the one that adds work to the queues. Only the worker thread blocks in pop(); the main thread keeps running.
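
If it helps, here is a rough standalone sketch of the pattern, not the actual engine code. BlockingQueue and run_worker_demo are made-up names, and I am assuming the push side notifies the condition variable, as a queue of this kind normally would. The key detail is that condition_variable::wait releases the mutex while the thread sleeps, so another thread can lock it, push, and wake the sleeper.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for ReadyQueue: pop() blocks until push() adds an item.
class BlockingQueue {
 public:
  void push(int item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push(item);
    }
    not_empty_.notify_one();  // wake one thread waiting in pop()
  }

  int pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    // wait() releases the mutex while this thread sleeps and re-acquires it
    // before returning, so push() can take the lock in the meantime.
    not_empty_.wait(lock, [this] { return !items_.empty(); });
    int item = items_.front();
    items_.pop();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::queue<int> items_;
};

// Starts a worker that blocks in pop(), then feeds it from the calling thread.
// Returns the sum of the items the worker received.
int run_worker_demo(const std::vector<int>& work) {
  BlockingQueue queue;
  int sum = 0;
  std::thread worker([&] {
    for (std::size_t i = 0; i < work.size(); ++i) {
      sum += queue.pop();  // blocks only this worker thread
    }
  });
  // The calling thread is not blocked by the worker's wait, so it can push.
  for (int item : work) queue.push(item);
  worker.join();  // the engine detaches its threads; join keeps the sketch tidy
  return sum;
}
```

In this sketch the worker usually reaches pop() before anything has been pushed and goes to sleep there, which is the situation you were worried about. The pushes from the calling thread then wake it up one item at a time.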