From b2f3f0676876d27081910793efcd9ab9cdf30af8 Mon Sep 17 00:00:00 2001 From: "Unknown W. Brackets" Date: Thu, 15 Apr 2021 23:59:02 -0700 Subject: [PATCH] Common: Set a min size on threadpool work chunks. This should avoid slicing loops, etc. into too many chunks. Generalize the memcpy a bit. --- Common/Thread/ThreadPool.cpp | 59 ++++++++++++++++++------------------ Common/Thread/ThreadPool.h | 14 ++++----- Core/MemMap.cpp | 8 ++--- Core/ThreadPools.cpp | 9 ++++-- Core/ThreadPools.h | 3 +- 5 files changed, 48 insertions(+), 45 deletions(-) diff --git a/Common/Thread/ThreadPool.cpp b/Common/Thread/ThreadPool.cpp index 126de73691..01d6220b67 100644 --- a/Common/Thread/ThreadPool.cpp +++ b/Common/Thread/ThreadPool.cpp @@ -1,3 +1,5 @@ +#include +#include #include "Common/Thread/ThreadPool.h" #include "Common/Thread/ThreadUtil.h" @@ -54,34 +56,18 @@ void WorkerThread::WorkFunc() { } } -void LoopWorkerThread::Process(std::function work, int start, int end) { +void LoopWorkerThread::ProcessLoop(std::function work, int start, int end) { std::lock_guard guard(mutex); - work_ = std::move(work); + loopWork_ = std::move(work); + work_ = [this]() { + loopWork_(start_, end_); + }; start_ = start; end_ = end; jobsTarget = jobsDone + 1; signal.notify_one(); } -void LoopWorkerThread::WorkFunc() { - setCurrentThreadName("LoopWorker"); - std::unique_lock guard(mutex); - while (active) { - // 'active == false' is one of the conditions for signaling, - // do not "optimize" it - while (active && jobsTarget <= jobsDone) { - signal.wait(guard); - } - if (active) { - work_(start_, end_); - - std::lock_guard doneGuard(doneMutex); - jobsDone++; - done.notify_one(); - } - } -} - ///////////////////////////// ThreadPool ThreadPool::ThreadPool(int numThreads) { @@ -108,23 +94,32 @@ void ThreadPool::StartWorkers() { } } -void ThreadPool::ParallelLoop(const std::function &loop, int lower, int upper) { +void ThreadPool::ParallelLoop(const std::function &loop, int lower, int upper, int minSize) { + // Don't parallelize tiny loops. + if (minSize == -1) + minSize = 4; + int range = upper - lower; - if (range >= numThreads_ * 2) { // don't parallelize tiny loops (this could be better, maybe add optional parameter that estimates work per iteration) + if (range >= minSize) { std::lock_guard guard(mutex); StartWorkers(); // could do slightly better load balancing for the generic case, // but doesn't matter since all our loops are power of 2 - int chunk = range / numThreads_; + int chunk = std::max(minSize, range / numThreads_); int s = lower; - for (auto& worker : workers) { - worker->Process(loop, s, s+chunk); - s+=chunk; + for (auto &worker : workers) { + // We'll do the last chunk on the current thread. + if (s + chunk >= upper) { + break; + } + worker->ProcessLoop(loop, s, s + chunk); + s += chunk; } // This is the final chunk. - loop(s, upper); - for (auto& worker : workers) { + if (s < upper) + loop(s, upper); + for (auto &worker : workers) { worker->WaitForCompletion(); } } else { @@ -132,3 +127,9 @@ void ThreadPool::ParallelLoop(const std::function &loop, int lowe } } +void ThreadPool::ParallelMemcpy(void *dest, const void *src, int size) { + static const int MIN_SIZE = 128 * 1024; + ParallelLoop([&](int l, int h) { + memmove((uint8_t *)dest + l, (const uint8_t *)src + l, h - l); + }, 0, size, MIN_SIZE); +} diff --git a/Common/Thread/ThreadPool.h b/Common/Thread/ThreadPool.h index 2064b31c28..a410c2964d 100644 --- a/Common/Thread/ThreadPool.h +++ b/Common/Thread/ThreadPool.h @@ -23,6 +23,8 @@ public: void WaitForCompletion(); protected: + virtual void WorkFunc(); + std::thread thread; // the worker thread std::condition_variable signal; // used to signal new work std::condition_variable done; // used to signal work completion @@ -30,11 +32,10 @@ protected: bool active = true; int jobsDone = 0; int jobsTarget = 0; -private: - virtual void WorkFunc(); std::function work_; // the work to be done by this thread +private: WorkerThread(const WorkerThread& other) = delete; // prevent copies void operator =(const WorkerThread &other) = delete; }; @@ -42,14 +43,12 @@ private: class LoopWorkerThread final : public WorkerThread { public: LoopWorkerThread() = default; - void Process(std::function work, int start, int end); + void ProcessLoop(std::function work, int start, int end); private: - virtual void WorkFunc() override; - int start_; int end_; - std::function work_; // the work to be done by this thread + std::function loopWork_; // the work to be done by this thread }; // A thread pool manages a set of worker threads, and allows the execution of parallel loops on them @@ -61,7 +60,8 @@ public: // don't need a destructor, "workers" is cleared on delete, // leading to the stopping and joining of all worker threads (RAII and all that) - void ParallelLoop(const std::function &loop, int lower, int upper); + void ParallelLoop(const std::function &loop, int lower, int upper, int minSize); + void ParallelMemcpy(void *dest, const void *src, int sz); private: int numThreads_; diff --git a/Core/MemMap.cpp b/Core/MemMap.cpp index 1f50b3b810..e12cb8af39 100644 --- a/Core/MemMap.cpp +++ b/Core/MemMap.cpp @@ -324,14 +324,10 @@ static void DoMemoryVoid(PointerWrap &p, uint32_t start, uint32_t size) { switch (p.mode) { case PointerWrap::MODE_READ: - GlobalThreadPool::Loop([&](int l, int h) { - memmove(d + l, storage + l, h - l); - }, 0, size); + GlobalThreadPool::Memcpy(d, storage, size); break; case PointerWrap::MODE_WRITE: - GlobalThreadPool::Loop([&](int l, int h) { - memmove(storage + l, d + l, h - l); - }, 0, size); + GlobalThreadPool::Memcpy(storage, d, size); break; case PointerWrap::MODE_MEASURE: // Nothing to do here. diff --git a/Core/ThreadPools.cpp b/Core/ThreadPools.cpp index 9af420cf37..0159392328 100644 --- a/Core/ThreadPools.cpp +++ b/Core/ThreadPools.cpp @@ -6,9 +6,14 @@ std::unique_ptr GlobalThreadPool::pool; std::once_flag GlobalThreadPool::init_flag; -void GlobalThreadPool::Loop(const std::function& loop, int lower, int upper) { +void GlobalThreadPool::Loop(const std::function& loop, int lower, int upper, int minSize) { std::call_once(init_flag, Inititialize); - pool->ParallelLoop(loop, lower, upper); + pool->ParallelLoop(loop, lower, upper, minSize); +} + +void GlobalThreadPool::Memcpy(void *dest, const void *src, int size) { + std::call_once(init_flag, Inititialize); + pool->ParallelMemcpy(dest, src, size); } void GlobalThreadPool::Inititialize() { diff --git a/Core/ThreadPools.h b/Core/ThreadPools.h index 350d56da1e..6912996435 100644 --- a/Core/ThreadPools.h +++ b/Core/ThreadPools.h @@ -6,7 +6,8 @@ class GlobalThreadPool { public: // will execute slices of "loop" from "lower" to "upper" // in parallel on the global thread pool - static void Loop(const std::function& loop, int lower, int upper); + static void Loop(const std::function& loop, int lower, int upper, int minSize = -1); + static void Memcpy(void *dest, const void *src, int size); private: static std::unique_ptr pool;