mirror of
https://github.com/kinnay/Wii-U-Firmware-Emulator.git
synced 2025-04-02 10:41:41 -04:00
505 lines
18 KiB
Python
505 lines
18 KiB
Python
|
|
import pyemu
|
|
import struct
|
|
import sys
|
|
|
|
import debug
|
|
import log
|
|
|
|
class CoprocHandler:
|
|
def __init__(self, interpreter, core, mmu):
|
|
self.interpreter = interpreter
|
|
self.core = core
|
|
self.mmu = mmu
|
|
|
|
self.control_reg = 0
|
|
self.domain_access_reg = 0
|
|
self.data_fault_status = 0
|
|
self.instr_fault_status = 0
|
|
self.fault_address = 0
|
|
|
|
def write(self, coproc, opc, value, rn, rm, type):
|
|
if coproc == 15:
|
|
if rn == 7:
|
|
if rm == 10 and type == 1: pass #Data synchronization barrier register
|
|
elif rm == 6 and type == 1: pass #Invalidate data cache line register
|
|
elif rm == 10 and type == 4: pass #Clean and invalidate entire data cache register
|
|
elif rm == 0 and type == 4: pass #Wait for interrupt
|
|
elif rm == 5 and type == 0: #Invalidate instruction cache line register
|
|
self.interpreter.invalidate_icache()
|
|
elif rm == 6 and type == 0: pass #Invalidate data cache line register
|
|
|
|
elif rn == 3 and rm == 0 and type == 0: #Write domain access control register
|
|
self.domain_access_reg = value
|
|
|
|
elif rn == 8 and rm == 7 and type == 0: pass #Invalidate unified TLB register
|
|
|
|
elif rn == 1 and rm == 0 and type == 0: #Write control register
|
|
self.control_reg = value
|
|
self.mmu.set_enabled(value & 1)
|
|
elif rn == 2 and rm == 0 and type == 0: #Write translation table base register 0
|
|
self.mmu.set_translation_table_base(value)
|
|
elif rn == 5 and rm == 0 and type == 0: #Write data fault status register
|
|
self.data_fault_status = value
|
|
elif rn == 5 and rm == 0 and type == 1: #Write instruction fault status register
|
|
self.instr_fault_status = value
|
|
elif rn == 6 and rm == 0 and type == 0: #Write fault address register
|
|
self.fault_address = value
|
|
else:
|
|
print("COPROC WRITE p%i %i %08X c%i c%i %i at %08X" %(coproc, opc, value, rn, rm, type, self.core.reg(15)))
|
|
else:
|
|
raise RuntimeError("Write to coprocessor %i" %coproc)
|
|
|
|
def read(self, coproc, opc, rn, rm, type):
|
|
if coproc == 15:
|
|
fields = [rn, rm, type]
|
|
if fields == [1, 0, 0]: #Read control register
|
|
return self.control_reg
|
|
elif fields == [3, 0, 0]: #Read domain access control register
|
|
return self.domain_access_reg
|
|
elif fields == [5, 0, 0]: #Read data fault status register
|
|
return self.data_fault_status
|
|
elif fields == [5, 0, 1]: #Read instruction fault status register
|
|
return self.instr_fault_status
|
|
elif fields == [6, 0, 0]: #Read fault address register
|
|
return self.fault_address
|
|
elif fields == [7, 10, 3]: #Test and clean dcache
|
|
return pyemu.ARMCore.Z
|
|
elif fields == [7, 14, 3]: #Test, clean and invalidate dcache
|
|
return pyemu.ARMCore.Z
|
|
else:
|
|
print("COPROC READ p%i %i c%i c%i %i at %08X" %(coproc, opc, rn, rm, type, self.core.reg(15)))
|
|
return 0
|
|
else:
|
|
raise RuntimeError("Read from coprocessor %i" %coproc)
|
|
|
|
|
|
class SVCHandler:
|
|
def __init__(self, core, reader, writer):
|
|
self.core = core
|
|
self.reader = reader
|
|
self.writer = writer
|
|
|
|
self.logger = log.ConsoleLogger("ARM")
|
|
self.next_handle = 1000
|
|
|
|
def handle(self, id):
|
|
if id != 0xAB:
|
|
raise RuntimeError("Invalid software interrupt: 0x%X" %id)
|
|
|
|
type = self.core.reg(0)
|
|
if type == 1: #Open(name, mode, namelen)
|
|
ptr = self.core.reg(1)
|
|
name = self.reader.string(self.u32(ptr))
|
|
mode = self.reader.u32(ptr + 4)
|
|
print("OPEN(%s, %i) -> %i" %(name, mode, self.next_handle))
|
|
self.core.setreg(0, self.next_handle)
|
|
self.next_handle += 1
|
|
|
|
elif type == 2: #Close
|
|
print("CLOSE(%i)" %self.reader.u32(self.core.reg(1)))
|
|
|
|
elif type == 4: #Write
|
|
self.logger.write(self.reader.string(self.core.reg(1)))
|
|
|
|
elif type == 6: #Read
|
|
ptr = self.core.reg(1)
|
|
handle = self.reader.u32(ptr)
|
|
buffer = self.reader.u32(ptr + 4)
|
|
length = self.reader.u32(ptr + 8)
|
|
result = input("READ(%i, %i): " %(handle, length))
|
|
result = result[:length - 1] + "\0"
|
|
self.writer.write(buffer, result.encode("ascii"))
|
|
self.core.setreg(0, len(result))
|
|
|
|
else:
|
|
raise RuntimeError("Invalid svc type: %i" %type)
|
|
|
|
|
|
class UNDHandler:
|
|
|
|
IOS_CREATE_MESSAGE_QUEUE = 0xC
|
|
IOS_DESTROY_MESSAGE_QUEUE = 0xD
|
|
IOS_SEND_MESSAGE = 0xE
|
|
IOS_JAM_MESSAGE = 0xF
|
|
IOS_RECEIVE_MESSAGE = 0x10
|
|
IOS_OPEN = 0x33
|
|
IOS_CLOSE = 0x34
|
|
IOS_READ = 0x35
|
|
IOS_WRITE = 0x36
|
|
IOS_SEEK = 0x37
|
|
IOS_IOCTL = 0x38
|
|
IOS_IOCTLV = 0x39
|
|
IOS_OPEN_ASYNC = 0x3A
|
|
IOS_CLOSE_ASYNC = 0x3B
|
|
IOS_READ_ASYNC = 0x3C
|
|
IOS_WRITE_ASYNC = 0x3D
|
|
IOS_SEEK_ASYNC = 0x3E
|
|
IOS_IOCTL_ASYNC = 0x3F
|
|
IOS_IOCTLV_ASYNC = 0x40
|
|
IOS_RESUME = 0x43
|
|
IOS_RESUME_ASYNC = 0x46
|
|
IOS_RESOURCE_REPLY = 0x49
|
|
|
|
def __init__(self, breakpoints, core, reader, writer):
|
|
self.breakpoints = breakpoints
|
|
self.core = core
|
|
self.reader = reader
|
|
self.writer = writer
|
|
|
|
self.requests = []
|
|
self.async_reqs = []
|
|
self.added_pcs = []
|
|
self.ipc_names = {}
|
|
|
|
if "logsys" in sys.argv:
|
|
self.ipc_logger = log.FileLogger("ipc.txt")
|
|
self.message_logger = log.FileLogger("messages.txt")
|
|
self.file_logger = log.FileLogger("files.txt")
|
|
|
|
def handle(self):
|
|
if "logsys" in sys.argv:
|
|
self.log_syscall() #Slows down code a lot
|
|
self.core.trigger_exception(self.core.UNDEFINED_INSTRUCTION)
|
|
|
|
def handle_breakpoint(self, pc):
|
|
for req in self.requests:
|
|
if pc == req[0] and self.get_thread() == req[1]:
|
|
self.handle_result(pc, *req[2:])
|
|
self.requests.remove(req)
|
|
return
|
|
|
|
def handle_result(self, pc, syscall, *args):
|
|
module = self.get_module_name(pc)
|
|
result = self.core.reg(0)
|
|
lr = self.core.reg(14)
|
|
pc = self.core.reg(15)
|
|
|
|
if syscall == self.IOS_CREATE_MESSAGE_QUEUE:
|
|
self.message_logger.print("[%s:%08X] CREATE(%i) -> %08X" %(module, lr, args[0], result))
|
|
|
|
elif syscall == self.IOS_RECEIVE_MESSAGE:
|
|
queue = args[0]
|
|
message = self.reader.u32(args[1])
|
|
flags = args[2]
|
|
self.message_logger.print("[%s:%08X] RECEIVE(%08X, %i) -> (%08X, %08X)" %(module, lr, queue, flags, result, message))
|
|
|
|
for req in self.async_reqs:
|
|
if queue == req[2] and message == req[3]:
|
|
result = self.reader.u32(message + 4)
|
|
self.handle_async_result(pc, req[1], req[4], result, *req[5:])
|
|
self.async_reqs.remove(req)
|
|
break
|
|
|
|
elif syscall == self.IOS_OPEN:
|
|
self.ipc_logger.print("[%s:%08X] OPEN(%s, %i) -> %08X" %(module, lr, *args, result))
|
|
self.ipc_names[result] = args[0]
|
|
|
|
elif syscall == self.IOS_IOCTL:
|
|
self.ipc_logger.print("[%s:%08X] IOCTL[%s](%08X, %i) -> %08X" %(module, lr, *args[:3], result))
|
|
|
|
dev, fd, ioctl, indata, outdata = args
|
|
self.handle_ioctl(dev, ioctl, indata, outdata, result)
|
|
|
|
elif syscall == self.IOS_IOCTLV:
|
|
self.ipc_logger.print("[%s:%08X] IOCTLV[%s](%08X, %i) -> %08X" %(module, lr, *args[:3], result))
|
|
|
|
dev, fd, ioctlv, invecs, outinfo = args
|
|
outvecs = []
|
|
for ptr, size in outinfo:
|
|
outvecs.append(self.reader.read(ptr, size))
|
|
self.handle_ioctlv(dev, ioctlv, invecs, outvecs, result)
|
|
|
|
elif syscall == self.IOS_RESUME:
|
|
self.ipc_logger.print("[%s:%08X] RESUME[%s](%08X) -> %08X" %(module, lr, *args, result))
|
|
|
|
def handle_async_result(self, pc, lr, syscall, result, *args):
|
|
module = self.get_module_name(pc)
|
|
if syscall == self.IOS_IOCTL_ASYNC:
|
|
fd, ioctl, indata, outdata = args
|
|
name = self.ipc_names[fd]
|
|
|
|
self.ipc_logger.print("[%s:%08X] IOCTL_ASYNC[%s](%08X, %i) -> %08X" %(module, lr, name, fd, ioctl, result))
|
|
self.handle_ioctl(name, ioctl, indata, outdata, result)
|
|
|
|
elif syscall == self.IOS_IOCTLV_ASYNC:
|
|
fd, ioctlv, invecs, outinfo = args
|
|
name = self.ipc_names[fd]
|
|
|
|
outvecs = []
|
|
for ptr, size in outinfo:
|
|
outvecs.append(self.reader.read(ptr, size))
|
|
|
|
self.ipc_logger.print("[%s:%08X] IOCTLV_ASYNC[%s](%08X, %i) -> %08X" %(module, lr, name, fd, ioctlv, result))
|
|
self.handle_ioctlv(name, ioctlv, invecs, outvecs, result)
|
|
|
|
elif syscall == self.IOS_RESUME_ASYNC:
|
|
fd = args[0]
|
|
self.ipc_logger.print("[%s:%08X] RESUME_ASYNC[%s](%08X) -> %08X" %(module, lr, self.ipc_names[fd], fd, result))
|
|
|
|
def handle_ioctl(self, name, ioctl, indata, outdata, result):
|
|
if name == "/dev/fsa":
|
|
if ioctl == 3: #FSAGetVolumeInfo
|
|
path = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSAGetVolumeInfo(%s)" %path)
|
|
elif ioctl == 4: #FSAInit
|
|
self.ipc_logger.print("\tFSAInit()")
|
|
elif ioctl == 5: #FSAChangeDir
|
|
path = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSAChangeDir(%s)" %path)
|
|
elif ioctl == 7: #FSAMakeDir
|
|
path = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
arg = struct.unpack_from(">I", indata, 0x284)[0]
|
|
self.ipc_logger.print("\tFSAMakeDir(%s, %i)" %(path, arg))
|
|
elif ioctl == 8: #FSARemove
|
|
path = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSARemove(%s)" %path)
|
|
elif ioctl == 10: #FSAOpenDir
|
|
path = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSAOpenDir(%s)" %path)
|
|
elif ioctl == 14: #FSAOpenFile
|
|
fn = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
mode = indata[0x284 : 0x284 + 0x10].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSAOpenFile(%s, %s)" %(fn, mode))
|
|
self.file_logger.print("FSAOpenFile(%s, %s)" %(fn, mode))
|
|
elif ioctl == 20: #FSAGetStatFile
|
|
handle = indata[4 : 8].hex().upper()
|
|
self.ipc_logger.print("\tFSAGetStatFile(%s)" %handle)
|
|
elif ioctl == 24: #FSAGetInfoByQuery
|
|
fn = indata[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
type = {
|
|
0: "FSAGetFreeSpaceSize",
|
|
1: "FSAGetDirSize",
|
|
2: "FSAGetEntryNum",
|
|
4: "FSAGetDeviceInfo",
|
|
5: "FSAGetStat",
|
|
7: "FSAGetJournalFreeSpaceSize"
|
|
}[struct.unpack_from(">I", indata, 0x284)[0]]
|
|
self.ipc_logger.print("\tFSAGetInfoByQuery(%s, %s)" %(fn, type))
|
|
|
|
def handle_ioctlv(self, name, ioctlv, invecs, outvecs, result):
|
|
if name == "/dev/crypto":
|
|
if ioctlv == 12: #IOSC_GenerateHash
|
|
type = struct.unpack_from(">I", invecs[0], 12)[0]
|
|
datalen = len(invecs[2])
|
|
hash = outvecs[0]
|
|
self.ipc_logger.print("\tIOSC_GenerateHash(0x%X, %i) -> %s" %(datalen, type, hash.hex()))
|
|
elif ioctlv == 14: #IOSC_Decrypt
|
|
key = struct.unpack_from(">I", invecs[0], 8)[0]
|
|
iv = invecs[1]
|
|
datalen = len(invecs[2])
|
|
self.ipc_logger.print("\tIOSC_Decrypt(%i, 0x%X, %s)" %(key, datalen, iv.hex()))
|
|
if ioctlv == 16: #IOSC_GenerateBlockMAC
|
|
key, type = struct.unpack_from(">II", invecs[0], 8)
|
|
datalen = len(invecs[3])
|
|
customlen = len(invecs[2])
|
|
hash = outvecs[0]
|
|
self.ipc_logger.print("\tIOSC_GenerateBlockMAC(%i, 0x%X, 0x%X, %i) -> %s" %(key, datalen, customlen, type, hash.hex()))
|
|
|
|
if name == "/dev/fsa":
|
|
if ioctlv == 1: #FSAMount
|
|
data = invecs[0]
|
|
path1 = data[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
path2 = data[0x284 : 0x284 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSAMount(%s, %s)" %(path1, path2))
|
|
elif ioctlv == 15: #FSAReadFile
|
|
data = invecs[0]
|
|
length, count = struct.unpack_from(">II", data, 8)
|
|
self.ipc_logger.print("\tFSAReadFile(0x%X * %i) -> 0x%X" %(count, length, result))
|
|
elif ioctlv == 103:
|
|
data = invecs[0]
|
|
string1 = data[4 : 4 + 0x280].decode("ascii").strip("\0")
|
|
string2 = data[0x284 : 0x284 + 0x280].decode("ascii").strip("\0")
|
|
self.ipc_logger.print("\tFSA_0x67(%s, %s, ...)" %(string1, string2))
|
|
|
|
def add_request(self, pc, *args):
|
|
if pc not in self.added_pcs:
|
|
self.breakpoints.add(pc, self.handle_breakpoint)
|
|
self.added_pcs.append(pc)
|
|
self.requests.append((pc, self.get_thread()) + args)
|
|
|
|
def add_async(self, *args):
|
|
self.async_reqs.append(args)
|
|
|
|
def log_syscall(self):
|
|
core = self.core
|
|
pc = core.reg(15)
|
|
lr = core.reg(14)
|
|
|
|
instr = self.reader.u32(pc - 4)
|
|
if instr & ~0xFF00 == 0xE7F000F0:
|
|
syscall = (instr >> 8) & 0xFF
|
|
module = self.get_module_name(pc)
|
|
|
|
if syscall == self.IOS_CREATE_MESSAGE_QUEUE:
|
|
self.add_request(pc, syscall, core.reg(1))
|
|
elif syscall == self.IOS_DESTROY_MESSAGE_QUEUE:
|
|
self.message_logger.print("[%s:%08X] DESTROY(%08X)" %(module, lr, core.reg(0)))
|
|
elif syscall == self.IOS_SEND_MESSAGE:
|
|
args = self.get_args(3)
|
|
self.message_logger.print("[%s:%08X] SEND(%08X, %08X, %i)" %(module, lr, *args))
|
|
elif syscall == self.IOS_JAM_MESSAGE:
|
|
args = self.get_args(3)
|
|
self.message_logger.print("[%s:%08X] JAM(%08X, %08X, %i)" %(module, lr, *args))
|
|
elif syscall == self.IOS_RECEIVE_MESSAGE:
|
|
self.add_request(pc, syscall, *self.get_args(3))
|
|
|
|
elif syscall == self.IOS_OPEN:
|
|
name = self.reader.string(core.reg(0))
|
|
mode = core.reg(1)
|
|
self.add_request(pc, syscall, name, mode)
|
|
elif syscall == self.IOS_CLOSE:
|
|
fd = core.reg(0)
|
|
self.ipc_logger.print("[%s:%08X] CLOSE[%s](%08X)" %(module, lr, self.ipc_names[fd], fd))
|
|
self.ipc_names.pop(fd)
|
|
elif syscall == self.IOS_IOCTL:
|
|
args = self.get_args(6)
|
|
fd, ioctl = args[0], args[1]
|
|
indata = self.reader.read(args[2], args[3])
|
|
outdata = self.reader.read(args[4], args[5])
|
|
self.add_request(pc, syscall, self.ipc_names[fd], fd, ioctl, indata, outdata)
|
|
elif syscall == self.IOS_IOCTLV:
|
|
args = self.get_args(5)
|
|
|
|
fd, ioctlv = args[0], args[1]
|
|
|
|
invecs = []
|
|
outvecs = []
|
|
addr = args[4]
|
|
for i in range(args[2]):
|
|
ptr, size = self.reader.u32(addr), self.reader.u32(addr + 4)
|
|
invecs.append(self.reader.read(ptr, size))
|
|
addr += 12
|
|
for i in range(args[3]):
|
|
ptr, size = self.reader.u32(addr), self.reader.u32(addr + 4)
|
|
outvecs.append((ptr, size))
|
|
addr += 12
|
|
|
|
self.add_request(pc, syscall, self.ipc_names[fd], fd, ioctlv, invecs, outvecs)
|
|
|
|
elif syscall == self.IOS_IOCTL_ASYNC:
|
|
args = self.get_args(8)
|
|
fd, ioctl = args[0], args[1]
|
|
indata = self.reader.read(args[2], args[3])
|
|
outdata = self.reader.read(args[4], args[5])
|
|
self.add_async(pc, lr, *args[6:], syscall, fd, ioctl, indata, outdata)
|
|
elif syscall == self.IOS_IOCTLV_ASYNC:
|
|
args = self.get_args(7)
|
|
|
|
fd, ioctlv = args[0], args[1]
|
|
|
|
invecs = []
|
|
outvecs = []
|
|
addr = args[4]
|
|
for i in range(args[2]):
|
|
ptr, size = self.reader.u32(addr), self.reader.u32(addr + 4)
|
|
invecs.append(self.reader.read(ptr, size))
|
|
addr += 12
|
|
for i in range(args[3]):
|
|
ptr, size = self.reader.u32(addr), self.reader.u32(addr + 4)
|
|
outvecs.append((ptr, size))
|
|
addr += 12
|
|
|
|
self.add_async(pc, lr, *args[5:], syscall, fd, ioctlv, invecs, outvecs)
|
|
|
|
elif syscall == self.IOS_RESUME:
|
|
fd = core.reg(0)
|
|
self.add_request(pc, syscall, self.ipc_names[fd], fd)
|
|
elif syscall == self.IOS_RESUME_ASYNC:
|
|
args = self.get_args(5)
|
|
self.add_async(pc, lr, *args[3:], syscall, args[0])
|
|
|
|
def get_args(self, num):
|
|
args = []
|
|
for i in range(num):
|
|
if i <= 3:
|
|
args.append(self.core.reg(i))
|
|
else:
|
|
args.append(self.reader.u32(self.core.reg(13) + (i - 4) * 4))
|
|
return args
|
|
|
|
def get_thread(self):
|
|
return self.reader.u32(0x8173BA0)
|
|
|
|
def get_module_name(self, addr):
|
|
if 0x04000000 <= addr < 0x04020000: return "CRYPTO"
|
|
if 0x05000000 <= addr < 0x05060000: return "MCP"
|
|
if 0x08120000 <= addr < 0x08140000: return "KERNEL"
|
|
if 0x10100000 <= addr < 0x10140000: return "USB"
|
|
if 0x10700000 <= addr < 0x10800000: return "FS"
|
|
if 0x11F00000 <= addr < 0x11FC0000: return "PAD"
|
|
if 0x12300000 <= addr < 0x12440000: return "NET"
|
|
if 0xE0000000 <= addr < 0xE0100000: return "ACP"
|
|
if 0xE1000000 <= addr < 0xE10C0000: return "NSEC"
|
|
if 0xE2000000 <= addr < 0xE2280000: return "NIM_BOSS"
|
|
if 0xE3000000 <= addr < 0xE3180000: return "FPD"
|
|
if 0xE4000000 <= addr < 0xE4040000: return "TEST"
|
|
if 0xE5000000 <= addr < 0xE5040000: return "AUXIL"
|
|
if 0xE6000000 <= addr < 0xE6040000: return "BSP"
|
|
raise ValueError("get_module_name(0x%08X)" %addr)
|
|
|
|
|
|
class ExceptionHandler:
|
|
def __init__(self, core):
|
|
self.core = core
|
|
|
|
def data_abort(self, addr, write):
|
|
type = ["read from", "write to"][write]
|
|
raise RuntimeError("Data abort: %s %08X at %08X" %(type, addr, self.core.reg(15)))
|
|
|
|
|
|
class ARMEmulator:
|
|
def __init__(self, emulator, physmem, hw):
|
|
self.emulator = emulator
|
|
self.core = pyemu.ARMCore()
|
|
self.physmem = physmem
|
|
self.virtmem = pyemu.ARMMMU(physmem)
|
|
self.virtmem.set_cache_enabled(True)
|
|
self.interpreter = pyemu.ARMInterpreter(self.core, physmem, self.virtmem)
|
|
self.interpreter.set_icache_enabled(True)
|
|
self.interpreter.set_alarm(5000, hw.latte.update_timer)
|
|
self.interrupts = hw.latte.irq_arm
|
|
|
|
self.mem_reader = debug.MemoryReader(physmem, self.virtmem, False)
|
|
self.mem_writer = debug.MemoryWriter(physmem, self.virtmem, False)
|
|
self.breakpoints = debug.BreakpointHandler(self.interpreter)
|
|
self.coproc_handler = CoprocHandler(self.interpreter, self.core, self.virtmem)
|
|
self.svc_handler = SVCHandler(self.core, self.mem_reader, self.mem_writer)
|
|
self.und_handler = UNDHandler(self.breakpoints, self.core, self.mem_reader, self.mem_writer)
|
|
self.exc_handler = ExceptionHandler(self.core)
|
|
|
|
self.interpreter.on_software_interrupt(self.svc_handler.handle)
|
|
self.interpreter.on_undefined_instruction(self.und_handler.handle)
|
|
self.interpreter.on_data_error(self.exc_handler.data_abort)
|
|
self.interpreter.on_coproc_read(self.coproc_handler.read)
|
|
self.interpreter.on_coproc_write(self.coproc_handler.write)
|
|
self.interpreter.on_breakpoint(self.breakpoints.handle)
|
|
self.interpreter.on_watchpoint(False, self.breakpoints.handle_watch)
|
|
self.interpreter.on_watchpoint(True, self.breakpoints.handle_watch)
|
|
|
|
self.debugger = debug.ARMDebugger(self)
|
|
|
|
self.breakpoints.add(0x5033E02, self.hack_log_level)
|
|
self.breakpoints.add(0x5055324, self.handle_syslog)
|
|
self.logger = log.FileLogger("log.txt")
|
|
|
|
def check_interrupts(self):
|
|
if self.interrupts.check_interrupts():
|
|
self.core.trigger_exception(self.core.IRQ)
|
|
|
|
def hack_log_level(self, addr): #For COS log
|
|
if "logall" in sys.argv:
|
|
self.core.setreg(3, self.core.reg(3) | 0xE00)
|
|
|
|
def handle_syslog(self, addr):
|
|
addr, length = self.core.reg(1), self.core.reg(2)
|
|
data = self.mem_reader.read(addr, length).decode("ascii")
|
|
self.logger.write(data)
|
|
|
|
def cleanup(self):
|
|
self.logger.close()
|
|
if "logsys" in sys.argv:
|
|
self.und_handler.ipc_logger.close()
|
|
self.und_handler.message_logger.close()
|
|
self.und_handler.file_logger.close()
|
|
self.svc_handler.logger.close()
|