Merged

30 commits
05d889e
Initial work.
mzient Sep 26, 2022
6f7a95e
NewThreadPool - working
mzient Sep 26, 2022
eabaa73
Add init/exit callbacks.
mzient Sep 28, 2022
09b5562
Fix exception class.
mzient Sep 29, 2022
896a922
[WIP]
mzient Oct 4, 2022
8c5d76c
Add multi-error as a separate header.
mzient Oct 4, 2022
7667106
[WIP]
mzient Dec 1, 2022
ce62744
[WIP]
mzient Dec 12, 2022
2613088
Experiment.
mzient Feb 2, 2023
b8c70eb
Remove the cc file, currently not used.
mzient Feb 3, 2023
1eeed51
Moving files.
mzient Feb 3, 2023
df4aaa4
Move thread_pool_base to core.
mzient Feb 27, 2023
80a71ea
Add ThreadedExecutionEngine that combines a thread pool reference and…
mzient Feb 27, 2023
a2e6897
Fix after rebase.
mzient Dec 16, 2025
0691f35
[WIP]
mzient Dec 19, 2025
31c5763
Add incremental job. Validate in nvimgcodec.
mzient Jan 5, 2026
4904550
Make destructor virtual.
mzient Jan 5, 2026
da114d5
[WIP]
mzient Jan 8, 2026
1f6662c
[WIP]
mzient Jan 8, 2026
fd1bc5c
Use semaphore. Numerous fixes.
mzient Jan 12, 2026
ae0fb61
Fix: move notification and waiting to one translation unit. Refactoring.
mzient Jan 14, 2026
cc5b63b
Revert nvimgcodec.
mzient Jan 14, 2026
1a91ad2
Fix thread pool name handling.
mzient Jan 14, 2026
faa8863
Tidy up includes.
mzient Jan 14, 2026
5524623
Remove NewThreadPool as untested.
mzient Jan 14, 2026
d8b2d9b
Fix. Add more tests. Refactor tests.
mzient Jan 14, 2026
4a67674
Review issues. Rename Abandon to Discard. Extend docs. Extend lifecyc…
mzient Jan 26, 2026
f9bca08
Remove dead code.
mzient Jan 27, 2026
48c2100
Fix typo.
mzient Jan 29, 2026
7249d33
Typo
mzient Jan 29, 2026
253 changes: 253 additions & 0 deletions dali/core/exec/thread_pool_base.cc
@@ -0,0 +1,253 @@
// Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "dali/core/exec/thread_pool_base.h"
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <thread>

namespace dali {

JobBase::~JobBase() noexcept(false) {
if (total_tasks_ > 0 && !wait_completed_) {
throw std::logic_error("The job is not empty, but hasn't been discarded or waited for.");
}
while (running_)
std::this_thread::yield();
Comment on lines +25 to +26

Reviewer (Contributor):

Busy-wait in destructor is CPU-intensive.

The destructor spins while running_ is true. For long-running tasks, this burns CPU. Consider using a timed wait or documenting the expected task duration.

Author:

@greptileai This never waits for more than the time between the notification of the atomic variable num_pending_tasks_ and the setting of the running_ flag in DoNotify.

Reviewer (Contributor):

You're correct - the busy-wait in the destructor only spins for the brief window between the atomic notification of num_pending_tasks_ and the setting of running_ = false at the very end of DoNotify(). The running_ flag serves as a secondary synchronization mechanism that avoids a race condition where the destructor could be entered right after num_pending_tasks_ is decremented but before all the notifications are sent. The spin is bounded to the few cycles it takes to execute the notification calls and the final running_ = false assignment, so the CPU overhead is negligible compared to the complexity of adding more synchronization primitives.
}

void JobBase::DoWait() {
if (wait_started_)
throw std::logic_error("This job has already been waited for.");
wait_started_ = true;

if (total_tasks_ == 0) {
wait_completed_ = true;
return;
}

if (executor_ == nullptr)
throw std::logic_error("This job hasn't been run - cannot wait for it.");

auto ready = [&]() { return num_pending_tasks_ == 0; };
if (ThreadPoolBase::this_thread_pool() != nullptr) {
bool result = ThreadPoolBase::this_thread_pool()->WaitOrRunTasks(cv_, ready);
wait_completed_ = true;
if (!result)
throw std::logic_error("The thread pool was stopped");
} else {
int old = num_pending_tasks_.load();
while (old != 0) {
num_pending_tasks_.wait(old);
old = num_pending_tasks_.load();
assert(old >= 0);
}
wait_completed_ = true;
}
}

void JobBase::DoNotify() {
num_pending_tasks_.notify_all();
(void)std::lock_guard(mtx_);
Reviewer (Contributor):

This creates a std::lock_guard as a nameless temporary that immediately goes out of scope: the lock acquires and releases mtx_ before cv_.notify_all() on line 62. If the intent is to hold the lock during notification, it should be named:

Suggested change
(void)std::lock_guard(mtx_);
std::lock_guard g(mtx_);

Author:

@greptileai The intention was to acquire the lock and immediately release it, thereby establishing proper sequencing for the reentrant variant of the wait function.

Reviewer (Contributor):

Understood - this is a deliberate memory-ordering technique. The temporary std::lock_guard acquires and immediately releases mtx_, establishing a happens-before relationship for the reentrant wait scenario. The pattern is unusual, however, and may confuse developers who expect either:

  1. a named lock guard that holds the mutex during cv_.notify_all(), or
  2. explicit mtx_.lock(); mtx_.unlock(); calls that make the intent obvious.

Consider adding a comment to clarify the intent:

Suggested change
(void)std::lock_guard(mtx_);
num_pending_tasks_.notify_all();
// Acquire and immediately release the mutex to establish proper memory ordering
// for the reentrant wait function
(void)std::lock_guard(mtx_);

This makes it clear to future maintainers that the immediate release is intentional for synchronization purposes.

cv_.notify_all();
Comment on lines +61 to +62

Reviewer (Contributor):

Unusual memory-ordering pattern - consider adding a comment. The intentional lock/unlock used to establish a happens-before relationship is uncommon; a comment would help:

Suggested change
(void)std::lock_guard(mtx_);
cv_.notify_all();
// Acquire and immediately release the mutex to establish happens-before for the reentrant wait
(void)std::lock_guard(mtx_);

// We need this second flag to avoid a race condition where the destructor is called between
// decrementing num_pending_tasks_ and the notifications, without excessive use of mutexes.
// This must be the very last operation in the task function that touches `this`.
running_ = false;
}

// Job ////////////////////////////////////////////////////////////////////

void Job::Run(ThreadPoolBase &tp, bool wait) {
if (executor_ != nullptr)
throw std::logic_error("This job has already been started.");
executor_ = &tp;
running_ = !tasks_.empty();
{
auto batch = tp.BeginBulkAdd();
for (auto &x : tasks_) {
batch.Add(std::move(x.second.func));
}
int added = batch.Size();
if (added) {
num_pending_tasks_ += added;
running_ = true;
}
batch.Submit();
}
if (wait && !tasks_.empty())
Wait();
}

void Job::Wait() {
DoWait();

// note - this vector is not allocated unless there were exceptions thrown
std::vector<std::exception_ptr> errors;
for (auto &x : tasks_) {
if (x.second.error)
errors.push_back(std::move(x.second.error));
}
if (errors.size() == 1)
std::rethrow_exception(errors[0]);
else if (errors.size() > 1)
throw MultipleErrors(std::move(errors));
}

void Job::Discard() {
if (executor_ != nullptr)
throw std::logic_error("Cannot discard a job that has already been started");
tasks_.clear();
total_tasks_ = 0;
}

// IncrementalJob /////////////////////////////////////////////////////////

void IncrementalJob::Run(ThreadPoolBase &tp, bool wait) {
if (executor_ && executor_ != &tp)
throw std::logic_error("This job is already running in a different executor.");
executor_ = &tp;
{
auto it = last_task_run_.has_value() ? std::next(*last_task_run_) : tasks_.begin();
auto batch = tp.BeginBulkAdd();
for (; it != tasks_.end(); ++it) {
batch.Add(std::move(it->func));
last_task_run_ = it;
}
int added = batch.Size();
if (added) {
num_pending_tasks_ += added;
running_ = true;
}
batch.Submit();
}
if (wait && !tasks_.empty())
Wait();
}

void IncrementalJob::Discard() {
if (executor_)
throw std::logic_error("Cannot discard a job that has already been started");
tasks_.clear();
total_tasks_ = 0;
}

void IncrementalJob::Wait() {
DoWait();
// note - this vector is not allocated unless there were exceptions thrown
std::vector<std::exception_ptr> errors;
for (auto &x : tasks_) {
if (x.error)
errors.push_back(std::move(x.error));
}
if (errors.size() == 1)
std::rethrow_exception(errors[0]);
else if (errors.size() > 1)
throw MultipleErrors(std::move(errors));
}

///////////////////////////////////////////////////////////////////////////

thread_local ThreadPoolBase *ThreadPoolBase::this_thread_pool_ = nullptr;
thread_local int ThreadPoolBase::this_thread_idx_ = -1;

void ThreadPoolBase::Init(int num_threads, const std::function<OnThreadStartFn> &on_thread_start) {
if (shutdown_pending_)
throw std::logic_error("The thread pool is being shut down.");
std::lock_guard<std::mutex> g(mtx_);
if (!threads_.empty())
throw std::logic_error("The thread pool is already started!");
threads_.reserve(num_threads);
for (int i = 0; i < num_threads; i++)
threads_.push_back(std::thread(&ThreadPoolBase::Run, this, i, on_thread_start));
}

void ThreadPoolBase::Shutdown(bool join) {
if ((shutdown_pending_ && !join) || threads_.empty())
return;
{
std::lock_guard<std::mutex> g(mtx_);
if (shutdown_pending_ && !join)
return;
shutdown_pending_ = true;
sem_.release(threads_.size());
}

for (auto &t : threads_)
t.join();
threads_.clear();
}

void ThreadPoolBase::AddTaskNoLock(TaskFunc &&f) {
if (shutdown_pending_)
throw std::logic_error("The thread pool is stopped and no longer accepts new tasks.");
tasks_.push(std::move(f));
}

void ThreadPoolBase::AddTask(TaskFunc &&f) {
{
std::lock_guard<std::mutex> g(mtx_);
AddTaskNoLock(std::move(f));
}
sem_.release(1);
}

void ThreadPoolBase::Run(
int index,
const std::function<OnThreadStartFn> &on_thread_start) noexcept {
this_thread_pool_ = this;
this_thread_idx_ = index;
std::any scope;
if (on_thread_start)
scope = on_thread_start(index);
while (!shutdown_pending_ || !tasks_.empty()) {
sem_.acquire();
std::unique_lock lock(mtx_);
if (shutdown_pending_)
break;
assert(!tasks_.empty() && "Semaphore acquired but no tasks present.");
PopAndRunTask(lock);
}
}

void ThreadPoolBase::PopAndRunTask(std::unique_lock<std::mutex> &lock) {
TaskFunc t = std::move(tasks_.front());
tasks_.pop();
lock.unlock();
t();
lock.lock();
}

template <typename Condition>
bool ThreadPoolBase::WaitOrRunTasks(std::condition_variable &cv, Condition &&condition) {
assert(this_thread_pool() == this);
std::unique_lock lock(mtx_);
while (!shutdown_pending_ || !tasks_.empty()) {
bool ret;
while (!(ret = condition()) && tasks_.empty())
cv.wait_for(lock, std::chrono::microseconds(100));

if (ret || condition()) // re-evaluate the condition, just in case
return true;
if (shutdown_pending_)
return condition();
if (!sem_.try_acquire())
continue;

assert(!tasks_.empty() && "Semaphore acquired but no tasks present.");
PopAndRunTask(lock);
}
return condition();
}

} // namespace dali