Autodetect number of threads (remove setting). Fix some bugs.

This commit is contained in:
Henrik Rydgård 2021-06-12 21:06:59 +02:00
parent 03c79ae055
commit 0fa27ff9d7
14 changed files with 112 additions and 40 deletions

View file

@ -39,7 +39,10 @@ struct CPUInfo {
bool Mode64bit;
bool HTT;
// Number of real CPU cores.
int num_cores;
// Number of logical CPUs per core.
int logical_cpu_count;
bool bAtom;

View file

@ -2,11 +2,18 @@
#include <mutex>
#include <condition_variable>
#include <cassert>
// Named Channel.h because I originally intended to support a multi item channel as
// well as a simple blocking mailbox. Let's see if we get there.
// Single item mailbox.
template<class T>
struct Mailbox {
Mailbox() : refcount_(1) {}
~Mailbox() {
assert(refcount_ == 0);
}
std::mutex mutex_;
std::condition_variable condvar_;

View file

@ -1,6 +1,7 @@
#include <cstring>
#include "ParallelLoop.h"
#include "Common/Thread/ParallelLoop.h"
#include "Common/CPUDetect.h"
class LoopRangeTask : public Task {
public:
@ -24,7 +25,6 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
minSize = 1;
}
// TODO: Optimize using minSize.
int numTasks = threadMan->GetNumLooperThreads();
int range = upper - lower;
@ -37,7 +37,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
// Just assign one task per thread, as many as we have.
WaitableCounter *counter = new WaitableCounter(range);
for (int i = 0; i < range; i++) {
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, i, i + 1));
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, i, i + 1), TaskType::CPU_COMPUTE);
}
return counter;
} else {
@ -50,7 +50,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
int start = lastEnd;
d += dx;
int end = i == numTasks - 1 ? range : (int)d;
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, start, end));
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, start, end), TaskType::CPU_COMPUTE);
lastEnd = end;
}
return counter;
@ -58,6 +58,13 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
}
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
if (cpu_info.num_cores == 1) {
// "Optimization" for single-core devices.
// No point in adding threading overhead, let's just do it inline.
loop(lower, upper);
return;
}
if (minSize == -1) {
minSize = 4;
}

View file

@ -10,13 +10,11 @@ class PromiseTask : public Task {
public:
PromiseTask() {}
~PromiseTask() {
tx_->Release();
}
void Run() override {
T *value = fun_();
tx_->Send(value);
tx_->Release();
}
std::function<T *()> fun_;
@ -30,18 +28,17 @@ public:
template<class T>
class Promise {
public:
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T *()> fun) {
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T *()> fun, TaskType taskType) {
// std::pair<Rx<T>, Tx<T>> channel = CreateChannel<T>();
Mailbox<T> *mailbox = new Mailbox<T>();
PromiseTask<T> *task = new PromiseTask<T>();
task->fun_ = fun;
task->tx_ = mailbox;
threadman->EnqueueTask(task);
threadman->EnqueueTask(task, taskType);
Promise<T> *promise = new Promise<T>();
promise->rx_ = mailbox;
mailbox->AddRef();
return promise;
}

View file

@ -11,10 +11,24 @@
#include "Common/Thread/ThreadUtil.h"
#include "Common/Thread/ThreadManager.h"
// Threads and task scheduling
//
// * The threadpool should contain a number of threads that's the the number of cores,
// plus a fixed number more for I/O-limited background tasks.
// * Parallel compute-limited loops should use as many threads as there are cores.
// They should always be scheduled to the first N threads.
// * For some tasks, splitting the input values up linearly between the threads
// is not fair. However, we ignore that for now.
const int MAX_CORES_TO_USE = 16;
const int EXTRA_THREADS = 4; // For I/O limited tasks
struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> queue;
std::vector<ThreadContext *> threads_;
int roundRobin;
};
struct ThreadContext {
@ -80,7 +94,11 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
}
}
void ThreadManager::Init(int numThreads) {
void ThreadManager::Init(int numRealCores, int numLogicalCores) {
numComputeThreads_ = std::min(numRealCores, MAX_CORES_TO_USE);
int numThreads = numComputeThreads_ + EXTRA_THREADS;
numThreads_ = numThreads;
for (int i = 0; i < numThreads; i++) {
ThreadContext *thread = new ThreadContext();
thread->cancelled.store(false);
@ -91,9 +109,25 @@ void ThreadManager::Init(int numThreads) {
}
void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
int maxThread;
int threadOffset = 0;
if (taskType == TaskType::CPU_COMPUTE) {
// only the threads reserved for heavy compute.
maxThread = numComputeThreads_;
threadOffset = 0;
} else {
// any free thread
maxThread = numThreads_;
threadOffset = numComputeThreads_;
}
// Find a thread with no outstanding work.
for (int i = 0; i < global_->threads_.size(); i++) {
ThreadContext *thread = global_->threads_[i];
int threadNum = threadOffset;
for (int i = 0; i < maxThread; i++, threadNum++) {
if (threadNum >= global_->threads_.size()) {
threadNum = 0;
}
ThreadContext *thread = global_->threads_[threadNum];
if (thread->queueSize.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
@ -104,11 +138,13 @@ void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
}
}
// Still not scheduled? Put it on the global queue and notify a random thread.
// Still not scheduled? Put it on the global queue and notify a thread chosen by round-robin.
// Not particularly scientific, but hopefully we should not run into this too much.
{
std::unique_lock<std::mutex> lock(global_->mutex);
global_->queue.push_back(task);
global_->threads_[0]->cond.notify_one();
global_->threads_[global_->roundRobin % maxThread]->cond.notify_one();
global_->roundRobin++;
}
}
@ -123,10 +159,7 @@ void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task, TaskType task
}
int ThreadManager::GetNumLooperThreads() const {
// If possible, let's use all threads but one for parallel loops.
// Not sure what's the best policy here. Maybe we should just have more threads than CPUs.
int numLooperThreads = (int)(global_->threads_.size()) - 1;
return std::max(numLooperThreads, 1);
return numComputeThreads_;
}
void ThreadManager::TryCancelTask(uint64_t taskID) {

View file

@ -41,9 +41,12 @@ public:
ThreadManager();
~ThreadManager();
void Init(int numWorkerThreads);
void EnqueueTask(Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
// The distinction here is to be able to take hyper-threading into account.
// It gets even trickier when you think about mobile chips with BIG/LITTLE, but we'll
// just ignore it and let the OS handle it.
void Init(int numRealCores, int numLogicalCores);
void EnqueueTask(Task *task, TaskType taskType);
void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType);
// Currently does nothing. It will always be best-effort - maybe it cancels,
// maybe it doesn't. Note that the id is the id() returned by the task. You need to make that
@ -57,6 +60,9 @@ public:
private:
GlobalThreadContext *global_;
int numThreads_ = 0;
int numComputeThreads_ = 0;
friend struct ThreadContext;
};

View file

@ -1,3 +1,5 @@
#include "ppsspp_config.h"
#ifdef _WIN32
#include <windows.h>
#ifdef __MINGW32__
@ -16,6 +18,9 @@
#if defined(__ANDROID__) || defined(__APPLE__) || (defined(__GLIBC__) && defined(_GNU_SOURCE))
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/syscall.h>
#endif
#ifdef TLS_SUPPORTED
@ -41,12 +46,14 @@ static EXCEPTION_DISPOSITION NTAPI ignore_handler(EXCEPTION_RECORD *rec,
#endif
void SetCurrentThreadName(const char* threadName) {
#ifdef _WIN32
#if PPSSPP_PLATFORM(WINDOWS)
// Set the debugger-visible threadname through an unholy magic hack
static const DWORD MS_VC_EXCEPTION = 0x406D1388;
#endif
#if defined(_WIN32) && defined(__MINGW32__)
// TODO: Use the new function SetThreadDescription available since Windows 10, version 1607.
#if PPSSPP_PLATFORM(WINDOWS) && defined(__MINGW32__)
// Thread information for VS compatible debugger. -1 sets current thread.
THREADNAME_INFO ti;
ti.dwType = 0x1000;
@ -67,7 +74,7 @@ void SetCurrentThreadName(const char* threadName) {
// Pop exception handler
tib->ExceptionList = tib->ExceptionList->Next;
#elif defined(_WIN32)
#elif PPSSPP_PLATFORM(WINDOWS)
#pragma pack(push,8)
struct THREADNAME_INFO {
DWORD dwType; // must be 0x1000
@ -121,3 +128,11 @@ void AssertCurrentThreadName(const char *threadName) {
}
#endif
}
int GetCurrentThreadIdForDebug() {
#if PPSSPP_PLATFORM(WINDOWS)
return (int)GetCurrentThreadId();
#else
return gettid();
#endif
}

View file

@ -6,3 +6,7 @@
// for AssertCurrentThreadName to work.
void SetCurrentThreadName(const char *threadName);
void AssertCurrentThreadName(const char *threadName);
// Just gets a cheap thread identifier so that you can see different threads in debug output,
// exactly what it is is badly specified and not useful for anything.
int GetCurrentThreadIdForDebug();

View file

@ -423,12 +423,6 @@ std::string CreateRandMAC() {
return randStream.str();
}
static int DefaultNumWorkers() {
// Let's cap the global thread pool at 16 threads. Nothing we do really should have much
// use for more...
return std::min(16, cpu_info.num_cores);
}
static int DefaultCpuCore() {
#if PPSSPP_ARCH(ARM) || PPSSPP_ARCH(ARM64) || PPSSPP_ARCH(X86) || PPSSPP_ARCH(AMD64)
return (int)CPUCore::JIT;
@ -473,7 +467,6 @@ static ConfigSetting generalSettings[] = {
ConfigSetting("DiscordPresence", &g_Config.bDiscordPresence, true, true, false), // Or maybe it makes sense to have it per-game? Race conditions abound...
ConfigSetting("UISound", &g_Config.bUISound, false, true, false),
ReportedConfigSetting("NumWorkerThreads", &g_Config.iNumWorkerThreads, &DefaultNumWorkers, true, true),
ConfigSetting("AutoLoadSaveState", &g_Config.iAutoLoadSaveState, 0, true, true),
ReportedConfigSetting("EnableCheats", &g_Config.bEnableCheats, false, true, true),
ConfigSetting("CwCheatRefreshRate", &g_Config.iCwCheatRefreshRate, 77, true, true),

View file

@ -72,7 +72,6 @@ public:
bool bBrowse; // when opening the emulator, immediately show a file browser
// General
int iNumWorkerThreads;
bool bScreenshotsAsPNG;
bool bUseFFV1;
bool bDumpFrames;

View file

@ -752,7 +752,7 @@ std::shared_ptr<GameInfo> GameInfoCache::GetInfo(Draw::DrawContext *draw, const
}
GameInfoWorkItem *item = new GameInfoWorkItem(gamePath, info);
g_threadManager.EnqueueTask(item);
g_threadManager.EnqueueTask(item, TaskType::IO_BLOCKING);
// Don't re-insert if we already have it.
if (info_.find(pathStr) == info_.end())

View file

@ -338,7 +338,7 @@ static void PostLoadConfig() {
else
i18nrepo.LoadIni(g_Config.sLanguageIni, langOverridePath);
g_threadManager.Init(g_Config.iNumWorkerThreads);
g_threadManager.Init(cpu_info.num_cores, cpu_info.logical_cpu_count);
}
static bool CreateDirectoriesAndroid() {

View file

@ -388,7 +388,6 @@ int main(int argc, const char* argv[])
g_Config.iInternalResolution = 1;
g_Config.iUnthrottleMode = (int)UnthrottleMode::CONTINUOUS;
g_Config.bEnableLogging = fullLog;
g_Config.iNumWorkerThreads = 1;
g_Config.bSoftwareSkinning = true;
g_Config.bVertexDecoderJit = true;
g_Config.bBlockTransferGPU = true;

View file

@ -6,6 +6,7 @@
#include "Common/Thread/Channel.h"
#include "Common/Thread/Promise.h"
#include "Common/Thread/ParallelLoop.h"
#include "Common/Thread/ThreadUtil.h"
struct ResultObject {
bool ok;
@ -13,6 +14,7 @@ struct ResultObject {
ResultObject *ResultProducer() {
sleep_ms(250);
printf("result produced: thread %d\n", GetCurrentThreadIdForDebug());
return new ResultObject{ true };
}
@ -28,16 +30,23 @@ bool TestMailbox() {
}
void rangeFunc(int lower, int upper) {
printf("%d-%d\n", lower, upper);
sleep_ms(30);
printf("range %d-%d (thread %d)\n", lower, upper, GetCurrentThreadIdForDebug());
}
// This always passes unless something is badly broken, the interesting thing is the
// logged output.
bool TestParallelLoop(ThreadManager *threadMan) {
printf("tester thread ID: %d", GetCurrentThreadIdForDebug());
WaitableCounter *waitable = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1);
// Can do stuff here if we like.
waitable->WaitAndRelease();
// Now it's done.
ParallelRangeLoop(threadMan, rangeFunc, 0, 65);
// Try a few synchronous loops (can be slightly more efficient) with various ranges.
return true;
}
@ -47,9 +56,9 @@ bool TestThreadManager() {
}
ThreadManager manager;
manager.Init(8);
manager.Init(8, 8);
Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer));
Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer, TaskType::IO_BLOCKING));
if (!TestParallelLoop(&manager)) {
return false;