Greatly simplify the thread synchronization.

This commit is contained in:
Henrik Rydgård 2022-09-23 19:39:00 +02:00
parent f784112814
commit ef7c8844f8
6 changed files with 116 additions and 142 deletions

View file

@ -146,8 +146,11 @@ void FrameData::SubmitPending(VulkanContext *vulkan, FrameSubmitType type, Frame
hasPresentCommands = false;
if (type == FrameSubmitType::Present) {
NOTICE_LOG(G3D, "Has present commands, Triggering fence");
fenceToTrigger = fence;
}
} else {
NOTICE_LOG(G3D, "No present commands, not triggering fence");
}
if (!numCmdBufs && fenceToTrigger == VK_NULL_HANDLE) {
@ -181,14 +184,6 @@ void FrameData::SubmitPending(VulkanContext *vulkan, FrameSubmitType type, Frame
vkWaitForFences(vulkan->GetDevice(), 1, &readbackFence, true, UINT64_MAX);
vkResetFences(vulkan->GetDevice(), 1, &readbackFence);
}
// When !triggerFence, we notify after syncing with Vulkan.
if (type == FrameSubmitType::Present || type == FrameSubmitType::Sync) {
VERBOSE_LOG(G3D, "PULL: Frame %d.readyForFence = true", index);
std::unique_lock<std::mutex> lock(push_mutex);
readyForFence = true; // misnomer in sync mode!
push_condVar.notify_all();
}
}
void FrameDataShared::Init(VulkanContext *vulkan) {

View file

@ -7,15 +7,14 @@
#include "Common/GPU/Vulkan/VulkanContext.h"
struct VKRStep;
enum {
// Upper bound on GPU timestamp queries per frame — presumably sizes the
// profiler's query pool; verify against the VkQueryPool creation site.
MAX_TIMESTAMP_QUERIES = 128,
};
enum class VKRRunType {
END,
PRESENT,
SYNC,
EXIT,
};
struct QueueProfileContext {
@ -43,14 +42,6 @@ enum class FrameSubmitType {
// Per-frame data, round-robin so we can overlap submission with execution of the previous frame.
struct FrameData {
std::mutex push_mutex;
std::condition_variable push_condVar;
std::mutex pull_mutex;
std::condition_variable pull_condVar;
bool readyForFence = true;
bool readyForRun = false; // protected by pull_mutex
bool skipSwap = false;
VkFence fence;
@ -68,9 +59,10 @@ struct FrameData {
bool hasMainCommands = false;
bool hasPresentCommands = false;
bool hasFencePending = false;
bool hasAcquired = false;
std::vector<VKRStep *> steps;
bool syncDone = false;
// Swapchain.
uint32_t curSwapchainImage = -1;
@ -89,12 +81,6 @@ struct FrameData {
// This will only submit if we are actually recording init commands.
void SubmitPending(VulkanContext *vulkan, FrameSubmitType type, FrameDataShared &shared);
VKRRunType RunType() const {
return runType_;
}
VKRRunType runType_ = VKRRunType::END;
private:
// Metadata for logging etc
int index;

View file

@ -554,7 +554,7 @@ void VulkanQueueRunner::PreprocessSteps(std::vector<VKRStep *> &steps) {
}
}
void VulkanQueueRunner::RunSteps(FrameData &frameData, FrameDataShared &frameDataShared) {
void VulkanQueueRunner::RunSteps(std::vector<VKRStep *> &steps, FrameData &frameData, FrameDataShared &frameDataShared) {
QueueProfileContext *profile = frameData.profilingEnabled_ ? &frameData.profile : nullptr;
if (profile)
@ -564,8 +564,8 @@ void VulkanQueueRunner::RunSteps(FrameData &frameData, FrameDataShared &frameDat
VkCommandBuffer cmd = frameData.hasPresentCommands ? frameData.presentCmd : frameData.mainCmd;
for (size_t i = 0; i < frameData.steps.size(); i++) {
const VKRStep &step = *frameData.steps[i];
for (size_t i = 0; i < steps.size(); i++) {
const VKRStep &step = *steps[i];
if (emitLabels) {
VkDebugUtilsLabelEXT labelInfo{ VK_STRUCTURE_TYPE_DEBUG_UTILS_LABEL_EXT };
@ -625,11 +625,11 @@ void VulkanQueueRunner::RunSteps(FrameData &frameData, FrameDataShared &frameDat
// Deleting all in one go should be easier on the instruction cache than deleting
// them as we go - and easier to debug because we can look backwards in the frame.
for (auto step : frameData.steps) {
for (auto step : steps) {
delete step;
}
frameData.steps.clear();
steps.clear();
if (profile)
profile->cpuEndTime = time_now_d();

View file

@ -247,6 +247,14 @@ private:
RPKey key_;
};
// These are enqueued from the main thread,
// and the render thread pops them off
// One unit of work handed from the main thread to the render thread:
// the recorded steps for a frame, which in-flight frame slot they belong
// to, and how the run should finish (PRESENT, SYNC, or EXIT).
struct VKRRenderThreadTask {
std::vector<VKRStep *> steps;  // Owning pointers; the queue runner deletes them after execution.
int frame;                     // Index into the per-frame data array (in-flight frame slot).
VKRRunType runType;            // What to do after running the steps (present, sync readback, or exit).
};
class VulkanQueueRunner {
public:
VulkanQueueRunner(VulkanContext *vulkan) : vulkan_(vulkan), renderPasses_(16) {}
@ -257,7 +265,7 @@ public:
}
void PreprocessSteps(std::vector<VKRStep *> &steps);
void RunSteps(FrameData &frameData, FrameDataShared &frameDataShared);
void RunSteps(std::vector<VKRStep *> &steps, FrameData &frameData, FrameDataShared &frameDataShared);
void LogSteps(const std::vector<VKRStep *> &steps, bool verbose);
std::string StepToString(const VKRStep &step) const;

View file

@ -14,7 +14,7 @@
#include "Common/Thread/ThreadUtil.h"
#if 0 // def _DEBUG
#define VLOG(...) INFO_LOG(G3D, __VA_ARGS__)
#define VLOG(...) NOTICE_LOG(G3D, __VA_ARGS__)
#else
#define VLOG(...)
#endif
@ -333,10 +333,9 @@ bool VulkanRenderManager::CreateBackbuffers() {
// Start the thread.
if (HasBackbuffers()) {
run_ = true;
// Won't necessarily be 0.
threadInitFrame_ = vulkan_->GetCurFrame();
INFO_LOG(G3D, "Starting Vulkan submission thread (threadInitFrame_ = %d)", vulkan_->GetCurFrame());
run_ = true; // For controlling the compiler thread's exit
INFO_LOG(G3D, "Starting Vulkan submission thread");
thread_ = std::thread(&VulkanRenderManager::ThreadFunc, this);
INFO_LOG(G3D, "Starting Vulkan compiler thread");
compileThread_ = std::thread(&VulkanRenderManager::CompileThreadFunc, this);
@ -344,29 +343,32 @@ bool VulkanRenderManager::CreateBackbuffers() {
return true;
}
// Called from main thread.
void VulkanRenderManager::StopThread() {
if (!run_) {
INFO_LOG(G3D, "Vulkan submission thread was already stopped.");
return;
{
// Tell the render thread to quit when it's done.
std::unique_lock<std::mutex> lock(pushMutex_);
VKRRenderThreadTask task;
task.frame = vulkan_->GetCurFrame();
task.runType = VKRRunType::EXIT;
renderThreadQueue_.push(task);
pushCondVar_.notify_one();
}
// Compiler thread still relies on this.
run_ = false;
// Stop the thread.
thread_.join();
for (int i = 0; i < vulkan_->GetInflightFrames(); i++) {
auto &frameData = frameData_[i];
{
std::unique_lock<std::mutex> lock(frameData.push_mutex);
frameData.push_condVar.notify_all();
}
{
std::unique_lock<std::mutex> lock(frameData.pull_mutex);
frameData.pull_condVar.notify_all();
}
// Zero the queries so we don't try to pull them later.
frameData.profile.timestampDescriptions.clear();
}
thread_.join();
INFO_LOG(G3D, "Vulkan submission thread joined. Frame=%d", vulkan_->GetCurFrame());
compileCond_.notify_all();
compileThread_.join();
INFO_LOG(G3D, "Vulkan compiler thread joined.");
@ -374,28 +376,24 @@ void VulkanRenderManager::StopThread() {
// Eat whatever has been queued up for this frame if anything.
Wipe();
// Wait for any fences to finish and be resignaled, so we don't have sync issues.
// Also clean out any queued data, which might refer to things that might not be valid
// when we restart...
// Clean out any remaining queued data, which might refer to things that might not be valid
// when we restart the thread...
// Not sure if this is still needed
for (int i = 0; i < vulkan_->GetInflightFrames(); i++) {
auto &frameData = frameData_[i];
_assert_(!frameData.readyForRun);
_assert_(frameData.steps.empty());
if (frameData.hasInitCommands) {
// Clear 'em out. This can happen on restart sometimes.
vkEndCommandBuffer(frameData.initCmd);
frameData.hasInitCommands = false;
}
frameData.readyForRun = false;
for (size_t i = 0; i < frameData.steps.size(); i++) {
delete frameData.steps[i];
if (frameData.hasMainCommands) {
vkEndCommandBuffer(frameData.mainCmd);
frameData.hasMainCommands = false;
}
frameData.steps.clear();
std::unique_lock<std::mutex> lock(frameData.push_mutex);
while (!frameData.readyForFence) {
VLOG("PUSH: Waiting for frame[%d].readyForFence = 1 (stop)", i);
frameData.push_condVar.wait(lock);
if (frameData.hasPresentCommands) {
vkEndCommandBuffer(frameData.presentCmd);
frameData.hasPresentCommands = false;
}
}
}
@ -468,45 +466,30 @@ void VulkanRenderManager::DrainCompileQueue() {
void VulkanRenderManager::ThreadFunc() {
SetCurrentThreadName("RenderMan");
int threadFrame = threadInitFrame_;
bool nextFrame = false;
bool firstFrame = true;
while (true) {
// Pop a task of the queue and execute it.
VKRRenderThreadTask task;
{
if (nextFrame) {
threadFrame++;
if (threadFrame >= vulkan_->GetInflightFrames())
threadFrame = 0;
std::unique_lock<std::mutex> lock(pushMutex_);
while (renderThreadQueue_.empty()) {
pushCondVar_.wait(lock);
}
FrameData &frameData = frameData_[threadFrame];
std::unique_lock<std::mutex> lock(frameData.pull_mutex);
while (!frameData.readyForRun && run_) {
VLOG("PULL: Waiting for frame[%d].readyForRun", threadFrame);
frameData.pull_condVar.wait(lock);
}
if (!frameData.readyForRun && !run_) {
// This means we're out of frames to render and run_ is false, so bail.
break;
}
VLOG("PULL: frame[%d].readyForRun = false", threadFrame);
frameData.readyForRun = false;
// Previously we had a quick exit here that avoided calling Run() if run_ was suddenly false,
// but that created a race condition where frames could end up not finished properly on resize etc.
// Only increment next time if we're done.
nextFrame = frameData.RunType() == VKRRunType::END;
task = renderThreadQueue_.front();
renderThreadQueue_.pop();
}
VLOG("PULL: Running frame %d", threadFrame);
if (firstFrame) {
INFO_LOG(G3D, "Running first frame (%d)", threadFrame);
firstFrame = false;
// Oh, we got a task! We can now have pushMutex_ unlocked, allowing the host to
// push more work when it feels like it, and just start working.
if (task.runType == VKRRunType::EXIT) {
// Oh, host wanted out. Let's leave.
break;
}
Run(threadFrame);
VLOG("PULL: Finished frame %d", threadFrame);
Run(task);
}
// Wait for the device to be done with everything, before tearing stuff down.
// TODO: Do we need this?
vkDeviceWaitIdle(vulkan_->GetDevice());
VLOG("PULL: Quitting");
@ -519,19 +502,10 @@ void VulkanRenderManager::BeginFrame(bool enableProfiling, bool enableLogProfile
int curFrame = vulkan_->GetCurFrame();
FrameData &frameData = frameData_[curFrame];
// Make sure the very last command buffer from the frame before the previous has been fully executed.
{
std::unique_lock<std::mutex> lock(frameData.push_mutex);
while (!frameData.readyForFence) {
VLOG("PUSH: Waiting for frame[%d].readyForFence = 1", curFrame);
frameData.push_condVar.wait(lock);
}
frameData.readyForFence = false;
}
VLOG("PUSH: Fencing %d", curFrame);
// This must be the very first Vulkan call we do in a new frame.
// Makes sure the very last command buffer from the frame before the previous has been fully executed.
if (vkWaitForFences(device, 1, &frameData.fence, true, UINT64_MAX) == VK_ERROR_DEVICE_LOST) {
_assert_msg_(false, "Device lost in vkWaitForFences");
}
@ -578,9 +552,6 @@ void VulkanRenderManager::BeginFrame(bool enableProfiling, bool enableLogProfile
// Must be after the fence - this performs deletes.
VLOG("PUSH: BeginFrame %d", curFrame);
if (!run_) {
WARN_LOG(G3D, "BeginFrame while !run_!");
}
vulkan_->BeginFrame(enableLogProfiler ? GetInitCmd() : VK_NULL_HANDLE);
@ -1085,7 +1056,6 @@ void VulkanRenderManager::CopyFramebuffer(VKRFramebuffer *src, VkRect2D srcRect,
if (dstPos.x != 0 || dstPos.y != 0 || !fillsDst)
step->dependencies.insert(dst);
std::unique_lock<std::mutex> lock(mutex_);
steps_.push_back(step);
}
@ -1131,7 +1101,6 @@ void VulkanRenderManager::BlitFramebuffer(VKRFramebuffer *src, VkRect2D srcRect,
if (!fillsDst)
step->dependencies.insert(dst);
std::unique_lock<std::mutex> lock(mutex_);
steps_.push_back(step);
}
@ -1191,16 +1160,18 @@ void VulkanRenderManager::Finish() {
FrameData &frameData = frameData_[curFrame];
{
std::unique_lock<std::mutex> lock(frameData.pull_mutex);
VLOG("PUSH: Frame[%d].readyForRun = true", curFrame);
frameData.steps = std::move(steps_);
steps_.clear();
frameData.readyForRun = true;
frameData.runType_ = VKRRunType::END;
frameData.pull_condVar.notify_all();
VLOG("PUSH: Frame[%d]", curFrame);
std::unique_lock<std::mutex> lock(pushMutex_);
VKRRenderThreadTask task;
task.frame = curFrame;
task.runType = VKRRunType::PRESENT;
renderThreadQueue_.push(task);
renderThreadQueue_.back().steps = std::move(steps_);
pushCondVar_.notify_one();
}
vulkan_->EndFrame();
steps_.clear();
vulkan_->EndFrame();
insideFrame_ = false;
}
@ -1214,8 +1185,8 @@ void VulkanRenderManager::Wipe() {
// Called on the render thread.
//
// Can be called again after a VKRRunType::SYNC on the same frame.
void VulkanRenderManager::Run(int frame) {
FrameData &frameData = frameData_[frame];
void VulkanRenderManager::Run(VKRRenderThreadTask &task) {
FrameData &frameData = frameData_[task.frame];
_dbg_assert_(!frameData.hasPresentCommands);
frameData.SubmitPending(vulkan_, FrameSubmitType::Pending, frameDataShared_);
@ -1232,16 +1203,15 @@ void VulkanRenderManager::Run(int frame) {
_assert_msg_(res == VK_SUCCESS, "vkBeginCommandBuffer failed! result=%s", VulkanResultToString(res));
}
queueRunner_.PreprocessSteps(frameData.steps);
queueRunner_.PreprocessSteps(task.steps);
// Likely during shutdown, happens in headless.
if (frameData.steps.empty() && !frameData.hasAcquired)
if (task.steps.empty() && !frameData.hasAcquired)
frameData.skipSwap = true;
//queueRunner_.LogSteps(stepsOnThread, false);
queueRunner_.RunSteps(frameData, frameDataShared_);
switch (frameData.runType_) {
case VKRRunType::END:
queueRunner_.RunSteps(task.steps, frameData, frameDataShared_);
switch (task.runType) {
case VKRRunType::PRESENT:
frameData.SubmitPending(vulkan_, FrameSubmitType::Present, frameDataShared_);
if (!frameData.skipSwap) {
@ -1268,6 +1238,12 @@ void VulkanRenderManager::Run(int frame) {
case VKRRunType::SYNC:
// The submit will trigger the readbackFence, and also do the wait for it.
frameData.SubmitPending(vulkan_, FrameSubmitType::Sync, frameDataShared_);
{
std::unique_lock<std::mutex> lock(syncMutex_);
syncCondVar_.notify_one();
}
// At this point the GPU is idle, so we can resume filling the command buffers for the
// current frame, since all previously enqueued command buffers have been fully
// processed. No need to switch to the next frame number; that would just be confusing.
@ -1277,7 +1253,7 @@ void VulkanRenderManager::Run(int frame) {
_dbg_assert_(false);
}
VLOG("PULL: Finished running frame %d", frame);
VLOG("PULL: Finished running frame %d", task.frame);
}
// Called from main thread.
@ -1288,23 +1264,23 @@ void VulkanRenderManager::FlushSync() {
FrameData &frameData = frameData_[curFrame];
{
std::unique_lock<std::mutex> lock(frameData.pull_mutex);
VLOG("PUSH: Frame[%d].readyForRun = true (sync)", curFrame);
frameData.steps = std::move(steps_);
steps_.clear();
frameData.readyForRun = true;
_dbg_assert_(!frameData.readyForFence);
frameData.runType_ = VKRRunType::SYNC;
frameData.pull_condVar.notify_all();
VLOG("PUSH: Frame[%d]", curFrame);
std::unique_lock<std::mutex> lock(pushMutex_);
VKRRenderThreadTask task;
task.frame = curFrame;
task.runType = VKRRunType::SYNC;
renderThreadQueue_.push(task);
renderThreadQueue_.back().steps = std::move(steps_);
pushCondVar_.notify_one();
}
{
std::unique_lock<std::mutex> lock(frameData.push_mutex);
std::unique_lock<std::mutex> lock(syncMutex_);
// Wait for the flush to be hit, since we're syncing.
while (!frameData.readyForFence) {
while (!frameData.syncDone) {
VLOG("PUSH: Waiting for frame[%d].readyForFence = 1 (sync)", curFrame);
frameData.push_condVar.wait(lock);
syncCondVar_.wait(lock);
}
frameData.readyForFence = false;
frameData.syncDone = false;
}
}

View file

@ -466,7 +466,7 @@ private:
void CompileThreadFunc();
void DrainCompileQueue();
void Run(int frame);
void Run(VKRRenderThreadTask &task);
void BeginSubmitFrame(int frame);
// Bad for performance but sometimes necessary for synchronous CPU readbacks (screenshots and whatnot).
@ -492,6 +492,8 @@ private:
int curHeight_ = -1;
bool insideFrame_ = false;
bool run_ = false;
// This is the offset within this frame, in case of a mid-frame sync.
int renderStepOffset_ = 0;
VKRStep *curRenderStep_ = nullptr;
@ -503,13 +505,20 @@ private:
std::vector<VKRStep *> steps_;
// Execution time state
bool run_ = true;
VulkanContext *vulkan_;
std::thread thread_;
std::mutex mutex_;
int threadInitFrame_ = 0;
VulkanQueueRunner queueRunner_;
// For pushing data on the queue.
std::mutex pushMutex_;
std::condition_variable pushCondVar_;
std::queue<VKRRenderThreadTask> renderThreadQueue_;
// For readbacks and other reasons we need to sync with the render thread.
std::mutex syncMutex_;
std::condition_variable syncCondVar_;
// Shader compilation thread to compile while emulating the rest of the frame.
// Only one right now but we could use more.
std::thread compileThread_;