From e45ce3af96f4720d7566c56bb1b0117c73c575c5 Mon Sep 17 00:00:00 2001 From: Arthur Blot Date: Sat, 10 Nov 2012 23:17:15 +0100 Subject: [PATCH 1/2] Implemented message pipes (may have issues) --- Core/HLE/sceKernelMsgPipe.cpp | 529 +++++++++++++++++++++++++++------- Core/HLE/sceKernelThread.h | 2 +- 2 files changed, 425 insertions(+), 106 deletions(-) diff --git a/Core/HLE/sceKernelMsgPipe.cpp b/Core/HLE/sceKernelMsgPipe.cpp index 80fdad860f..ca8fa76a73 100644 --- a/Core/HLE/sceKernelMsgPipe.cpp +++ b/Core/HLE/sceKernelMsgPipe.cpp @@ -20,6 +20,14 @@ #include "sceKernelMsgPipe.h" #include "sceKernelThread.h" +#define SCE_KERNEL_MPA_THFIFO_S 0x0000 +#define SCE_KERNEL_MPA_THPRI_S 0x0100 +#define SCE_KERNEL_MPA_THFIFO_R 0x0000 +#define SCE_KERNEL_MPA_THPRI_R 0x1000 + +#define SCE_KERNEL_MPW_FULL 0 +#define SCE_KERNEL_MPW_ASAP 1 + struct NativeMsgPipe { SceSize size; @@ -31,7 +39,17 @@ struct NativeMsgPipe int numReceiveWaitThreads; }; -struct MsgPipe : public KernelObject +struct MsgPipeWaitingThread +{ + SceUID id; + u32 bufAddr; + u32 bufSize; + u32 freeSize; + int waitMode; + u32 transferredBytesAddr; +}; + +struct MsgPipe : public KernelObject { const char *GetName() {return nmp.name;} const char *GetTypeName() {return "MsgPipe";} @@ -40,13 +58,99 @@ struct MsgPipe : public KernelObject NativeMsgPipe nmp; - std::vector sendWaitingThreads; - std::vector receiveWaitingThreads; + std::vector sendWaitingThreads; + std::vector receiveWaitingThreads; + + void AddWaitingThread(std::vector &list, SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr, bool usePrio) + { + MsgPipeWaitingThread thread = { id, addr, size, size, waitMode, transferredBytesAddr }; + if (usePrio) + { + for (std::vector::iterator it = list.begin(); it != list.end(); it++) + { + if (__KernelGetThreadPrio(id) >= __KernelGetThreadPrio((*it).id)) + { + list.insert(it, thread); + break; + } + } + } + else + { + list.push_back(thread); + } + } + + void AddSendWaitingThread(SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr) + { + bool usePrio = ((nmp.attr & SCE_KERNEL_MPA_THPRI_S) != 0); + AddWaitingThread(sendWaitingThreads, id, addr, size, waitMode, transferredBytesAddr, usePrio); + } + + void AddReceiveWaitingThread(SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr) + { + bool usePrio = ((nmp.attr & SCE_KERNEL_MPA_THPRI_R) != 0); + AddWaitingThread(receiveWaitingThreads, id, addr, size, waitMode, transferredBytesAddr, usePrio); + } + + void CheckSendThreads() + { + if (sendWaitingThreads.empty()) + return; + MsgPipeWaitingThread *thread = &sendWaitingThreads.front(); + if (nmp.freeSize >= thread->bufSize) + { + // Put all the data to the buffer + memcpy(buffer + (nmp.bufSize - nmp.freeSize), Memory::GetPointer(thread->bufAddr), thread->bufSize); + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + nmp.freeSize -= thread->bufSize; + __KernelResumeThreadFromWait(thread->id); + sendWaitingThreads.erase(sendWaitingThreads.begin()); + CheckReceiveThreads(); + } + else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != 0) + { + // Put as much data as possible into the buffer + memcpy(buffer + (nmp.bufSize - nmp.freeSize), Memory::GetPointer(thread->bufAddr), nmp.freeSize); + Memory::Write_U32(nmp.freeSize, thread->transferredBytesAddr); + nmp.freeSize = 0; + __KernelResumeThreadFromWait(thread->id); + receiveWaitingThreads.erase(receiveWaitingThreads.begin()); + CheckReceiveThreads(); + } + } + + // This function should be only ran when the temporary buffer size is not 0 (otherwise, data is copied directly to the threads) + void CheckReceiveThreads() + { + if (receiveWaitingThreads.empty()) + return; + MsgPipeWaitingThread *thread = &receiveWaitingThreads.front(); + if (nmp.bufSize - nmp.freeSize >= thread->bufSize) + { + // Get the needed data from the buffer + Memory::Memcpy(thread->bufAddr, buffer, thread->bufSize); + // Put the unused data at the start of the buffer + memmove(buffer, buffer + thread->bufSize, nmp.bufSize - nmp.freeSize); + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + nmp.freeSize += thread->bufSize; + __KernelResumeThreadFromWait(thread->id); + receiveWaitingThreads.erase(receiveWaitingThreads.begin()); + CheckSendThreads(); + } + else if (thread->waitMode == SCE_KERNEL_MPW_ASAP && nmp.freeSize != nmp.bufSize) + { + // Get all the data from the buffer + Memory::Memcpy(thread->bufAddr, buffer, nmp.bufSize - nmp.freeSize); + Memory::Write_U32(nmp.bufSize - nmp.freeSize, thread->transferredBytesAddr); + nmp.freeSize = nmp.bufSize; + __KernelResumeThreadFromWait(thread->id); + receiveWaitingThreads.erase(receiveWaitingThreads.begin()); + CheckSendThreads(); + } + } - // Ring buffer u8 *buffer; - int writePos; - int readPos; }; void sceKernelCreateMsgPipe() @@ -68,28 +172,145 @@ void sceKernelCreateMsgPipe() m->nmp.numSendWaitThreads = 0; m->nmp.numReceiveWaitThreads = 0; - m->buffer = new u8[size]; - m->writePos = 0; - m->readPos = 0; + m->buffer = 0; + if (size != 0) + { + m->buffer = new u8[size]; + } + + DEBUG_LOG(HLE, "%d=sceKernelCreateMsgPipe(%s, part=%d, attr=%08x, size=%d, opt=%08x)", id, name, memoryPartition, attr, size, opt); RETURN(id); } void sceKernelDeleteMsgPipe() { - SceUInt uid = PARAM(0); + SceUID uid = PARAM(0); u32 error; - MsgPipe *p = kernelObjects.Get(uid, error); - if (!p) + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { ERROR_LOG(HLE, "sceKernelDeleteMsgPipe(%i) - ERROR %08x", uid, error); + RETURN(error); return; } - delete [] p->buffer; - ERROR_LOG(HLE, "sceKernelDeleteMsgPipe(%i)", uid); + if (m->buffer != 0) + { + delete [] m->buffer; + } + for (u32 i = 0; i < m->sendWaitingThreads.size(); i++) + { + __KernelResumeThreadFromWait(m->sendWaitingThreads[i].id); + } + for (u32 i = 0; i < m->receiveWaitingThreads.size(); i++) + { + __KernelResumeThreadFromWait(m->receiveWaitingThreads[i].id); + } + DEBUG_LOG(HLE, "sceKernelDeleteMsgPipe(%i)", uid); RETURN(kernelObjects.Destroy(uid)); } +void __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool pool) +{ + u32 curSendAddr = sendBufAddr; + if (m->nmp.bufSize == 0) + { + while (!m->receiveWaitingThreads.empty()) + { + MsgPipeWaitingThread *thread = &m->receiveWaitingThreads.front(); + if (thread->freeSize > sendSize) + { + Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), sendSize); + thread->freeSize -= sendSize; + curSendAddr += sendSize; + sendSize = 0; + if (thread->waitMode == SCE_KERNEL_MPW_ASAP) + { + Memory::Write_U32(thread->bufSize - thread->freeSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin()); + } + break; + } + else if (thread->freeSize == sendSize) + { + Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), sendSize); + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin()); + curSendAddr += sendSize; + sendSize = 0; + break; + } + else + { + Memory::Memcpy(thread->bufAddr + (thread->bufSize - thread->freeSize), Memory::GetPointer(curSendAddr), thread->freeSize); + sendSize -= thread->freeSize; + curSendAddr += thread->freeSize; + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin()); + } + } + // If there is still data to send and (we want to send all of it or we didn't send anything) + if (sendSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curSendAddr == sendBufAddr)) + { + if (pool) + { + RETURN(SCE_KERNEL_ERROR_MPP_FULL); + return; + } + else + { + m->AddSendWaitingThread(__KernelGetCurThread(), curSendAddr, sendSize, waitMode, resultAddr); + RETURN(0); + __KernelWaitCurThread(WAITTYPE_MSGPIPE, 0, 0, 0, cbEnabled); + return; + } + } + } + else + { + if (sendSize <= m->nmp.freeSize) + { + memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), Memory::GetPointer(sendBufAddr), sendSize); + m->nmp.freeSize -= sendSize; + curSendAddr = sendBufAddr + sendSize; + sendSize = 0; + } + else if (waitMode == SCE_KERNEL_MPW_ASAP && m->nmp.freeSize != 0) + { + memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), Memory::GetPointer(sendBufAddr), m->nmp.freeSize); + curSendAddr = sendBufAddr + m->nmp.freeSize; + sendSize -= m->nmp.freeSize; + m->nmp.freeSize = 0; + } + else + { + if (pool) + { + RETURN(SCE_KERNEL_ERROR_MPP_FULL); + return; + } + else + { + m->AddSendWaitingThread(__KernelGetCurThread(), curSendAddr, sendSize, waitMode, resultAddr); + RETURN(0); + __KernelWaitCurThread(WAITTYPE_MSGPIPE, 0, 0, 0, cbEnabled); + return; + } + } + + if (curSendAddr != sendBufAddr) + { + m->CheckReceiveThreads(); + } + } + Memory::Write_U32(curSendAddr - sendBufAddr, resultAddr); + + RETURN(0); +} + void sceKernelSendMsgPipe() { SceUInt uid = PARAM(0); @@ -100,36 +321,15 @@ void sceKernelSendMsgPipe() u32 timeoutPtr = PARAM(5); u32 error; - MsgPipe *pipe = kernelObjects.Get(uid, error); - if (!pipe) { + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - ERROR %08x", uid, error); RETURN(error); return; } - if (sendSize > pipe->nmp.freeSize) { - // TODO: Block - ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - Message won't fit", uid, error); - RETURN(SCE_KERNEL_ERROR_MPP_FULL); - return; - } - - const u8 *source = Memory::GetPointer(sendBufAddr); - - int destIndex = pipe->writePos; - if (pipe->writePos + sendSize > pipe->nmp.size) { - // Split in two - int firstCopySize = pipe->nmp.size - pipe->writePos; - memcpy(pipe->buffer + pipe->writePos, source, firstCopySize); - memcpy(pipe->buffer, source + firstCopySize, sendSize - firstCopySize); - } else { - memcpy(pipe->buffer + pipe->writePos, source, sendSize); - } - pipe->writePos += sendSize; - pipe->writePos %= pipe->nmp.size; - - ERROR_LOG(HLE, "UNIMPL sceKernelSendMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr); - RETURN(0); + DEBUG_LOG(HLE, "sceKernelSendMsgPipe(id=%i, addr=%08x, size=%i, mode=%i, result=%08x, timeout=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr); + __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr, false, false); } void sceKernelSendMsgPipeCB() @@ -142,36 +342,15 @@ void sceKernelSendMsgPipeCB() u32 timeoutPtr = PARAM(5); u32 error; - MsgPipe *pipe = kernelObjects.Get(uid, error); - if (!pipe) { - ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - ERROR %08x", uid, error); + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { + ERROR_LOG(HLE, "sceKernelSendMsgPipeCB(%i) - ERROR %08x", uid, error); RETURN(error); return; } - if (sendSize > pipe->nmp.freeSize) { - // TODO: Block - ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - Message won't fit", uid, error); - RETURN(SCE_KERNEL_ERROR_MPP_FULL); - return; - } - - const u8 *source = Memory::GetPointer(sendBufAddr); - - int destIndex = pipe->writePos; - if (pipe->writePos + sendSize > pipe->nmp.size) { - // Split in two - int firstCopySize = pipe->nmp.size - pipe->writePos; - memcpy(pipe->buffer + pipe->writePos, source, firstCopySize); - memcpy(pipe->buffer, source + firstCopySize, sendSize - firstCopySize); - } else { - memcpy(pipe->buffer + pipe->writePos, source, sendSize); - } - pipe->writePos += sendSize; - pipe->writePos %= pipe->nmp.size; - - ERROR_LOG(HLE, "UNIMPL sceKernelSendMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr); - RETURN(0); + DEBUG_LOG(HLE, "sceKernelSendMsgPipeCB(id=%i, addr=%08x, size=%i, mode=%i, result=%08x, timeout=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr); + __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr, true, false); __KernelCheckCallbacks(); } @@ -182,86 +361,224 @@ void sceKernelTrySendMsgPipe() u32 sendSize = PARAM(2); int waitMode = PARAM(3); u32 resultAddr = PARAM(4); - u32 timeoutPtr = PARAM(5); u32 error; - MsgPipe *pipe = kernelObjects.Get(uid, error); - if (!pipe) { - ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - ERROR %08x", uid, error); + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { + ERROR_LOG(HLE, "sceKernelTrySendMsgPipe(%i) - ERROR %08x", uid, error); RETURN(error); return; } - if (sendSize > pipe->nmp.freeSize) { - // TODO: Block - ERROR_LOG(HLE, "sceKernelSendMsgPipe(%i) - Message won't fit", uid, error); - RETURN(SCE_KERNEL_ERROR_MPP_FULL); - return; - } + DEBUG_LOG(HLE, "sceKernelTrySendMsgPipeCB(id=%i, addr=%08x, size=%i, mode=%i, result=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr); + __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, 0, false, true); +} - const u8 *source = Memory::GetPointer(sendBufAddr); - - int destIndex = pipe->writePos; - if (pipe->writePos + sendSize > pipe->nmp.size) { - // Split in two - int firstCopySize = pipe->nmp.size - pipe->writePos; - memcpy(pipe->buffer + pipe->writePos, source, firstCopySize); - memcpy(pipe->buffer, source + firstCopySize, sendSize - firstCopySize); - } else { - memcpy(pipe->buffer + pipe->writePos, source, sendSize); +void __KernelReceiveMsgPipe(MsgPipe *m, u32 receiveBufAddr, u32 receiveSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool pool) +{ + u32 curReceiveAddr = receiveBufAddr; + // MsgPipe buffer size is 0, receiving directly from waiting send threads + if (m->nmp.bufSize == 0) + { + // While they're still sending waiting threads (which can send data) + while (!m->sendWaitingThreads.empty()) + { + MsgPipeWaitingThread *thread = &m->sendWaitingThreads.front(); + // Sending thread has more data than we have to receive: retrieve just the amount of data we want + if (thread->bufSize - thread->freeSize > receiveSize) + { + Memory::Memcpy(curReceiveAddr, Memory::GetPointer(thread->bufAddr), receiveSize); + thread->freeSize += receiveSize; + // Move still available data at the beginning of the sending thread buffer + Memory::Memcpy(thread->bufAddr, Memory::GetPointer(thread->bufAddr + receiveSize), thread->bufSize - thread->freeSize); + curReceiveAddr += receiveSize; + receiveSize = 0; + // The sending thread mode is ASAP: we have sent some data so restart it even though its buffer isn't empty + if (thread->waitMode == SCE_KERNEL_MPW_ASAP) + { + Memory::Write_U32(thread->bufSize - thread->freeSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->sendWaitingThreads.erase(m->sendWaitingThreads.begin()); + } + break; + } + // Sending thread wants to send the same amount of data as we want to retrieve: get the data and resume thread + else if (thread->bufSize - thread->freeSize == receiveSize) + { + Memory::Memcpy(curReceiveAddr, Memory::GetPointer(thread->bufAddr), receiveSize); + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->sendWaitingThreads.erase(m->sendWaitingThreads.begin()); + curReceiveAddr += receiveSize; + receiveSize = 0; + break; + } + // Not enough data in the sending thread: get the data available and restart the sending thread, then loop + else + { + Memory::Memcpy(curReceiveAddr, Memory::GetPointer(thread->bufAddr), thread->bufSize - thread->freeSize); + receiveSize -= thread->bufSize - thread->freeSize; + curReceiveAddr += thread->bufSize - thread->freeSize; + Memory::Write_U32(thread->bufSize, thread->transferredBytesAddr); + __KernelResumeThreadFromWait(thread->id); + m->sendWaitingThreads.erase(m->sendWaitingThreads.begin()); + } + } + // All data hasn't been received and (mode isn't ASAP or nothing was received) + if (receiveSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curReceiveAddr == receiveBufAddr)) + { + if (pool) + { + RETURN(SCE_KERNEL_ERROR_MPP_EMPTY); + return; + } + else + { + m->AddReceiveWaitingThread(__KernelGetCurThread(), curReceiveAddr, receiveSize, waitMode, resultAddr); + RETURN(0); + __KernelWaitCurThread(WAITTYPE_MSGPIPE, 0, 0, 0, cbEnabled); + return; + } + } } - pipe->writePos += sendSize; - pipe->writePos %= pipe->nmp.size; - - ERROR_LOG(HLE, "UNIMPL sceKernelSendMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr); + // Getting data from the MsgPipe buffer + else + { + // Enough data in the buffer: copy just the needed amount of data + if (receiveSize <= m->nmp.bufSize - m->nmp.freeSize) + { + Memory::Memcpy(receiveBufAddr, m->buffer, receiveSize); + m->nmp.freeSize += receiveSize; + memmove(m->buffer, m->buffer + receiveSize, m->nmp.bufSize - m->nmp.freeSize); + curReceiveAddr = receiveBufAddr + receiveSize; + receiveSize = 0; + } + // Else, if mode is ASAP and there's at list 1 available byte of data: copy all the available data + else if (waitMode == SCE_KERNEL_MPW_ASAP && m->nmp.freeSize != m->nmp.bufSize) + { + Memory::Memcpy(receiveBufAddr, m->buffer, m->nmp.bufSize - m->nmp.freeSize); + receiveSize -= m->nmp.bufSize - m->nmp.freeSize; + curReceiveAddr = receiveBufAddr + m->nmp.bufSize - m->nmp.freeSize; + m->nmp.freeSize = m->nmp.bufSize; + } + else + { + if (pool) + { + RETURN(SCE_KERNEL_ERROR_MPP_EMPTY); + return; + } + else + { + m->AddReceiveWaitingThread(__KernelGetCurThread(), curReceiveAddr, receiveSize, waitMode, resultAddr); + RETURN(0); + __KernelWaitCurThread(WAITTYPE_MSGPIPE, 0, 0, 0, cbEnabled); + return; + } + } + + if (curReceiveAddr != receiveBufAddr) + { + m->CheckSendThreads(); + } + } + Memory::Write_U32(curReceiveAddr - receiveBufAddr, resultAddr); + RETURN(0); } void sceKernelReceiveMsgPipe() { - SceUInt uid = PARAM(0); + SceUID uid = PARAM(0); u32 receiveBufAddr = PARAM(1); u32 receiveSize = PARAM(2); int waitMode = PARAM(3); u32 resultAddr = PARAM(4); u32 timeoutPtr = PARAM(5); - ERROR_LOG(HLE, "UNIMPL sceKernelReceiveMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr); - RETURN(SCE_KERNEL_ERROR_MPP_EMPTY); + u32 error; + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { + ERROR_LOG(HLE, "sceKernelReceiveMsgPipe(%i) - ERROR %08x", uid, error); + RETURN(error); + return; + } + + DEBUG_LOG(HLE, "sceKernelReceiveMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr); + __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr, false, false); } void sceKernelReceiveMsgPipeCB() { - SceUInt uid = PARAM(0); + SceUID uid = PARAM(0); u32 receiveBufAddr = PARAM(1); u32 receiveSize = PARAM(2); int waitMode = PARAM(3); u32 resultAddr = PARAM(4); u32 timeoutPtr = PARAM(5); - ERROR_LOG(HLE, "UNIMPL sceKernelReceiveMsgPipeCB(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr); - RETURN(SCE_KERNEL_ERROR_MPP_EMPTY); + u32 error; + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { + ERROR_LOG(HLE, "sceKernelReceiveMsgPipeCB(%i) - ERROR %08x", uid, error); + RETURN(error); + return; + } + + DEBUG_LOG(HLE, "sceKernelReceiveMsgPipeCB(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr); + __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr, true, false); } void sceKernelTryReceiveMsgPipe() { - SceUInt uid = PARAM(0); + SceUID uid = PARAM(0); u32 receiveBufAddr = PARAM(1); u32 receiveSize = PARAM(2); int waitMode = PARAM(3); u32 resultAddr = PARAM(4); - ERROR_LOG(HLE, "UNIMPL sceKernelTryReceiveMsgPipe(%i, %08x, %i, %i, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr); - RETURN(SCE_KERNEL_ERROR_MPP_EMPTY); + u32 error; + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) { + ERROR_LOG(HLE, "sceKernelTryReceiveMsgPipe(%i) - ERROR %08x", uid, error); + RETURN(error); + return; + } + + DEBUG_LOG(HLE, "sceKernelTryReceiveMsgPipe(%i, %08x, %i, %i, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr); + __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, 0, false, true); } void sceKernelCancelMsgPipe() { - SceUInt uid = PARAM(0); - u32 numSendThreads = PARAM(1); - u32 numReceiveThreads = PARAM(2); + SceUID uid = PARAM(0); + u32 numSendThreadsAddr = PARAM(1); + u32 numReceiveThreadsAddr = PARAM(2); - ERROR_LOG(HLE, "UNIMPL sceKernelCancelMsgPipe(%i, %i, %i)", uid, numSendThreads, numReceiveThreads); + u32 error; + MsgPipe *m = kernelObjects.Get(uid, error); + if (!m) + { + ERROR_LOG(HLE, "sceKernelCancelMsgPipe(%i) - ERROR %08x", uid, error); + RETURN(error); + return; + } + if (m->buffer != 0) + { + delete [] m->buffer; + } + u32 count; + for (count = 0; count < m->sendWaitingThreads.size(); count++) + { + __KernelResumeThreadFromWait(m->sendWaitingThreads[count].id); + } + Memory::Write_U32(count, numSendThreadsAddr); + for (count = 0; count < m->receiveWaitingThreads.size(); count++) + { + __KernelResumeThreadFromWait(m->receiveWaitingThreads[count].id); + } + Memory::Write_U32(count, numReceiveThreadsAddr); + DEBUG_LOG(HLE, "sceKernelCancelMsgPipe(%i, %i, %i)", uid, numSendThreadsAddr, numReceiveThreadsAddr); RETURN(0); } @@ -272,10 +589,12 @@ void sceKernelReferMsgPipeStatus() DEBUG_LOG(HLE,"sceKernelReferMsgPipeStatus(%i, %08x)", uid, msgPipeStatusAddr); u32 error; - MsgPipe *mp = kernelObjects.Get(uid, error); - if (mp) + MsgPipe *m = kernelObjects.Get(uid, error); + if (m) { - Memory::WriteStruct(msgPipeStatusAddr, &mp->nmp); + m->nmp.numSendWaitThreads = m->sendWaitingThreads.size(); + m->nmp.numReceiveWaitThreads = m->receiveWaitingThreads.size(); + Memory::WriteStruct(msgPipeStatusAddr, &m->nmp); RETURN(0); } else diff --git a/Core/HLE/sceKernelThread.h b/Core/HLE/sceKernelThread.h index 92f88c8dc7..e4e536dd2a 100644 --- a/Core/HLE/sceKernelThread.h +++ b/Core/HLE/sceKernelThread.h @@ -62,7 +62,7 @@ enum WaitType //probably not the real values WAITTYPE_MBX = 5, WAITTYPE_VPL = 6, WAITTYPE_FPL = 7, - // + WAITTYPE_MSGPIPE = 8, // fake WAITTYPE_THREADEND = 9, WAITTYPE_AUDIOCHANNEL = 10, // this is fake, should be replaced with 8 eventflags ( ?? ) WAITTYPE_UMD = 11, // this is fake, should be replaced with 1 eventflag ( ?? ) From eb491a6707403e9e8773b4b0f0ec8e1db962f637 Mon Sep 17 00:00:00 2001 From: Arthur Blot Date: Sat, 10 Nov 2012 23:17:37 +0100 Subject: [PATCH 2/2] Fixed test.py for Linux, mbx added to working list --- test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test.py b/test.py index 9db2cb12af..9b87789165 100644 --- a/test.py +++ b/test.py @@ -6,7 +6,7 @@ import os import subprocess -PPSSPP_EXECUTABLES = [ "Windows/Release/PPSSPPHeadless.exe", "SDL/build/ppsspp_headless" ] +PPSSPP_EXECUTABLES = [ "Windows/Release/PPSSPPHeadless.exe", "SDL/build/ppsspp-headless" ] PPSSPP_EXE = None TEST_ROOT = "pspautotests/tests/" @@ -25,6 +25,7 @@ tests_good = [ "misc/testgp", "string/string", "gpu/callbacks/ge_callbacks", + "threads/mbx/mbx", ] # These are the next tests up for fixing. @@ -47,7 +48,6 @@ tests_next = [ "sysmem/sysmem", "threads/events/events", "threads/fpl/fpl", - "threads/mbx/mbx", "threads/msgpipe/msgpipe", "threads/mutex/mutex", "threads/scheduling/scheduling", @@ -114,7 +114,7 @@ def run_tests(test_list, args): cmdline = PPSSPP_EXE + " " + elf_filename + " " + " ".join(args) #print "Cmdline: " + cmdline - proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) output = proc.stdout.read().strip() if output.startswith("TESTERROR"):