ThreadManager: Add simple priority queues.

This allows basic differentiation of priorities.
This commit is contained in:
Unknown W. Brackets 2023-01-15 08:05:48 -08:00
parent 88ba003f46
commit 0971555e51

View file

@ -22,12 +22,13 @@
const int MAX_CORES_TO_USE = 16;
const int MIN_IO_BLOCKING_THREADS = 4;
static constexpr size_t TASK_PRIORITY_COUNT = (size_t)TaskPriority::COUNT;
struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> compute_queue;
std::deque<Task *> compute_queue[TASK_PRIORITY_COUNT];
std::atomic<int> compute_queue_size;
std::deque<Task *> io_queue;
std::deque<Task *> io_queue[TASK_PRIORITY_COUNT];
std::atomic<int> io_queue_size;
std::vector<ThreadContext *> threads_;
@ -42,7 +43,7 @@ struct ThreadContext {
int index;
TaskType type;
std::atomic<bool> cancelled;
std::deque<Task *> private_queue;
std::deque<Task *> private_queue[TASK_PRIORITY_COUNT];
char name[16];
};
@ -65,12 +66,14 @@ void ThreadManager::Teardown() {
// Purge any cancellable tasks while the threads shut down.
if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) {
auto drainQueue = [&](std::deque<Task *> &queue, std::atomic<int> &size) {
for (auto it = queue.begin(); it != queue.end(); ++it) {
if (TeardownTask(*it, false)) {
queue.erase(it);
size--;
return false;
auto drainQueue = [&](std::deque<Task *> queue[TASK_PRIORITY_COUNT], std::atomic<int> &size) {
for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) {
for (auto it = queue[i].begin(); it != queue[i].end(); ++it) {
if (TeardownTask(*it, false)) {
queue[i].erase(it);
size--;
return false;
}
}
}
return true;
@ -86,8 +89,10 @@ void ThreadManager::Teardown() {
for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->thread.join();
// TODO: Is it better to just delete these?
for (Task *task : threadCtx->private_queue) {
TeardownTask(task, true);
for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) {
for (Task *task : threadCtx->private_queue[i]) {
TeardownTask(task, true);
}
}
delete threadCtx;
}
@ -109,11 +114,12 @@ bool ThreadManager::TeardownTask(Task *task, bool enqueue) {
}
if (enqueue) {
size_t queueIndex = (size_t)task->Priority();
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue[queueIndex].push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue[queueIndex].push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
@ -147,33 +153,39 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
if (global_queue_size() > 0) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
auto &queue = isCompute ? global->compute_queue : global->io_queue;
auto queue = isCompute ? global->compute_queue : global->io_queue;
auto &queue_size = isCompute ? global->compute_queue_size : global->io_queue_size;
if (!queue.empty()) {
task = queue.front();
queue.pop_front();
queue_size--;
if (queue_size != 0) {
for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) {
if (queue[p].empty())
continue;
// We are processing one now, so mark that.
thread->queue_size++;
task = queue[p].front();
queue[p].pop_front();
queue_size--;
// We are processing one now, so mark that.
thread->queue_size++;
break;
}
}
}
if (!task) {
std::unique_lock<std::mutex> lock(thread->mutex);
// We must check both queue and single again, while locked.
bool wait = true;
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
wait = false;
} else if (thread->cancelled) {
wait = false;
} else {
wait = global_queue_size() == 0;
for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) {
if (thread->private_queue[p].empty())
continue;
task = thread->private_queue[p].front();
thread->private_queue[p].pop_front();
break;
}
// We must check both queue and single again, while locked.
bool wait = !thread->cancelled && !task && global_queue_size() == 0;
if (wait)
thread->cond.wait(lock);
}
@ -229,6 +241,7 @@ void ThreadManager::EnqueueTask(Task *task) {
_assert_msg_(IsInitialized(), "ThreadManager not initialized");
size_t queueIndex = (size_t)task->Priority();
int minThread;
int maxThread;
if (task->Type() == TaskType::CPU_COMPUTE) {
@ -247,7 +260,7 @@ void ThreadManager::EnqueueTask(Task *task) {
ThreadContext *thread = global_->threads_[threadNum];
if (thread->queue_size.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->private_queue[queueIndex].push_back(task);
thread->queue_size++;
thread->cond.notify_one();
// Found it - done.
@ -260,10 +273,10 @@ void ThreadManager::EnqueueTask(Task *task) {
{
std::unique_lock<std::mutex> lock(global_->mutex);
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue[queueIndex].push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue[queueIndex].push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
@ -284,11 +297,12 @@ void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task) {
_assert_msg_(threadNum >= 0 && threadNum < (int)global_->threads_.size(), "Bad threadnum or not initialized");
ThreadContext *thread = global_->threads_[threadNum];
size_t queueIndex = (size_t)task->Priority();
thread->queue_size++;
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->private_queue[queueIndex].push_back(task);
thread->cond.notify_one();
}