diff --git a/Common/Thread/ThreadManager.cpp b/Common/Thread/ThreadManager.cpp index 9e87c1cfd6..318207c645 100644 --- a/Common/Thread/ThreadManager.cpp +++ b/Common/Thread/ThreadManager.cpp @@ -22,12 +22,13 @@ const int MAX_CORES_TO_USE = 16; const int MIN_IO_BLOCKING_THREADS = 4; +static constexpr size_t TASK_PRIORITY_COUNT = (size_t)TaskPriority::COUNT; struct GlobalThreadContext { std::mutex mutex; // associated with each respective condition variable - std::deque compute_queue; + std::deque compute_queue[TASK_PRIORITY_COUNT]; std::atomic compute_queue_size; - std::deque io_queue; + std::deque io_queue[TASK_PRIORITY_COUNT]; std::atomic io_queue_size; std::vector threads_; @@ -42,7 +43,7 @@ struct ThreadContext { int index; TaskType type; std::atomic cancelled; - std::deque private_queue; + std::deque private_queue[TASK_PRIORITY_COUNT]; char name[16]; }; @@ -65,12 +66,14 @@ void ThreadManager::Teardown() { // Purge any cancellable tasks while the threads shut down. if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) { - auto drainQueue = [&](std::deque &queue, std::atomic &size) { - for (auto it = queue.begin(); it != queue.end(); ++it) { - if (TeardownTask(*it, false)) { - queue.erase(it); - size--; - return false; + auto drainQueue = [&](std::deque queue[TASK_PRIORITY_COUNT], std::atomic &size) { + for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) { + for (auto it = queue[i].begin(); it != queue[i].end(); ++it) { + if (TeardownTask(*it, false)) { + queue[i].erase(it); + size--; + return false; + } } } return true; @@ -86,8 +89,10 @@ void ThreadManager::Teardown() { for (ThreadContext *&threadCtx : global_->threads_) { threadCtx->thread.join(); // TODO: Is it better to just delete these? - for (Task *task : threadCtx->private_queue) { - TeardownTask(task, true); + for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) { + for (Task *task : threadCtx->private_queue[i]) { + TeardownTask(task, true); + } } delete threadCtx; } @@ -109,11 +114,12 @@ bool ThreadManager::TeardownTask(Task *task, bool enqueue) { } if (enqueue) { + size_t queueIndex = (size_t)task->Priority(); if (task->Type() == TaskType::CPU_COMPUTE) { - global_->compute_queue.push_back(task); + global_->compute_queue[queueIndex].push_back(task); global_->compute_queue_size++; } else if (task->Type() == TaskType::IO_BLOCKING) { - global_->io_queue.push_back(task); + global_->io_queue[queueIndex].push_back(task); global_->io_queue_size++; } else { _assert_(false); @@ -147,33 +153,39 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) if (global_queue_size() > 0) { // Grab one from the global queue if there is any. std::unique_lock lock(global->mutex); - auto &queue = isCompute ? global->compute_queue : global->io_queue; + auto queue = isCompute ? global->compute_queue : global->io_queue; auto &queue_size = isCompute ? global->compute_queue_size : global->io_queue_size; - if (!queue.empty()) { - task = queue.front(); - queue.pop_front(); - queue_size--; + if (queue_size != 0) { + for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) { + if (queue[p].empty()) + continue; - // We are processing one now, so mark that. - thread->queue_size++; + task = queue[p].front(); + queue[p].pop_front(); + queue_size--; + + // We are processing one now, so mark that. + thread->queue_size++; + break; + } } } if (!task) { std::unique_lock lock(thread->mutex); - // We must check both queue and single again, while locked. - bool wait = true; - if (!thread->private_queue.empty()) { - task = thread->private_queue.front(); - thread->private_queue.pop_front(); - wait = false; - } else if (thread->cancelled) { - wait = false; - } else { - wait = global_queue_size() == 0; + for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) { + if (thread->private_queue[p].empty()) + continue; + + task = thread->private_queue[p].front(); + thread->private_queue[p].pop_front(); + break; } + // We must check both queue and single again, while locked. + bool wait = !thread->cancelled && !task && global_queue_size() == 0; + if (wait) thread->cond.wait(lock); } @@ -229,6 +241,7 @@ void ThreadManager::EnqueueTask(Task *task) { _assert_msg_(IsInitialized(), "ThreadManager not initialized"); + size_t queueIndex = (size_t)task->Priority(); int minThread; int maxThread; if (task->Type() == TaskType::CPU_COMPUTE) { @@ -247,7 +260,7 @@ void ThreadManager::EnqueueTask(Task *task) { ThreadContext *thread = global_->threads_[threadNum]; if (thread->queue_size.load() == 0) { std::unique_lock lock(thread->mutex); - thread->private_queue.push_back(task); + thread->private_queue[queueIndex].push_back(task); thread->queue_size++; thread->cond.notify_one(); // Found it - done. @@ -260,10 +273,10 @@ void ThreadManager::EnqueueTask(Task *task) { { std::unique_lock lock(global_->mutex); if (task->Type() == TaskType::CPU_COMPUTE) { - global_->compute_queue.push_back(task); + global_->compute_queue[queueIndex].push_back(task); global_->compute_queue_size++; } else if (task->Type() == TaskType::IO_BLOCKING) { - global_->io_queue.push_back(task); + global_->io_queue[queueIndex].push_back(task); global_->io_queue_size++; } else { _assert_(false); @@ -284,11 +297,12 @@ void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task) { _assert_msg_(threadNum >= 0 && threadNum < (int)global_->threads_.size(), "Bad threadnum or not initialized"); ThreadContext *thread = global_->threads_[threadNum]; + size_t queueIndex = (size_t)task->Priority(); thread->queue_size++; std::unique_lock lock(thread->mutex); - thread->private_queue.push_back(task); + thread->private_queue[queueIndex].push_back(task); thread->cond.notify_one(); }