Common: Set a min size on threadpool work chunks.

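This should avoid slicing loops, etc. into too many chunks.  Also
generalize the parallel memcpy a bit by moving it into the thread pool as
a reusable helper (ThreadPool::ParallelMemcpy / GlobalThreadPool::Memcpy).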
This commit is contained in:
Unknown W. Brackets 2021-04-15 23:59:02 -07:00
parent 639dd67130
commit b2f3f06768
5 changed files with 48 additions and 45 deletions

View file

@ -1,3 +1,5 @@
#include <algorithm>
#include <cstring>
#include "Common/Thread/ThreadPool.h"
#include "Common/Thread/ThreadUtil.h"
@ -54,34 +56,18 @@ void WorkerThread::WorkFunc() {
}
}
// Hands one [start, end) slice of a loop to this worker.
// NOTE(review): this is a diff view without +/- markers. The `Process` signature
// and the first `work_ = std::move(work);` appear to be the pre-change lines and
// `ProcessLoop` / `loopWork_` their replacements - confirm against the real file.
void LoopWorkerThread::Process(std::function<void(int, int)> work, int start, int end) {
void LoopWorkerThread::ProcessLoop(std::function<void(int, int)> work, int start, int end) {
// All of the state below is written while holding `mutex`.
std::lock_guard<std::mutex> guard(mutex);
work_ = std::move(work);
loopWork_ = std::move(work);
// work_ takes no arguments, so wrap the ranged callback in a closure that
// reads the range from start_/end_ at the time it is invoked.
work_ = [this]() {
loopWork_(start_, end_);
};
start_ = start;
end_ = end;
// Request exactly one more job than has been completed, then notify `signal`.
jobsTarget = jobsDone + 1;
signal.notify_one();
}
// Worker loop: waits on `signal` until a job is pending (jobsTarget > jobsDone)
// or `active` becomes false, runs the stored callback on [start_, end_), then
// reports completion through `done`.
// NOTE(review): the hunk header (-54,34 +56,18) suggests this override is on the
// removed side of the diff - confirm before relying on it.
void LoopWorkerThread::WorkFunc() {
setCurrentThreadName("LoopWorker");
// `guard` holds `mutex` for the whole loop; it is only released inside signal.wait().
std::unique_lock<std::mutex> guard(mutex);
while (active) {
// 'active == false' is one of the conditions for signaling,
// do not "optimize" it
while (active && jobsTarget <= jobsDone) {
signal.wait(guard);
}
if (active) {
// The callback runs with `mutex` still held by `guard`.
work_(start_, end_);
// jobsDone is bumped under doneMutex before notifying `done`.
std::lock_guard<std::mutex> doneGuard(doneMutex);
jobsDone++;
done.notify_one();
}
}
}
///////////////////////////// ThreadPool
ThreadPool::ThreadPool(int numThreads) {
@ -108,23 +94,32 @@ void ThreadPool::StartWorkers() {
}
}
void ThreadPool::ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper) {
void ThreadPool::ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper, int minSize) {
// Don't parallelize tiny loops.
if (minSize == -1)
minSize = 4;
int range = upper - lower;
if (range >= numThreads_ * 2) { // don't parallelize tiny loops (this could be better, maybe add optional parameter that estimates work per iteration)
if (range >= minSize) {
std::lock_guard<std::mutex> guard(mutex);
StartWorkers();
// could do slightly better load balancing for the generic case,
// but doesn't matter since all our loops are power of 2
int chunk = range / numThreads_;
int chunk = std::max(minSize, range / numThreads_);
int s = lower;
for (auto& worker : workers) {
worker->Process(loop, s, s+chunk);
s+=chunk;
for (auto &worker : workers) {
// We'll do the last chunk on the current thread.
if (s + chunk >= upper) {
break;
}
worker->ProcessLoop(loop, s, s + chunk);
s += chunk;
}
// This is the final chunk.
loop(s, upper);
for (auto& worker : workers) {
if (s < upper)
loop(s, upper);
for (auto &worker : workers) {
worker->WaitForCompletion();
}
} else {
@ -132,3 +127,9 @@ void ThreadPool::ParallelLoop(const std::function<void(int,int)> &loop, int lowe
}
}
// Copies `size` bytes from `src` to `dest`, splitting the byte range [0, size)
// across the pool through ParallelLoop. Each slice [l, h) is copied with its own
// memmove call at the same offset in both buffers.
void ThreadPool::ParallelMemcpy(void *dest, const void *src, int size) {
	// Passed to ParallelLoop as its minSize argument.
	static const int MIN_SIZE = 128 * 1024;

	uint8_t *const destBytes = static_cast<uint8_t *>(dest);
	const uint8_t *const srcBytes = static_cast<const uint8_t *>(src);

	const auto copySlice = [&](int sliceBegin, int sliceEnd) {
		const int sliceLength = sliceEnd - sliceBegin;
		memmove(destBytes + sliceBegin, srcBytes + sliceBegin, sliceLength);
	};

	ParallelLoop(copySlice, 0, size, MIN_SIZE);
}

View file

@ -23,6 +23,8 @@ public:
void WaitForCompletion();
protected:
virtual void WorkFunc();
std::thread thread; // the worker thread
std::condition_variable signal; // used to signal new work
std::condition_variable done; // used to signal work completion
@ -30,11 +32,10 @@ protected:
bool active = true;
int jobsDone = 0;
int jobsTarget = 0;
private:
virtual void WorkFunc();
std::function<void()> work_; // the work to be done by this thread
private:
WorkerThread(const WorkerThread& other) = delete; // prevent copies
void operator =(const WorkerThread &other) = delete;
};
@ -42,14 +43,12 @@ private:
// Worker thread specialised for running one [start, end) slice of a loop callback.
// NOTE(review): this is a diff view without +/- markers, so old and new member
// declarations appear side by side (Process vs ProcessLoop, work_ vs loopWork_);
// which ones survive the commit should be confirmed against the real header.
class LoopWorkerThread final : public WorkerThread {
public:
LoopWorkerThread() = default;
// Stores the callback and its range, then signals the thread (see the .cpp).
void Process(std::function<void(int, int)> work, int start, int end);
void ProcessLoop(std::function<void(int, int)> work, int start, int end);
private:
virtual void WorkFunc() override;
// Range handed to the loop callback; written by Process/ProcessLoop.
int start_;
int end_;
std::function<void(int, int)> work_; // the work to be done by this thread
std::function<void(int, int)> loopWork_; // the work to be done by this thread
};
// A thread pool manages a set of worker threads, and allows the execution of parallel loops on them
@ -61,7 +60,8 @@ public:
// don't need a destructor, "workers" is cleared on delete,
// leading to the stopping and joining of all worker threads (RAII and all that)
void ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper);
void ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper, int minSize);
void ParallelMemcpy(void *dest, const void *src, int sz);
private:
int numThreads_;

View file

@ -324,14 +324,10 @@ static void DoMemoryVoid(PointerWrap &p, uint32_t start, uint32_t size) {
switch (p.mode) {
case PointerWrap::MODE_READ:
GlobalThreadPool::Loop([&](int l, int h) {
memmove(d + l, storage + l, h - l);
}, 0, size);
GlobalThreadPool::Memcpy(d, storage, size);
break;
case PointerWrap::MODE_WRITE:
GlobalThreadPool::Loop([&](int l, int h) {
memmove(storage + l, d + l, h - l);
}, 0, size);
GlobalThreadPool::Memcpy(storage, d, size);
break;
case PointerWrap::MODE_MEASURE:
// Nothing to do here.

View file

@ -6,9 +6,14 @@
std::unique_ptr<ThreadPool> GlobalThreadPool::pool;
std::once_flag GlobalThreadPool::init_flag;
// Runs `loop` over [lower, upper) on the global pool, creating the pool on first use.
// NOTE(review): this is a diff view without +/- markers. The three-argument
// signature and ParallelLoop call appear to be the pre-change lines and the
// `minSize` variants their replacements - confirm against the real file.
void GlobalThreadPool::Loop(const std::function<void(int,int)>& loop, int lower, int upper) {
void GlobalThreadPool::Loop(const std::function<void(int,int)>& loop, int lower, int upper, int minSize) {
// call_once runs Inititialize at most once, guarded by init_flag.
std::call_once(init_flag, Inititialize);
pool->ParallelLoop(loop, lower, upper);
// minSize is forwarded to ThreadPool::ParallelLoop unchanged.
pool->ParallelLoop(loop, lower, upper, minSize);
}
// Copies `size` bytes from `src` to `dest` using the global pool's ParallelMemcpy,
// creating the pool on first use.
void GlobalThreadPool::Memcpy(void *dest, const void *src, int size) {
// call_once runs Inititialize at most once, guarded by init_flag.
std::call_once(init_flag, Inititialize);
pool->ParallelMemcpy(dest, src, size);
}
void GlobalThreadPool::Inititialize() {

View file

@ -6,7 +6,8 @@ class GlobalThreadPool {
public:
// will execute slices of "loop" from "lower" to "upper"
// in parallel on the global thread pool
static void Loop(const std::function<void(int,int)>& loop, int lower, int upper);
static void Loop(const std::function<void(int,int)>& loop, int lower, int upper, int minSize = -1);
static void Memcpy(void *dest, const void *src, int size);
private:
static std::unique_ptr<ThreadPool> pool;