ThreadManager: Add a simple priority field.

Currently, the priority is not actually respected.
This commit is contained in:
Unknown W. Brackets 2023-01-15 07:55:07 -08:00
parent 13b3a540c3
commit 88ba003f46
13 changed files with 83 additions and 29 deletions
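
Since the new priority field is not consumed anywhere yet, here is a rough sketch, purely for illustration, of how a scheduler could honor it later: one FIFO per priority level, popped highest-priority-first. PriorityQueues, Push and Pop are hypothetical names, not the actual ThreadManager internals.

#include <deque>
#include <mutex>

// Sketch only: relies on TaskPriority::HIGH == 0 ordering and TaskPriority::COUNT
// from the enum added in this commit.
struct PriorityQueues {
	std::deque<Task *> queues[(size_t)TaskPriority::COUNT];
	std::mutex lock;

	void Push(Task *task) {
		std::lock_guard<std::mutex> guard(lock);
		queues[(size_t)task->Priority()].push_back(task);
	}

	// Returns the next runnable task, preferring higher priority, or nullptr if empty.
	Task *Pop() {
		std::lock_guard<std::mutex> guard(lock);
		for (size_t p = 0; p < (size_t)TaskPriority::COUNT; p++) {
			if (!queues[p].empty()) {
				Task *task = queues[p].front();
				queues[p].pop_front();
				return task;
			}
		}
		return nullptr;
	}
};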

View file

@@ -403,6 +403,10 @@ public:
return TaskType::CPU_COMPUTE;
}
TaskPriority Priority() const override {
return TaskPriority::HIGH;
}
void Run() override {
for (auto &task : tasks_) {
task.pipeline->Create(vulkan_, task.compatibleRenderPass, task.rpType, task.sampleCount, task.scheduleTime, task.countToCompile);

View file

@@ -6,13 +6,17 @@
class LoopRangeTask : public Task {
public:
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper)
: counter_(counter), loop_(loop), lower_(lower), upper_(upper) {}
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper, TaskPriority p)
: counter_(counter), loop_(loop), lower_(lower), upper_(upper), priority_(p) {}
TaskType Type() const override {
return TaskType::CPU_COMPUTE;
}
TaskPriority Priority() const override {
return priority_;
}
void Run() override {
loop_(lower_, upper_);
counter_->Count();
@@ -23,9 +27,10 @@ public:
int lower_;
int upper_;
const TaskPriority priority_;
};
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority) {
if (minSize == -1) {
minSize = 1;
}
@@ -38,7 +43,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
} else if (range <= minSize) {
// Single background task.
WaitableCounter *waitableCounter = new WaitableCounter(1);
threadMan->EnqueueTaskOnThread(0, new LoopRangeTask(waitableCounter, loop, lower, upper));
threadMan->EnqueueTaskOnThread(0, new LoopRangeTask(waitableCounter, loop, lower, upper, priority));
return waitableCounter;
} else {
// Split the range between threads. Allow for some fractional bits.
@@ -65,7 +70,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
// Let's do the stragglers on the current thread.
break;
}
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(waitableCounter, loop, start, end));
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(waitableCounter, loop, start, end, priority));
counter += delta;
if ((counter >> fractionalBits) >= upper) {
break;
@@ -83,7 +88,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
}
}
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority) {
if (cpu_info.num_cores == 1 || (minSize >= (upper - lower) && upper > lower)) {
// "Optimization" for single-core devices, or minSize larger than the range.
// No point in adding threading overhead, let's just do it inline (since this is the blocking variant).
@@ -96,7 +101,7 @@ void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, i
minSize = 1;
}
WaitableCounter *counter = ParallelRangeLoopWaitable(threadMan, loop, lower, upper, minSize);
WaitableCounter *counter = ParallelRangeLoopWaitable(threadMan, loop, lower, upper, minSize, priority);
// TODO: Optimize using minSize. We'll just compute whether there's a remainder, remove it from the call to ParallelRangeLoopWaitable,
// and process the remainder right here. If there's no remainder, we'll steal a whole chunk.
if (counter) {
@@ -105,7 +110,7 @@ void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, i
}
// NOTE: Supports a max of 2GB.
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes) {
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes, TaskPriority priority) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memcpy(dst, src, bytes);
@@ -118,11 +123,11 @@ void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t
const char *s = (const char *)src;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memmove(d + l, s + l, h - l);
}, 0, (int)bytes, 128 * 1024);
}, 0, (int)bytes, 128 * 1024, priority);
}
// NOTE: Supports a max of 2GB.
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes) {
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes, TaskPriority priority) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memset(dst, value, bytes);
@@ -134,5 +139,5 @@ void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t b
char *d = (char *)dst;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memset(d + l, value, h - l);
}, 0, (int)bytes, 128 * 1024);
}, 0, (int)bytes, 128 * 1024, priority);
}

View file

@@ -36,13 +36,13 @@ public:
};
// Note that upper bounds are non-inclusive: range is [lower, upper)
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize);
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority);
// Note that upper bounds are non-inclusive: range is [lower, upper)
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize);
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority = TaskPriority::NORMAL);
// Common utilities for large (!) memory copies.
// Will only fall back to threads if it seems to make sense.
// NOTE: These support a max of 2GB.
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes);
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes);
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes, TaskPriority priority = TaskPriority::NORMAL);
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes, TaskPriority priority = TaskPriority::NORMAL);
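
For illustration, a caller of the new overloads might look like this; dst, count and Compute() are placeholder names, and the priority only has an effect once the scheduler actually uses it:

// Hypothetical caller: fill a buffer in parallel at low priority so that
// emulation-critical work can win once priorities are enforced.
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
	for (int i = l; i < h; i++)
		dst[i] = Compute(i);
}, 0, count, 128, TaskPriority::LOW);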

View file

@@ -10,7 +10,8 @@
template<class T>
class PromiseTask : public Task {
public:
PromiseTask(std::function<T ()> fun, Mailbox<T> *tx, TaskType t) : fun_(fun), tx_(tx), type_(t) {
PromiseTask(std::function<T ()> fun, Mailbox<T> *tx, TaskType t, TaskPriority p)
: fun_(fun), tx_(tx), type_(t), priority_(p) {
tx_->AddRef();
}
~PromiseTask() {
@@ -21,6 +22,10 @@ public:
return type_;
}
TaskPriority Priority() const override {
return priority_;
}
void Run() override {
T value = fun_();
tx_->Send(value);
@@ -28,7 +33,8 @@ public:
std::function<T ()> fun_;
Mailbox<T> *tx_;
TaskType type_;
const TaskType type_;
const TaskPriority priority_;
};
// Represents pending or actual data.
@@ -39,13 +45,13 @@ public:
template<class T>
class Promise {
public:
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T()> fun, TaskType taskType) {
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T()> fun, TaskType taskType, TaskPriority taskPriority = TaskPriority::NORMAL) {
Mailbox<T> *mailbox = new Mailbox<T>();
Promise<T> *promise = new Promise<T>();
promise->rx_ = mailbox;
PromiseTask<T> *task = new PromiseTask<T>(fun, mailbox, taskType);
PromiseTask<T> *task = new PromiseTask<T>(fun, mailbox, taskType, taskPriority);
threadman->EnqueueTask(task);
return promise;
}
@@ -65,8 +71,8 @@ public:
}
// Allow an empty promise to spawn, too, in case we want to delay it.
void SpawnEmpty(ThreadManager *threadman, std::function<T()> fun, TaskType taskType) {
PromiseTask<T> *task = new PromiseTask<T>(fun, rx_, taskType);
void SpawnEmpty(ThreadManager *threadman, std::function<T()> fun, TaskType taskType, TaskPriority taskPriority = TaskPriority::NORMAL) {
PromiseTask<T> *task = new PromiseTask<T>(fun, rx_, taskType, taskPriority);
threadman->EnqueueTask(task);
}
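
A hypothetical use of the extended Spawn(); Thumbnail, LoadThumbnail() and path are placeholders, and BlockUntilReady() is assumed to be the existing accessor on Promise:

// Spawn a low-priority background load and consume the result later.
Promise<Thumbnail *> *thumb = Promise<Thumbnail *>::Spawn(&g_threadManager, [path]() {
	return LoadThumbnail(path);
}, TaskType::IO_BLOCKING, TaskPriority::LOW);
// ... other work ...
Thumbnail *result = thumb->BlockUntilReady();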

View file

@@ -11,11 +11,20 @@ enum class TaskType {
DEDICATED_THREAD, // These can never get stuck in queue behind others, but are more expensive to launch. Cannot use I/O.
};
enum class TaskPriority {
HIGH = 0,
NORMAL = 1,
LOW = 2,
COUNT,
};
// Implement this to make something that you can run on the thread manager.
class Task {
public:
virtual ~Task() {}
virtual TaskType Type() const = 0;
virtual TaskPriority Priority() const = 0;
virtual void Run() = 0;
virtual bool Cancellable() { return false; }
virtual void Cancel() {}
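
A minimal implementer of the extended interface (example only, not taken from the codebase) now also overrides Priority():

// Fire-and-forget flush that should never delay emulation tasks.
class FlushLogTask : public Task {
public:
	TaskType Type() const override { return TaskType::IO_BLOCKING; }
	TaskPriority Priority() const override { return TaskPriority::LOW; }
	void Run() override { /* flush buffered log lines to disk */ }
};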

View file

@@ -100,7 +100,7 @@ bool ElfReader::LoadRelocations(const Elf32_Rel *rels, int numRelocs) {
relocOps[r] = Memory::ReadUnchecked_Instruction(addr, true).encoding;
}
}, 0, numRelocs, 128);
}, 0, numRelocs, 128, TaskPriority::HIGH);
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
for (int r = l; r < h; r++) {
@@ -213,7 +213,7 @@ bool ElfReader::LoadRelocations(const Elf32_Rel *rels, int numRelocs) {
Memory::WriteUnchecked_U32(op, addr);
NotifyMemInfo(MemBlockFlags::WRITE, addr, 4, "Relocation");
}
}, 0, numRelocs, 128);
}, 0, numRelocs, 128, TaskPriority::HIGH);
if (numErrors) {
WARN_LOG(LOADER, "%i bad relocations found!!!", numErrors.load());

View file

@@ -434,7 +434,7 @@ void __KernelMemoryInit()
kernelMemory.Init(PSP_GetKernelMemoryBase(), PSP_GetKernelMemoryEnd() - PSP_GetKernelMemoryBase(), false);
userMemory.Init(PSP_GetUserMemoryBase(), PSP_GetUserMemoryEnd() - PSP_GetUserMemoryBase(), false);
volatileMemory.Init(PSP_GetVolatileMemoryStart(), PSP_GetVolatileMemoryEnd() - PSP_GetVolatileMemoryStart(), false);
ParallelMemset(&g_threadManager, Memory::GetPointerWrite(PSP_GetKernelMemoryBase()), 0, PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase());
ParallelMemset(&g_threadManager, Memory::GetPointerWrite(PSP_GetKernelMemoryBase()), 0, PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase(), TaskPriority::HIGH);
NotifyMemInfo(MemBlockFlags::WRITE, PSP_GetKernelMemoryBase(), PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase(), "MemInit");
INFO_LOG(SCEKERNEL, "Kernel and user memory pools initialized");

View file

@@ -708,7 +708,13 @@ public:
TextureSaveTask(SimpleBuf<u32> _data) : data(std::move(_data)) {}
TaskType Type() const override { return TaskType::CPU_COMPUTE; } // Also I/O blocking but dominated by compute
// Also I/O blocking but dominated by compute.
TaskType Type() const override { return TaskType::CPU_COMPUTE; }
TaskPriority Priority() const override {
return TaskPriority::LOW;
}
void Run() override {
const Path filename = basePath / hashfile;
const Path saveFilename = basePath / NEW_TEXTURE_DIR / hashfile;
@@ -995,6 +1001,10 @@ public:
return TaskType::IO_BLOCKING;
}
TaskPriority Priority() const override {
return TaskPriority::NORMAL;
}
void Run() override {
tex_.Prepare();
waitable_->Notify();

View file

@@ -287,7 +287,7 @@ static const u8 *mymemmem(const u8 *haystack, size_t off, size_t hlen, const u8
p++;
alignp();
}
}, 0, range, 128 * 1024);
}, 0, range, 128 * 1024, TaskPriority::LOW);
return result;
}

View file

@@ -105,6 +105,11 @@ public:
return TaskType::CPU_COMPUTE;
}
TaskPriority Priority() const override {
// Let higher-priority emulation tasks win over this.
return TaskPriority::NORMAL;
}
void Run() override {
ProcessItems();
status_ = false;

View file

@@ -345,6 +345,18 @@ public:
return TaskType::IO_BLOCKING;
}
TaskPriority Priority() const override {
switch (gamePath_.Type()) {
case PathType::NATIVE:
case PathType::CONTENT_URI:
return TaskPriority::NORMAL;
default:
// Remote/network access.
return TaskPriority::LOW;
}
}
void Run() override {
// An early-return will result in the destructor running, where we can set
// flags like working and pending.

View file

@@ -755,7 +755,7 @@ UI::EventReturn ConfirmMemstickMoveScreen::OnConfirm(UI::EventParams &params) {
}
return new MoveResult{ true, "", failedFiles };
}, TaskType::IO_BLOCKING);
}, TaskType::IO_BLOCKING, TaskPriority::HIGH);
RecreateViews();
} else {

View file

@@ -45,7 +45,7 @@ bool TestParallelLoop(ThreadManager *threadMan) {
printf("tester thread ID: %d\n", GetCurrentThreadIdForDebug());
printf("waitable test\n");
WaitableCounter *waitable = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1);
WaitableCounter *waitable = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1, TaskPriority::HIGH);
// Can do stuff here if we like.
waitable->WaitAndRelease();
// Now it's done.
@@ -58,7 +58,7 @@ bool TestParallelLoop(ThreadManager *threadMan) {
ParallelRangeLoop(threadMan, rangeFunc, 0, 100, 40);
// Try a loop with minimum size larger than range.
printf("waitable test [10-30)\n");
WaitableCounter *waitable2 = ParallelRangeLoopWaitable(threadMan, rangeFunc, 10, 30, 40);
WaitableCounter *waitable2 = ParallelRangeLoopWaitable(threadMan, rangeFunc, 10, 30, 40, TaskPriority::LOW);
waitable2->WaitAndRelease();
return true;
}
@@ -75,6 +75,9 @@ public:
IncrementTask(TaskType type, LimitedWaitable *waitable) : type_(type), waitable_(waitable) {}
~IncrementTask() {}
TaskType Type() const override { return type_; }
TaskPriority Priority() const override {
return TaskPriority::NORMAL;
}
void Run() override {
g_atomicCounter++;
waitable_->Notify();