ThreadManager: Use separate pool for IO blocking.

This prevents starving the compute pool (which may be used very regularly
for parallel loops or other tasks) if the IO operations are slow.
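The gist of the change, as a minimal standalone sketch (hypothetical names, not the actual PPSSPP API): tasks carry a type, each type gets its own queue and its own workers, so a slow IO task can never occupy a compute thread.

// Hypothetical, simplified sketch of the two-pool idea -- not the real code.
#include <deque>
#include <mutex>

enum class TaskType { CPU_COMPUTE, IO_BLOCKING };

struct TwoPoolQueue {
	std::mutex mutex;
	std::deque<int> computeQueue;  // int stands in for Task *
	std::deque<int> ioQueue;

	void Enqueue(TaskType type, int task) {
		std::unique_lock<std::mutex> lock(mutex);
		// Dispatch purely on the declared type; compute workers only ever
		// pop computeQueue, IO workers only ever pop ioQueue.
		if (type == TaskType::CPU_COMPUTE)
			computeQueue.push_back(task);
		else
			ioQueue.push_back(task);
	}
};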
Unknown W. Brackets 2021-12-05 20:30:37 -08:00
parent 8b5173350f
commit f9a7ad3e3d

@@ -21,12 +21,14 @@
// is not fair. However, we ignore that for now.
const int MAX_CORES_TO_USE = 16;
const int EXTRA_THREADS = 4; // For I/O limited tasks
const int MIN_IO_BLOCKING_THREADS = 4;
struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> queue;
std::atomic<int> queue_size;
std::deque<Task *> compute_queue;
std::atomic<int> compute_queue_size;
std::deque<Task *> io_queue;
std::atomic<int> io_queue_size;
std::vector<ThreadContext *> threads_;
std::atomic<int> roundRobin;
@@ -38,13 +40,15 @@ struct ThreadContext {
std::mutex mutex; // protects the local queue.
std::atomic<int> queue_size;
int index;
TaskType type;
std::atomic<bool> cancelled;
std::atomic<Task *> private_single;
std::deque<Task *> private_queue;
};
ThreadManager::ThreadManager() : global_(new GlobalThreadContext()) {
global_->queue_size = 0;
global_->compute_queue_size = 0;
global_->io_queue_size = 0;
global_->roundRobin = 0;
}
@@ -60,19 +64,23 @@ void ThreadManager::Teardown() {
}
// Purge any cancellable tasks while the threads shut down.
bool done = false;
while (!done) {
done = true;
if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) {
auto drainQueue = [&](std::deque<Task *> &queue, std::atomic<int> &size) {
for (auto it = queue.begin(); it != queue.end(); ++it) {
if (TeardownTask(*it, false)) {
queue.erase(it);
size--;
return false;
}
}
return true;
};
std::unique_lock<std::mutex> lock(global_->mutex);
for (auto it = global_->queue.begin(); it != global_->queue.end(); ++it) {
if (TeardownTask(*it, false)) {
global_->queue.erase(it);
global_->queue_size--;
done = false;
break;
}
}
while (!drainQueue(global_->compute_queue, global_->compute_queue_size))
continue;
while (!drainQueue(global_->io_queue, global_->io_queue_size))
continue;
}
for (ThreadContext *&threadCtx : global_->threads_) {
@@ -86,7 +94,7 @@ void ThreadManager::Teardown() {
}
global_->threads_.clear();
if (global_->queue_size > 0) {
if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) {
WARN_LOG(SYSTEM, "ThreadManager::Teardown() with tasks still enqueued");
}
}
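The drainQueue lambda above has to restart its scan after every erase, because std::deque::erase invalidates iterators. The same drain pattern in a self-contained sketch (the "discardable" test is a stand-in; the real code asks TeardownTask):

#include <deque>

// Remove every discardable element, restarting the scan after each erase
// since std::deque::erase invalidates iterators.
static void DrainDiscardable(std::deque<int> &queue) {
	bool again = true;
	while (again) {
		again = false;
		for (auto it = queue.begin(); it != queue.end(); ++it) {
			if (*it < 0) {  // stand-in for "task can be dropped"
				queue.erase(it);
				again = true;
				break;  // iterator invalidated; restart the scan
			}
		}
	}
}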
@@ -102,8 +110,15 @@ bool ThreadManager::TeardownTask(Task *task, bool enqueue) {
}
if (enqueue) {
global_->queue.push_back(task);
global_->queue_size++;
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
}
}
return false;
}
@@ -112,17 +127,26 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
char threadName[16];
snprintf(threadName, sizeof(threadName), "PoolWorker %d", thread->index);
SetCurrentThreadName(threadName);
const bool isCompute = thread->type == TaskType::CPU_COMPUTE;
const auto global_queue_size = [isCompute, &global]() -> int {
return isCompute ? global->compute_queue_size.load() : global->io_queue_size.load();
};
while (!thread->cancelled) {
Task *task = thread->private_single.exchange(nullptr);
// Check the global queue first, then check the private queue and wait if there's nothing to do.
if (!task && global->queue_size.load() > 0) {
if (!task && global_queue_size() > 0) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
if (!global->queue.empty()) {
task = global->queue.front();
global->queue.pop_front();
global->queue_size--;
auto &queue = isCompute ? global->compute_queue : global->io_queue;
auto &queue_size = isCompute ? global->compute_queue_size : global->io_queue_size;
if (!queue.empty()) {
task = queue.front();
queue.pop_front();
queue_size--;
// We are processing one now, so mark that.
thread->queue_size++;
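The global_queue_size() check above is a cheap gate: an atomic counter mirrors the queue length so an idle worker can skip the mutex entirely while its queue is empty, then re-checks under the lock. That pattern in isolation (illustrative names, not the real API):

#include <atomic>
#include <deque>
#include <mutex>

struct GatedQueue {
	std::mutex mutex;
	std::deque<int> queue;
	std::atomic<int> size{0};

	bool TryPop(int &out) {
		if (size.load() == 0)
			return false;  // empty fast path: no lock taken
		std::unique_lock<std::mutex> lock(mutex);
		if (queue.empty())
			return false;  // counter was stale; re-checked under the lock
		out = queue.front();
		queue.pop_front();
		size--;
		return true;
	}
};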
@@ -132,12 +156,19 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
if (!task) {
std::unique_lock<std::mutex> lock(thread->mutex);
// We must check both queue and single again, while locked.
bool wait = true;
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
} else if (!thread->private_single && !thread->cancelled && global->queue_size.load() == 0) {
thread->cond.wait(lock);
wait = false;
} else if (thread->private_single || thread->cancelled) {
wait = false;
} else {
wait = global_queue_size() == 0;
}
if (wait)
thread->cond.wait(lock);
}
// The task itself takes care of notifying anyone waiting on it. Not the
// responsibility of the ThreadManager (although it could be!).
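The wait logic above re-checks every wake condition (private queue, private single, cancellation, global queue) while holding the thread's mutex, which is what makes the cond.wait() safe against lost wakeups. The same discipline in a self-contained sketch, simplified to one queue and a cancel flag:

#include <condition_variable>
#include <deque>
#include <mutex>

struct Worker {
	std::mutex mutex;
	std::condition_variable cond;
	std::deque<int> queue;
	bool cancelled = false;

	// Returns a task, or -1 once cancelled. The predicate form of wait()
	// re-evaluates the conditions under the lock after every wakeup,
	// spurious or not, so a notify can never be missed.
	int WaitForWork() {
		std::unique_lock<std::mutex> lock(mutex);
		cond.wait(lock, [this] { return cancelled || !queue.empty(); });
		if (cancelled)
			return -1;
		int task = queue.front();
		queue.pop_front();
		return task;
	}
};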
@@ -157,7 +188,8 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) {
}
numComputeThreads_ = std::min(numRealCores * numLogicalCoresPerCpu, MAX_CORES_TO_USE);
int numThreads = numComputeThreads_ + EXTRA_THREADS;
// Double it for the IO blocking threads.
int numThreads = numComputeThreads_ + std::max(MIN_IO_BLOCKING_THREADS, numComputeThreads_);
numThreads_ = numThreads;
INFO_LOG(SYSTEM, "ThreadManager::Init(compute threads: %d, all: %d)", numComputeThreads_, numThreads_);
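A worked example of the new sizing (input values assumed purely for illustration):

#include <algorithm>

int main() {
	const int MAX_CORES_TO_USE = 16;
	const int MIN_IO_BLOCKING_THREADS = 4;
	// e.g. 4 real cores x 2 logical per core:
	int numComputeThreads = std::min(4 * 2, MAX_CORES_TO_USE);      // 8
	// IO pool matches the compute pool, but never drops below the minimum:
	int numThreads = numComputeThreads +
		std::max(MIN_IO_BLOCKING_THREADS, numComputeThreads);       // 16
	return numThreads == 16 ? 0 : 1;
}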
@@ -166,6 +198,7 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) {
ThreadContext *thread = new ThreadContext();
thread->cancelled.store(false);
thread->private_single.store(nullptr);
thread->type = i < numComputeThreads_ ? TaskType::CPU_COMPUTE : TaskType::IO_BLOCKING;
thread->thread = std::thread(&WorkerThreadFunc, global_, thread);
thread->index = i;
global_->threads_.push_back(thread);
@@ -175,24 +208,21 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) {
void ThreadManager::EnqueueTask(Task *task) {
_assert_msg_(IsInitialized(), "ThreadManager not initialized");
int minThread;
int maxThread;
int threadOffset = 0;
if (task->Type() == TaskType::CPU_COMPUTE) {
// only the threads reserved for heavy compute.
minThread = 0;
maxThread = numComputeThreads_;
threadOffset = 0;
} else {
// any free thread
// Only IO blocking threads (to avoid starving compute threads.)
minThread = numComputeThreads_;
maxThread = numThreads_;
threadOffset = numComputeThreads_;
}
// Find a thread with no outstanding work.
int threadNum = threadOffset;
for (int i = 0; i < maxThread; i++, threadNum++) {
if (threadNum >= global_->threads_.size()) {
threadNum = 0;
}
_assert_(maxThread <= global_->threads_.size());
for (int threadNum = minThread; threadNum < maxThread; threadNum++) {
ThreadContext *thread = global_->threads_[threadNum];
if (thread->queue_size.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
@@ -208,13 +238,22 @@ void ThreadManager::EnqueueTask(Task *task) {
// Not particularly scientific, but hopefully we should not run into this too much.
{
std::unique_lock<std::mutex> lock(global_->mutex);
global_->queue.push_back(task);
global_->queue_size++;
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
}
}
// Lock the thread to ensure it gets the message.
int chosenIndex = global_->roundRobin++;
ThreadContext *&chosenThread = global_->threads_[chosenIndex % maxThread];
chosenIndex = minThread + (chosenIndex % (maxThread - minThread));
ThreadContext *&chosenThread = global_->threads_[chosenIndex];
// Lock the thread to ensure it gets the message.
std::unique_lock<std::mutex> lock(chosenThread->mutex);
chosenThread->cond.notify_one();
}
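The fallback above folds a single shared round-robin counter into the half-open range [minThread, maxThread), so an IO task is only ever handed to an IO worker and a compute task to a compute worker. In isolation (sketch, illustrative names):

#include <atomic>

static std::atomic<int> roundRobin{0};

// Map the shared counter into [minThread, maxThread). Both pools advance
// the same counter, but each task type only ever lands in its own range.
int ChooseThread(int minThread, int maxThread) {
	int raw = roundRobin++;
	return minThread + (raw % (maxThread - minThread));
}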