diff --git a/Common/CPUDetect.h b/Common/CPUDetect.h
index 1318814070..c2985fc3c5 100644
--- a/Common/CPUDetect.h
+++ b/Common/CPUDetect.h
@@ -39,7 +39,10 @@ struct CPUInfo {
 	bool Mode64bit;
 
 	bool HTT;
+
+	// Number of real CPU cores.
 	int num_cores;
+	// Number of logical CPUs per core.
 	int logical_cpu_count;
 
 	bool bAtom;
diff --git a/Common/Thread/Channel.h b/Common/Thread/Channel.h
index 53c1d83b94..9af11a0383 100644
--- a/Common/Thread/Channel.h
+++ b/Common/Thread/Channel.h
@@ -2,11 +2,18 @@
 
 #include <condition_variable>
 #include <mutex>
+#include <cassert>
+
+// Named Channel.h because I originally intended to support a multi item channel as
+// well as a simple blocking mailbox. Let's see if we get there.
 
 // Single item mailbox.
 template<class T>
 struct Mailbox {
 	Mailbox() : refcount_(1) {}
+	~Mailbox() {
+		assert(refcount_ == 0);
+	}
 
 	std::mutex mutex_;
 	std::condition_variable condvar_;
diff --git a/Common/Thread/ParallelLoop.cpp b/Common/Thread/ParallelLoop.cpp
index bb6868efb8..cf25d3ee33 100644
--- a/Common/Thread/ParallelLoop.cpp
+++ b/Common/Thread/ParallelLoop.cpp
@@ -1,6 +1,7 @@
 #include <algorithm>
 
-#include "ParallelLoop.h"
+#include "Common/Thread/ParallelLoop.h"
+#include "Common/CPUDetect.h"
 
 class LoopRangeTask : public Task {
 public:
@@ -24,7 +25,6 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
 		minSize = 1;
 	}
 
-	// TODO: Optimize using minSize.
 	int numTasks = threadMan->GetNumLooperThreads();
 
 	int range = upper - lower;
@@ -37,7 +37,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
 		// Just assign one task per thread, as many as we have.
 		WaitableCounter *counter = new WaitableCounter(range);
 		for (int i = 0; i < range; i++) {
-			threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, i, i + 1));
+			threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, i, i + 1), TaskType::CPU_COMPUTE);
 		}
 		return counter;
 	} else {
@@ -50,7 +50,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
 			int start = lastEnd;
 			d += dx;
 			int end = i == numTasks - 1 ? range : (int)d;
-			threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, start, end));
+			threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, start, end), TaskType::CPU_COMPUTE);
 			lastEnd = end;
 		}
 		return counter;
@@ -58,6 +58,13 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
 }
 
 void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
+	if (cpu_info.num_cores == 1) {
+		// "Optimization" for single-core devices.
+		// No point in adding threading overhead, let's just do it inline.
+		loop(lower, upper);
+		return;
+	}
+
 	if (minSize == -1) {
 		minSize = 4;
 	}
diff --git a/Common/Thread/Promise.h b/Common/Thread/Promise.h
index c4f34a3996..7862cb0bcf 100644
--- a/Common/Thread/Promise.h
+++ b/Common/Thread/Promise.h
@@ -10,13 +10,11 @@ class PromiseTask : public Task {
 public:
 	PromiseTask() {}
 	~PromiseTask() {
-		tx_->Release();
 	}
 
 	void Run() override {
 		T *value = fun_();
 		tx_->Send(value);
-		tx_->Release();
 	}
 
 	std::function<T *()> fun_;
@@ -30,18 +28,17 @@ public:
 template<class T>
 class Promise {
 public:
-	static Promise<T> *Spawn(ThreadManager *threadman, std::function<T *()> fun) {
+	static Promise<T> *Spawn(ThreadManager *threadman, std::function<T *()> fun, TaskType taskType) {
 		// std::pair<Rx<T>, Tx<T>> channel = CreateChannel<T>();
 		Mailbox<T> *mailbox = new Mailbox<T>();
 
 		PromiseTask<T> *task = new PromiseTask<T>();
 		task->fun_ = fun;
 		task->tx_ = mailbox;
-		threadman->EnqueueTask(task);
+		threadman->EnqueueTask(task, taskType);
 
 		Promise<T> *promise = new Promise<T>();
 		promise->rx_ = mailbox;
-		mailbox->AddRef();
 		return promise;
 	}
 
diff --git a/Common/Thread/ThreadManager.cpp b/Common/Thread/ThreadManager.cpp
index 8945e67f7d..3b00210b72 100644
--- a/Common/Thread/ThreadManager.cpp
+++ b/Common/Thread/ThreadManager.cpp
@@ -11,10 +11,24 @@
 #include "Common/Thread/ThreadUtil.h"
 #include "Common/Thread/ThreadManager.h"
 
+// Threads and task scheduling
+//
+// * The threadpool should contain a number of threads that's the number of cores,
+//   plus a fixed number more for I/O-limited background tasks.
+// * Parallel compute-limited loops should use as many threads as there are cores.
+//   They should always be scheduled to the first N threads.
+// * For some tasks, splitting the input values up linearly between the threads
+//   is not fair. However, we ignore that for now.
+
+const int MAX_CORES_TO_USE = 16;
+const int EXTRA_THREADS = 4;  // For I/O-limited tasks
+
 struct GlobalThreadContext {
 	std::mutex mutex; // associated with each respective condition variable
 	std::deque<Task *> queue;
 	std::vector<ThreadContext *> threads_;
+
+	int roundRobin = 0;
 };
 
 struct ThreadContext {
@@ -80,7 +94,11 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) {
 	}
 }
 
-void ThreadManager::Init(int numThreads) {
+void ThreadManager::Init(int numRealCores, int numLogicalCores) {
+	numComputeThreads_ = std::min(numRealCores, MAX_CORES_TO_USE);
+	int numThreads = numComputeThreads_ + EXTRA_THREADS;
+	numThreads_ = numThreads;
+
 	for (int i = 0; i < numThreads; i++) {
 		ThreadContext *thread = new ThreadContext();
 		thread->cancelled.store(false);
@@ -91,9 +109,25 @@
 }
 
 void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
+	int maxThread;
+	int threadOffset = 0;
+	if (taskType == TaskType::CPU_COMPUTE) {
+		// Only the threads reserved for heavy compute.
+		maxThread = numComputeThreads_;
+		threadOffset = 0;
+	} else {
+		// Any free thread.
+		maxThread = numThreads_;
+		threadOffset = numComputeThreads_;
+	}
+
 	// Find a thread with no outstanding work.
-	for (int i = 0; i < global_->threads_.size(); i++) {
-		ThreadContext *thread = global_->threads_[i];
+	int threadNum = threadOffset;
+	for (int i = 0; i < maxThread; i++, threadNum++) {
+		if (threadNum >= global_->threads_.size()) {
+			threadNum = 0;
+		}
+		ThreadContext *thread = global_->threads_[threadNum];
 		if (thread->queueSize.load() == 0) {
 			std::unique_lock<std::mutex> lock(thread->mutex);
 			thread->private_queue.push_back(task);
@@ -104,11 +138,13 @@ void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
 		}
 	}
 
-	// Still not scheduled? Put it on the global queue and notify a random thread.
+	// Still not scheduled? Put it on the global queue and notify a thread chosen by round-robin.
+	// Not particularly scientific, but hopefully we should not run into this too much.
 	{
 		std::unique_lock<std::mutex> lock(global_->mutex);
 		global_->queue.push_back(task);
-		global_->threads_[0]->cond.notify_one();
+		global_->threads_[global_->roundRobin % maxThread]->cond.notify_one();
+		global_->roundRobin++;
 	}
 }
 
@@ -123,10 +159,7 @@ void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType) {
 }
 
 int ThreadManager::GetNumLooperThreads() const {
-	// If possible, let's use all threads but one for parallel loops.
-	// Not sure what's the best policy here. Maybe we should just have more threads than CPUs.
-	int numLooperThreads = (int)(global_->threads_.size()) - 1;
-	return std::max(numLooperThreads, 1);
+	return numComputeThreads_;
 }
 
 void ThreadManager::TryCancelTask(uint64_t taskID) {
diff --git a/Common/Thread/ThreadManager.h b/Common/Thread/ThreadManager.h
index b87214363f..ff4bbc5289 100644
--- a/Common/Thread/ThreadManager.h
+++ b/Common/Thread/ThreadManager.h
@@ -41,9 +41,12 @@ public:
 	ThreadManager();
 	~ThreadManager();
 
-	void Init(int numWorkerThreads);
-	void EnqueueTask(Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
-	void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
+	// The distinction here is to be able to take hyper-threading into account.
+	// It gets even trickier when you think about mobile chips with big.LITTLE, but we'll
+	// just ignore it and let the OS handle it.
+	void Init(int numRealCores, int numLogicalCores);
+	void EnqueueTask(Task *task, TaskType taskType);
+	void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType);
 
 	// Currently does nothing. It will always be best-effort - maybe it cancels,
 	// maybe it doesn't. Note that the id is the id() returned by the task. You need to make that
@@ -57,6 +60,9 @@ public:
 private:
 	GlobalThreadContext *global_;
 
+	int numThreads_ = 0;
+	int numComputeThreads_ = 0;
+
 	friend struct ThreadContext;
 };
 
diff --git a/Common/Thread/ThreadUtil.cpp b/Common/Thread/ThreadUtil.cpp
index 8f35af321b..553ba99236 100644
--- a/Common/Thread/ThreadUtil.cpp
+++ b/Common/Thread/ThreadUtil.cpp
@@ -1,3 +1,5 @@
+#include "ppsspp_config.h"
+
 #ifdef _WIN32
 #include <windows.h>
 #ifdef __MINGW32__
@@ -16,6 +18,9 @@
 
 #if defined(__ANDROID__) || defined(__APPLE__) || (defined(__GLIBC__) && defined(_GNU_SOURCE))
 #include <pthread.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/syscall.h>
 #endif
 
 #ifdef TLS_SUPPORTED
@@ -41,12 +46,14 @@ static EXCEPTION_DISPOSITION NTAPI ignore_handler(EXCEPTION_RECORD *rec,
 #endif
 
 void SetCurrentThreadName(const char* threadName) {
-#ifdef _WIN32
+#if PPSSPP_PLATFORM(WINDOWS)
 	// Set the debugger-visible threadname through an unholy magic hack
 	static const DWORD MS_VC_EXCEPTION = 0x406D1388;
 #endif
 
-#if defined(_WIN32) && defined(__MINGW32__)
+	// TODO: Use the new function SetThreadDescription available since Windows 10, version 1607.
+
+#if PPSSPP_PLATFORM(WINDOWS) && defined(__MINGW32__)
 	// Thread information for VS compatible debugger. -1 sets current thread.
 	THREADNAME_INFO ti;
 	ti.dwType = 0x1000;
@@ -67,7 +74,7 @@ void SetCurrentThreadName(const char* threadName) {
 	// Pop exception handler
 	tib->ExceptionList = tib->ExceptionList->Next;
 
-#elif defined(_WIN32)
+#elif PPSSPP_PLATFORM(WINDOWS)
 	#pragma pack(push,8)
 	struct THREADNAME_INFO {
 		DWORD dwType; // must be 0x1000
@@ -121,3 +128,11 @@ void AssertCurrentThreadName(const char *threadName) {
 	}
 #endif
 }
+
+int GetCurrentThreadIdForDebug() {
+#if PPSSPP_PLATFORM(WINDOWS)
+	return (int)GetCurrentThreadId();
+#else
+	return gettid();
+#endif
+}
diff --git a/Common/Thread/ThreadUtil.h b/Common/Thread/ThreadUtil.h
index d702ec566b..982de2a41c 100644
--- a/Common/Thread/ThreadUtil.h
+++ b/Common/Thread/ThreadUtil.h
@@ -6,3 +6,7 @@
 // for AssertCurrentThreadName to work.
 void SetCurrentThreadName(const char *threadName);
 void AssertCurrentThreadName(const char *threadName);
+
+// Just gets a cheap thread identifier so that you can tell different threads apart in debug output.
+// Exactly what it is is unspecified, so it's not useful for anything else.
+int GetCurrentThreadIdForDebug();
diff --git a/Core/Config.cpp b/Core/Config.cpp
index b8b08d8d9e..2a7fe268a5 100644
--- a/Core/Config.cpp
+++ b/Core/Config.cpp
@@ -423,12 +423,6 @@ std::string CreateRandMAC() {
 	return randStream.str();
 }
 
-static int DefaultNumWorkers() {
-	// Let's cap the global thread pool at 16 threads. Nothing we do really should have much
-	// use for more...
-	return std::min(16, cpu_info.num_cores);
-}
-
 static int DefaultCpuCore() {
 #if PPSSPP_ARCH(ARM) || PPSSPP_ARCH(ARM64) || PPSSPP_ARCH(X86) || PPSSPP_ARCH(AMD64)
 	return (int)CPUCore::JIT;
@@ -473,7 +467,6 @@ static ConfigSetting generalSettings[] = {
 	ConfigSetting("DiscordPresence", &g_Config.bDiscordPresence, true, true, false),  // Or maybe it makes sense to have it per-game? Race conditions abound...
 	ConfigSetting("UISound", &g_Config.bUISound, false, true, false),
 
-	ReportedConfigSetting("NumWorkerThreads", &g_Config.iNumWorkerThreads, &DefaultNumWorkers, true, true),
 	ConfigSetting("AutoLoadSaveState", &g_Config.iAutoLoadSaveState, 0, true, true),
 	ReportedConfigSetting("EnableCheats", &g_Config.bEnableCheats, false, true, true),
 	ConfigSetting("CwCheatRefreshRate", &g_Config.iCwCheatRefreshRate, 77, true, true),
diff --git a/Core/Config.h b/Core/Config.h
index 6dbe092183..b1a80d5427 100644
--- a/Core/Config.h
+++ b/Core/Config.h
@@ -72,7 +72,6 @@ public:
 	bool bBrowse; // when opening the emulator, immediately show a file browser
 
 	// General
-	int iNumWorkerThreads;
 	bool bScreenshotsAsPNG;
 	bool bUseFFV1;
 	bool bDumpFrames;
diff --git a/UI/GameInfoCache.cpp b/UI/GameInfoCache.cpp
index 9c9b3dd0df..e40fde9b8f 100644
--- a/UI/GameInfoCache.cpp
+++ b/UI/GameInfoCache.cpp
@@ -752,7 +752,7 @@ std::shared_ptr<GameInfo> GameInfoCache::GetInfo(Draw::DrawContext *draw, const std::string &gamePath, int wantFlags) {
 	}
 
 	GameInfoWorkItem *item = new GameInfoWorkItem(gamePath, info);
-	g_threadManager.EnqueueTask(item);
+	g_threadManager.EnqueueTask(item, TaskType::IO_BLOCKING);
 
 	// Don't re-insert if we already have it.
 	if (info_.find(pathStr) == info_.end())
diff --git a/UI/NativeApp.cpp b/UI/NativeApp.cpp
index 9dceff87e7..b0196395d8 100644
--- a/UI/NativeApp.cpp
+++ b/UI/NativeApp.cpp
@@ -338,7 +338,7 @@ static void PostLoadConfig() {
 	else
 		i18nrepo.LoadIni(g_Config.sLanguageIni, langOverridePath);
 
-	g_threadManager.Init(g_Config.iNumWorkerThreads);
+	g_threadManager.Init(cpu_info.num_cores, cpu_info.logical_cpu_count);
 }
 
 static bool CreateDirectoriesAndroid() {
diff --git a/headless/Headless.cpp b/headless/Headless.cpp
index 2e142e638e..e62c08ce0e 100644
--- a/headless/Headless.cpp
+++ b/headless/Headless.cpp
@@ -388,7 +388,6 @@ int main(int argc, const char* argv[])
 	g_Config.iInternalResolution = 1;
 	g_Config.iUnthrottleMode = (int)UnthrottleMode::CONTINUOUS;
 	g_Config.bEnableLogging = fullLog;
-	g_Config.iNumWorkerThreads = 1;
 	g_Config.bSoftwareSkinning = true;
 	g_Config.bVertexDecoderJit = true;
 	g_Config.bBlockTransferGPU = true;
diff --git a/unittest/TestThreadManager.cpp b/unittest/TestThreadManager.cpp
index 1dbaf689bb..022e8fa5ba 100644
--- a/unittest/TestThreadManager.cpp
+++ b/unittest/TestThreadManager.cpp
@@ -6,6 +6,7 @@
 #include "Common/Thread/Channel.h"
 #include "Common/Thread/Promise.h"
 #include "Common/Thread/ParallelLoop.h"
+#include "Common/Thread/ThreadUtil.h"
 
 struct ResultObject {
 	bool ok;
@@ -13,6 +14,7 @@ struct ResultObject {
 
 ResultObject *ResultProducer() {
 	sleep_ms(250);
+	printf("result produced: thread %d\n", GetCurrentThreadIdForDebug());
 	return new ResultObject{ true };
 }
 
@@ -28,16 +30,23 @@ bool TestMailbox() {
 }
 
 void rangeFunc(int lower, int upper) {
-	printf("%d-%d\n", lower, upper);
+	sleep_ms(30);
+	printf("range %d-%d (thread %d)\n", lower, upper, GetCurrentThreadIdForDebug());
 }
 
+// This always passes unless something is badly broken, the interesting thing is the
+// logged output.
 bool TestParallelLoop(ThreadManager *threadMan) {
+	printf("tester thread ID: %d\n", GetCurrentThreadIdForDebug());
+
 	WaitableCounter *waitable = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1);
-	// Can do stuff here if we like.
-	waitable->WaitAndRelease();
 	// Now it's done.
+
+	// Try a few synchronous loops (can be slightly more efficient) with various ranges.
+	ParallelRangeLoop(threadMan, rangeFunc, 0, 65);
+
 	return true;
 }
 
@@ -47,9 +56,9 @@ bool TestThreadManager() {
 	}
 
 	ThreadManager manager;
-	manager.Init(8);
+	manager.Init(8, 8);
 
-	Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer));
+	Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer, TaskType::IO_BLOCKING));
 
 	if (!TestParallelLoop(&manager)) {
 		return false;