From f9a7ad3e3d19af3ad5461d4ec8011f89445e416e Mon Sep 17 00:00:00 2001 From: "Unknown W. Brackets" Date: Sun, 5 Dec 2021 20:30:37 -0800 Subject: [PATCH] ThreadManager: Use separate pool for IO blocking. This prevents starving the compute pool (which may be used very regularly parallel loops or other tasks) if the IO operations are slow. --- Common/Thread/ThreadManager.cpp | 117 +++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 39 deletions(-) diff --git a/Common/Thread/ThreadManager.cpp b/Common/Thread/ThreadManager.cpp index 1a48645357..588e88d520 100644 --- a/Common/Thread/ThreadManager.cpp +++ b/Common/Thread/ThreadManager.cpp @@ -21,12 +21,14 @@ // is not fair. However, we ignore that for now. const int MAX_CORES_TO_USE = 16; -const int EXTRA_THREADS = 4; // For I/O limited tasks +const int MIN_IO_BLOCKING_THREADS = 4; struct GlobalThreadContext { std::mutex mutex; // associated with each respective condition variable - std::deque queue; - std::atomic queue_size; + std::deque compute_queue; + std::atomic compute_queue_size; + std::deque io_queue; + std::atomic io_queue_size; std::vector threads_; std::atomic roundRobin; @@ -38,13 +40,15 @@ struct ThreadContext { std::mutex mutex; // protects the local queue. std::atomic queue_size; int index; + TaskType type; std::atomic cancelled; std::atomic private_single; std::deque private_queue; }; ThreadManager::ThreadManager() : global_(new GlobalThreadContext()) { - global_->queue_size = 0; + global_->compute_queue_size = 0; + global_->io_queue_size = 0; global_->roundRobin = 0; } @@ -60,19 +64,23 @@ void ThreadManager::Teardown() { } // Purge any cancellable tasks while the threads shut down. - bool done = false; - while (!done) { - done = true; + if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) { + auto drainQueue = [&](std::deque &queue, std::atomic &size) { + for (auto it = queue.begin(); it != queue.end(); ++it) { + if (TeardownTask(*it, false)) { + queue.erase(it); + size--; + return false; + } + } + return true; + }; std::unique_lock lock(global_->mutex); - for (auto it = global_->queue.begin(); it != global_->queue.end(); ++it) { - if (TeardownTask(*it, false)) { - global_->queue.erase(it); - global_->queue_size--; - done = false; - break; - } - } + while (!drainQueue(global_->compute_queue, global_->compute_queue_size)) + continue; + while (!drainQueue(global_->io_queue, global_->io_queue_size)) + continue; } for (ThreadContext *&threadCtx : global_->threads_) { @@ -86,7 +94,7 @@ void ThreadManager::Teardown() { } global_->threads_.clear(); - if (global_->queue_size > 0) { + if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) { WARN_LOG(SYSTEM, "ThreadManager::Teardown() with tasks still enqueued"); } } @@ -102,8 +110,15 @@ bool ThreadManager::TeardownTask(Task *task, bool enqueue) { } if (enqueue) { - global_->queue.push_back(task); - global_->queue_size++; + if (task->Type() == TaskType::CPU_COMPUTE) { + global_->compute_queue.push_back(task); + global_->compute_queue_size++; + } else if (task->Type() == TaskType::CPU_COMPUTE) { + global_->io_queue.push_back(task); + global_->io_queue_size++; + } else { + _assert_(false); + } } return false; } @@ -112,17 +127,26 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) char threadName[16]; snprintf(threadName, sizeof(threadName), "PoolWorker %d", thread->index); SetCurrentThreadName(threadName); + + const bool isCompute = thread->type == TaskType::CPU_COMPUTE; + const auto global_queue_size = [isCompute, &global]() -> int { + return isCompute ? global->compute_queue_size.load() : global->io_queue_size.load(); + }; + while (!thread->cancelled) { Task *task = thread->private_single.exchange(nullptr); // Check the global queue first, then check the private queue and wait if there's nothing to do. - if (!task && global->queue_size.load() > 0) { + if (!task && global_queue_size() > 0) { // Grab one from the global queue if there is any. std::unique_lock lock(global->mutex); - if (!global->queue.empty()) { - task = global->queue.front(); - global->queue.pop_front(); - global->queue_size--; + auto &queue = isCompute ? global->compute_queue : global->io_queue; + auto &queue_size = isCompute ? global->compute_queue_size : global->io_queue_size; + + if (!queue.empty()) { + task = queue.front(); + queue.pop_front(); + queue_size--; // We are processing one now, so mark that. thread->queue_size++; @@ -132,12 +156,19 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) if (!task) { std::unique_lock lock(thread->mutex); // We must check both queue and single again, while locked. + bool wait = true; if (!thread->private_queue.empty()) { task = thread->private_queue.front(); thread->private_queue.pop_front(); - } else if (!thread->private_single && !thread->cancelled && global->queue_size.load() == 0) { - thread->cond.wait(lock); + wait = false; + } else if (thread->private_single || thread->cancelled) { + wait = false; + } else { + wait = global_queue_size() == 0; } + + if (wait) + thread->cond.wait(lock); } // The task itself takes care of notifying anyone waiting on it. Not the // responsibility of the ThreadManager (although it could be!). @@ -157,7 +188,8 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) { } numComputeThreads_ = std::min(numRealCores * numLogicalCoresPerCpu, MAX_CORES_TO_USE); - int numThreads = numComputeThreads_ + EXTRA_THREADS; + // Double it for the IO blocking threads. + int numThreads = numComputeThreads_ + std::max(MIN_IO_BLOCKING_THREADS, numComputeThreads_); numThreads_ = numThreads; INFO_LOG(SYSTEM, "ThreadManager::Init(compute threads: %d, all: %d)", numComputeThreads_, numThreads_); @@ -166,6 +198,7 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) { ThreadContext *thread = new ThreadContext(); thread->cancelled.store(false); thread->private_single.store(nullptr); + thread->type = i < numComputeThreads_ ? TaskType::CPU_COMPUTE : TaskType::IO_BLOCKING; thread->thread = std::thread(&WorkerThreadFunc, global_, thread); thread->index = i; global_->threads_.push_back(thread); @@ -175,24 +208,21 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) { void ThreadManager::EnqueueTask(Task *task) { _assert_msg_(IsInitialized(), "ThreadManager not initialized"); + int minThread; int maxThread; - int threadOffset = 0; if (task->Type() == TaskType::CPU_COMPUTE) { // only the threads reserved for heavy compute. + minThread = 0; maxThread = numComputeThreads_; - threadOffset = 0; } else { - // any free thread + // Only IO blocking threads (to avoid starving compute threads.) + minThread = numComputeThreads_; maxThread = numThreads_; - threadOffset = numComputeThreads_; } // Find a thread with no outstanding work. - int threadNum = threadOffset; - for (int i = 0; i < maxThread; i++, threadNum++) { - if (threadNum >= global_->threads_.size()) { - threadNum = 0; - } + _assert_(maxThread <= global_->threads_.size()); + for (int threadNum = minThread; threadNum < maxThread; threadNum++) { ThreadContext *thread = global_->threads_[threadNum]; if (thread->queue_size.load() == 0) { std::unique_lock lock(thread->mutex); @@ -208,13 +238,22 @@ void ThreadManager::EnqueueTask(Task *task) { // Not particularly scientific, but hopefully we should not run into this too much. { std::unique_lock lock(global_->mutex); - global_->queue.push_back(task); - global_->queue_size++; + if (task->Type() == TaskType::CPU_COMPUTE) { + global_->compute_queue.push_back(task); + global_->compute_queue_size++; + } else if (task->Type() == TaskType::IO_BLOCKING) { + global_->io_queue.push_back(task); + global_->io_queue_size++; + } else { + _assert_(false); + } } - // Lock the thread to ensure it gets the message. int chosenIndex = global_->roundRobin++; - ThreadContext *&chosenThread = global_->threads_[chosenIndex % maxThread]; + chosenIndex = minThread + (chosenIndex % (maxThread - minThread)); + ThreadContext *&chosenThread = global_->threads_[chosenIndex]; + + // Lock the thread to ensure it gets the message. std::unique_lock lock(chosenThread->mutex); chosenThread->cond.notify_one(); }