Hello, I am new to PyTorch and have been trying to figure out what makes it work behind the scenes.
I am getting stuck on what I think is a permanently blocked wait call, though it clearly is not, since everything runs fine. I will try to go line by line through what I think is going on. I apologize in advance if this type of question is not allowed, but I have been staring at the source code for a couple of hours over the past two nights without much luck.
https://github.com/pytorch/pytorch/blob/master/torch/csrc/autograd/engine.cpp
auto Engine::execute(const edge_list& roots,
                     const variable_list& inputs,
                     bool keep_graph,
                     bool create_graph,
                     const edge_list& outputs) -> variable_list {
  std::call_once(start_threads_flag, &Engine::start_threads, this);
- The first line of `Engine::execute` calls the `Engine::start_threads` function
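As I understand it, the `std::call_once` on `start_threads_flag` is what guarantees the worker pool is spun up at most once, even if several threads enter `execute` concurrently. A tiny standalone demo of that behavior (nothing PyTorch-specific, just the standard library):

```cpp
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::once_flag start_threads_flag;

void start_threads() {
  // Runs exactly once, no matter how many threads race into call_once.
  std::cout << "starting worker pool\n";
}

int main() {
  std::vector<std::thread> callers;
  for (int i = 0; i < 4; ++i)
    callers.emplace_back([] { std::call_once(start_threads_flag, start_threads); });
  for (auto& t : callers)
    t.join();  // "starting worker pool" prints only once
}
```

Here is `start_threads` itself: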
auto Engine::start_threads() -> void {
  int num_devices = at::getNumGPUs();
  // One for CPU, plus one for every GPU device
  int num_threads = num_devices + 1;
  ready_queues = std::vector<std::shared_ptr<ReadyQueue>>(num_threads);
  for (auto& queue : ready_queues)
    queue.reset(new ReadyQueue());
  for (int i = 0; i < num_threads; ++i) {
    std::thread t(&Engine::thread_init, this, i - 1);
    t.detach();
  }
}
- `Engine::start_threads` tallies up the number of available devices and creates a vector of `ReadyQueue` pointers, one for the CPU plus one for every GPU device (so there will be one `ReadyQueue` per worker thread)
- We then reset each pointer to point to a new `ReadyQueue` (a stripped-down sketch of this setup follows below)
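To convince myself of this layout I wrote a stripped-down version; `MiniQueue` and its `tag` are my own invention, not PyTorch code:

```cpp
#include <chrono>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

// Toy stand-in for ReadyQueue; the tag just lets workers identify their queue.
struct MiniQueue {
  int tag;
  explicit MiniQueue(int t) : tag(t) {}
};

int main() {
  int num_devices = 2;                // pretend at::getNumGPUs() returned 2
  int num_threads = num_devices + 1;  // one for the CPU, plus one per GPU

  // Same shape as the engine: a vector of shared_ptrs, each reset to a fresh queue.
  std::vector<std::shared_ptr<MiniQueue>> ready_queues(num_threads);
  for (int i = 0; i < num_threads; ++i)
    ready_queues[i].reset(new MiniQueue(i));

  for (int i = 0; i < num_threads; ++i) {
    // i - 1 mirrors the engine: the first worker is "device -1" (the CPU).
    std::thread t([queue = ready_queues[i], device = i - 1] {
      std::printf("worker for device %d owns queue %d\n", device, queue->tag);
    });
    t.detach();  // detached, like the engine's workers
  }
  // Crude, but enough for a sketch: give the detached workers time to print.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
```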
Next, each thread is initialized with `Engine::thread_init`, which is passed `i - 1` as its device, so the first worker gets `-1` (the CPU):
// TODO: Engine is not written in a way that it can deal with anything that's
// not CUDA.
auto Engine::thread_init(int device) -> void {
  THInferNumThreads();
  // NB: We MUST NOT construct the guard for device -1,
  // as in some settings we compile with cuda, but
  // have lazy stubs for CUDA functionality (so actually
  // attempting to setup a guard(-1) will cause an
  // error, because it will still query cudaGetDevice).
  at::OptionalDeviceGuard guard;
  if (device != -1) {
    if (at::hasCUDA()) {
      guard.reset_device(at::Device(at::DeviceType::CUDA, device));
    }
    if (at::hasHIP()) {
      guard.reset_device(at::Device(at::DeviceType::HIP, device));
    }
  }
  worker_device = device;
  thread_main(nullptr);
}
- `worker_device` is a thread-local variable, so each worker thread holds its own copy stamped with the device index it was handed: `-1` for the CPU worker, then `0`, `1`, and so on for the GPU workers (a small sketch of this below)
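The `thread_local` part matters here: `worker_device` is not one shared counter; every worker thread gets its own independent copy. A minimal sketch (the `-2` sentinel for "unset" is my own placeholder):

```cpp
#include <cstdio>
#include <thread>
#include <vector>

// thread_local: every thread sees its own independent copy of this variable.
thread_local int worker_device = -2;  // -2 is my placeholder for "not a worker"

void thread_init(int device) {
  worker_device = device;  // only this thread's copy changes
  std::printf("this thread's worker_device = %d\n", worker_device);
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 3; ++i)
    workers.emplace_back(thread_init, i - 1);  // devices -1, 0, 1
  for (auto& t : workers)
    t.join();
  std::printf("main thread's worker_device = %d (untouched)\n", worker_device);
}
```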
Calling `thread_main` with a `nullptr` is where I start to run into trouble.
auto Engine::thread_main(GraphTask *graph_task) -> void {
  auto queue = ready_queues[worker_device + 1];
  // Why the test on graph_task->outstanding_tasks? See
  // Note [Reentrant backwards]
  while (!graph_task || graph_task->outstanding_tasks > 0) {
    FunctionTask task = queue->pop();
    if (task.fn && !task.base->has_error.load()) {
      GradMode::set_enabled(task.base->grad_mode);
      try {
        evaluate_function(task);
      } catch (std::exception& e) {
        thread_on_exception(task, e);
      }
    }
    // Notify downstream about the completion of tasks depending
    // on both where the task was executed, and who owned the overall
    // graph (in case of reentrant execution.) See Note [Reentrant backwards].
    auto base_owner = task.base->owner;
    // Task from a non-worker thread. Easy case.
    if (base_owner == NO_DEVICE) {
      if (--task.base->outstanding_tasks == 0) {
        std::lock_guard<std::mutex> lock(task.base->mutex);
        task.base->not_done.notify_all();
      }
    } else {
      // If it's a task initiated from this thread, decrease the counter, but
      // don't do anything - loop condition will do all checks for us next.
      if (base_owner == worker_device) {
        --task.base->outstanding_tasks;
      // Otherwise send a dummy function task to the owning thread just to
      // ensure that it's not sleeping. If it has work, it might see that
      // graph_task->outstanding_tasks == 0 before it gets to the task, but
      // it's a no-op anyway.
      } else if (base_owner != worker_device) {
        if (--task.base->outstanding_tasks == 0) {
          // Synchronize outstanding_tasks with queue mutex
          std::atomic_thread_fence(std::memory_order_release);
          ready_queue(base_owner).push(FunctionTask(task.base, nullptr, InputBuffer(0)));
        }
      }
    }
  }
}
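Before stepping through the top of this function, one aside: the bottom half of `thread_main` is a decrement-and-notify completion pattern, where whichever worker drops `outstanding_tasks` to zero wakes up whoever is blocked waiting for the whole graph to finish. A minimal standalone sketch of just that pattern (the names echo the engine's, but the code is mine):

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  std::atomic<int> outstanding_tasks(4);
  std::mutex mutex;
  std::condition_variable not_done;

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([&] {
      // ... pretend to evaluate one function here ...
      if (--outstanding_tasks == 0) {
        // Exactly one worker sees the counter hit zero; it takes the lock
        // before notifying so the wakeup cannot be lost.
        std::lock_guard<std::mutex> lock(mutex);
        not_done.notify_all();
      }
    });
  }

  // The owner blocks until every task has been accounted for.
  std::unique_lock<std::mutex> lock(mutex);
  not_done.wait(lock, [&] { return outstanding_tasks.load() == 0; });
  std::printf("all tasks done\n");

  for (auto& t : workers)
    t.join();
}
```

Walking through the top of the loop: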
- First, we get a `ReadyQueue` pointer from our `ready_queues` vector (at index `worker_device + 1`, so the CPU worker at device `-1` maps to slot `0`)
- Next, we enter the while loop, since `graph_task` is `nullptr` and `!graph_task` is therefore `true`
- We then pop a task off our `ReadyQueue`…
auto ReadyQueue::pop() -> FunctionTask {
  std::unique_lock<std::mutex> lock(mutex);
  not_empty.wait(lock, [this]{ return !heap.empty(); });
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
  auto task = std::move(const_cast<FunctionTask&>(heap.top())); heap.pop();
  return task;
}
- We first lock our mutex
- Now we `wait`, blocking until the heap (our `ReadyQueue`) is not empty
The issue I have is that the `ReadyQueue` starts out empty, so it seems like this `wait` can never unblock: won't we be stuck waiting forever for the queue to become non-empty? While we hold the mutex, how does anything else get a chance to push `FunctionTask`s onto our queue?
Thanks!
EDIT: This can be removed; I just needed to brush up on my C++ concurrency. I think I've got it now.
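For anyone who lands here with the same confusion: the piece I was missing is that `std::condition_variable::wait` atomically releases the mutex while the thread sleeps and re-acquires it before returning. So while `pop()` is blocked, a producer can still lock the same mutex, push work, and notify. A minimal producer/consumer sketch of just that standard-library behavior (not PyTorch code):

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

std::mutex mutex;
std::condition_variable not_empty;
std::queue<int> tasks;  // stands in for ReadyQueue's heap

int pop() {
  std::unique_lock<std::mutex> lock(mutex);
  // wait() releases `lock` while sleeping and re-acquires it before
  // returning, so push() below is never actually shut out.
  not_empty.wait(lock, [] { return !tasks.empty(); });
  int task = tasks.front();
  tasks.pop();
  return task;
}

void push(int task) {
  {
    std::lock_guard<std::mutex> lock(mutex);  // succeeds even while pop() is waiting
    tasks.push(task);
  }
  not_empty.notify_one();  // wakes the consumer, which re-checks the predicate
}

int main() {
  std::thread consumer([] { std::printf("popped %d\n", pop()); });
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let the consumer block first
  push(42);
  consumer.join();
}
```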