#!/usr/bin/env python3 # MTK Stage2 Client (c) B.Kerler 2018-2021. # Licensed under MIT License import os import logging import time import argparse import hashlib from binascii import hexlify from struct import pack, unpack from mtkclient.Library.usblib import usb_class from mtkclient.Library.utils import LogBase from mtkclient.Library.utils import print_progress from mtkclient.Library.hwcrypto import crypto_setup, hwcrypto from mtkclient.config.brom_config import Mtk_Config class Stage2(metaclass=LogBase): def __init__(self, args, loglevel=logging.INFO): self.__logger = self.__logger self.args = args self.loglevel = loglevel self.info = self.__logger.info self.error = self.__logger.error self.warning = self.__logger.warning self.emmc_inited = False # Setup HW Crypto chip variables self.setup = crypto_setup() if loglevel == logging.DEBUG: logfilename = os.path.join("logs", "log.txt") if os.path.exists(logfilename): os.remove(logfilename) fh = logging.FileHandler(logfilename) self.__logger.addHandler(fh) self.__logger.setLevel(logging.DEBUG) else: self.__logger.setLevel(logging.INFO) portconfig = [[0x0E8D, 0x0003, -1], [0x0E8D, 0x2000, -1]] self.cdc = usb_class(portconfig=portconfig, loglevel=loglevel, devclass=10) def preinit(self): try: hwcode = self.read32(0x8000000) except: print("Error reading hwcode...aborting.") return False self.config = Mtk_Config(self.loglevel) self.config.init_hwcode(hwcode) self.setup.blacklist = self.config.chipconfig.blacklist self.setup.gcpu_base = self.config.chipconfig.gcpu_base self.setup.dxcc_base = self.config.chipconfig.dxcc_base self.setup.da_payload_addr = self.config.chipconfig.da_payload_addr self.setup.sej_base = self.config.chipconfig.sej_base self.setup.read32 = self.read32 self.setup.write32 = self.write32 self.setup.writemem = self.memwrite self.setup.meid_addr = self.config.chipconfig.meid_addr self.hwcrypto = hwcrypto(self.setup, self.loglevel) return True def init_emmc(self): self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x6001)) if unpack("I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x6000)) time.sleep(2) if unpack("I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x4001)) self.cdc.usbwrite(pack(">I", addr)) time.sleep(5) if unpack("I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x4002)) self.cdc.usbwrite(pack(">I", addr + (pos * 4))) self.cdc.usbwrite(pack(">I", 4)) result.append(unpack(" bool: 'Clear cache func' self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x5000)) ack = self.cdc.usbread(4) if ack == b"\xD0\xD0\xD0\xD0": return True return False def write32(self, addr, dwords) -> bool: if isinstance(dwords, int): dwords = [dwords] for pos in range(0, len(dwords)): self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x4000)) self.cdc.usbwrite(pack(">I", addr + (pos * 4))) self.cdc.usbwrite(pack(">I", 4)) self.cdc.usbwrite(pack("I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x1002)) self.cdc.usbwrite(pack(">I", type)) if display: print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50) # kick-wdt # self.cdc.usbwrite(pack(">I", 0xf00dd00d)) # self.cdc.usbwrite(pack(">I", 0x3001)) bytestoread = length bytesread = 0 old = 0 # emmc_read(0) for sector in range(startsector, sectors): self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x1000)) self.cdc.usbwrite(pack(">I", sector)) tmp = self.cdc.usbread(0x200) if len(tmp) != 0x200: self.error("Error on getting data") return if display: prog = sector / sectors * 100 if round(prog, 1) > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete, Sector:' + hex(sector), bar_length=50) old = round(prog, 1) bytesread += len(tmp) size = min(bytestoread, len(tmp)) if wf is not None: wf.write(tmp[:size]) else: buffer.extend(tmp) bytestoread -= size if display: print_progress(100, 100, prefix='Complete: ', suffix=filename, bar_length=50) if wf is not None: wf.close() else: return buffer[start % 0x200:(start % 0x200) + length] def preloader(self, start, length, filename): sectors = 0 if start != 0: start = (start // 0x200) if length != 0: sectors = (length // 0x200) + (1 if length % 0x200 else 0) self.info("Reading preloader...") if self.cdc.connected: if sectors == 0: buffer = self.readflash(type=1, start=0, length=0x4000, display=False) if len(buffer) != 0x4000: print("Error on reading boot1 area.") return if buffer[:9] == b'EMMC_BOOT': startbrlyt = unpack(" 0: size = min(bytestoread, 0x100) self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x4002)) self.cdc.usbwrite(pack(">I", addr + pos)) self.cdc.usbwrite(pack(">I", size)) if filename is None: data += self.cdc.usbread(size) else: wf.write(self.cdc.usbread(size)) bytestoread -= size pos += size self.info(f"{hex(start)}: " + hexlify(data).decode('utf-8')) if filename is not None: wf.close() return data def memwrite(self, start, data, filename=None): if filename is not None: rf = open(filename, "rb") bytestowrite = os.stat(filename).st_size else: if isinstance(data, str): data = bytes.fromhex(data) elif isinstance(data, int): data = pack(" 0: size = min(bytestowrite, 0x100) self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x4000)) self.cdc.usbwrite(pack(">I", addr + pos)) self.cdc.usbwrite(pack(">I", size)) if filename is None: wdata = data[pos:pos + size] else: wdata = rf.read(size) bytestowrite -= size pos += size while len(wdata) % 4 != 0: wdata += b"\x00" self.cdc.usbwrite(wdata) if filename is not None: rf.close() ack = self.cdc.usbread(4) if ack == b"\xD0\xD0\xD0\xD0": return True else: return False def rpmb(self, start, length, filename, reverse=False): if not self.emmc_inited: self.init_emmc() if start == 0: start = 0 else: start = (start // 0x100) if length == 0: sectors = 4 * 1024 * 1024 // 0x100 else: sectors = (length // 0x100) + (1 if length % 0x100 else 0) self.info("Reading rpmb...") self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x1002)) self.cdc.usbwrite(pack(">I", 0x1)) # kick-wdt # self.cdc.usbwrite(pack(">I", 0xf00dd00d)) # self.cdc.usbwrite(pack(">I", 0x3001)) print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50) bytesread = 0 old = 0 bytestoread = sectors * 0x100 with open(filename, "wb") as wf: for sector in range(start, sectors): self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x2000)) self.cdc.usbwrite(pack(">H", sector)) tmp = self.cdc.usbread(0x100) if reverse: tmp = tmp[::-1] if len(tmp) != 0x100: self.error("Error on getting data") return prog = sector / sectors * 100 if round(prog, 1) > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete, Sector:' + hex((sectors * 0x200) - bytestoread), bar_length=50) old = round(prog, 1) bytesread += 0x100 size = min(bytestoread, len(tmp)) wf.write(tmp[:size]) bytestoread -= size print_progress(100, 100, prefix='Complete: ', suffix=filename, bar_length=50) print("Done") def keys(self, data=b"", otp=None, mode="dxcc"): # self.hwcrypto.disable_range_blacklist("cqdma",self.cmd_C8) if mode == "dxcc": rpmbkey = self.hwcrypto.aes_hwcrypt(btype="dxcc",mode="rpmb") fdekey = self.hwcrypto.aes_hwcrypt(btype="dxcc",mode="fde") tfdekey = self.hwcrypto.aes_hwcrypt(btype="dxcc",mode="itrustee-fde") platkey, provkey = self.hwcrypto.aes_hwcrypt(btype="dxcc",mode="prov") print("\nKeys :\n-----------------------------------------") print("RPMB: " + hexlify(rpmbkey).decode('utf-8')) print("FDE : " + hexlify(fdekey).decode('utf-8')) print("iTrustee-FDE: " + hexlify(tfdekey).decode('utf-8')) print("Platform: " + hexlify(platkey).decode('utf-8')) print("Provisioning: " + hexlify(provkey).decode('utf-8')) print() elif mode == "sej": rpmbkey = self.hwcrypto.aes_hwcrypt(mode="rpmb", data=data, otp=otp, btype="sej") print("\nKeys :\n-----------------------------------------") print("RPMB: " + hexlify(rpmbkey).decode('utf-8')) print() elif mode == "sej_aes_decrypt": dec_data = self.hwcrypto.aes_hwcrypt(mode="cbc", data=data, btype="sej", encrypt=False) print() print("Data: " + hexlify(dec_data).decode('utf-8')) print() elif mode == "sej_aes_encrypt": enc_data = self.hwcrypto.aes_hwcrypt(mode="cbc", data=data, btype="sej", encrypt=False) print() print("Data: " + hexlify(enc_data).decode('utf-8')) print() elif mode == "dxcc_sha256": sha256val = self.hwcrypto.aes_hwcrypt(mode="sha256", data=data, btype="dxcc") print() print("SHA256: " + hexlify(sha256val).decode('utf-8')) print() def reboot(self): print("Rebooting..") self.cdc.usbwrite(pack(">I", 0xf00dd00d)) self.cdc.usbwrite(pack(">I", 0x3000)) def getint(valuestr): if valuestr == '': return None try: return int(valuestr) except Exception as err: err = err try: return int(valuestr, 16) except Exception as err: err = err pass return 0 cmds = { "rpmb": 'Dump rpmb', "preloader": 'Dump preloader', "boot2": 'Dump boot2', "reboot": 'Reboot phone', "memread": "Read memory [Example: memread --start 0 --length 0x10]", "memwrite": "Write memory [Example: memwrite --start 0x200000 --data 11223344", "keys": "Extract rpmb and fde key" } info = "MTK Stage2 client (c) B.Kerler 2021" def showcommands(): print(info) print("-----------------------------------\n") print("Available commands are:\n") for cmd in cmds: print("%20s" % (cmd) + ":\t" + cmds[cmd]) print() def main(): parser = argparse.ArgumentParser(description=info) subparsers = parser.add_subparsers(dest="cmd", help='Valid commands are: rpmb, preloader, boot2, memread, memwrite, keys') parser_rpmb = subparsers.add_parser("rpmb", help="Dump the rpmb") parser_rpmb.add_argument('--start', dest='start', type=str, help='Start offset to dump') parser_rpmb.add_argument('--length', dest='length', type=str, help='Max length to dump') parser_rpmb.add_argument('--reverse', dest='reverse', action="store_true", help='Reverse byte order (example: rpmb command)') parser_rpmb.add_argument('--filename', dest='filename', type=str, help='Read from / save to filename') parser_preloader = subparsers.add_parser("preloader", help="Dump the preloader") parser_preloader.add_argument('--start', dest='start', type=str, help='Start offset to dump') parser_preloader.add_argument('--length', dest='length', type=str, help='Max length to dump') parser_preloader.add_argument('--filename', dest='filename', type=str, help='Read from / save to filename') parser_boot2 = subparsers.add_parser("boot2", help="Dump boot2") parser_boot2.add_argument('--start', dest='start', type=str, help='Start offset to dump') parser_boot2.add_argument('--length', dest='length', type=str, help='Max length to dump') parser_boot2.add_argument('--filename', dest='filename', type=str, help='Read from / save to filename') parser_memread = subparsers.add_parser("memread", help="Read memory") parser_memread.add_argument(dest='start', type=str, help='Start offset to dump') parser_memread.add_argument(dest='length', type=str, help='Max length to dump') parser_memread.add_argument('--filename', dest='filename', type=str, help='Save to filename') parser_memwrite = subparsers.add_parser("memwrite", help="Write memory") parser_memwrite.add_argument(dest='start', type=str, help='Start offset to dump') parser_memwrite.add_argument('--data', dest='data', type=str, help='Data to write') parser_memwrite.add_argument('--filename', dest='filename', type=str, help='Read from filename') parser_reboot = subparsers.add_parser("reboot", help="Reboot device") parser_keys = subparsers.add_parser("keys", help="Write memory") parser_keys.add_argument('--mode', dest='mode', type=str, help='Mode for keys (dxcc,sej,gcpu)') parser_keys.add_argument('--otp', dest='otp', type=str, help='OTP for keys (dxcc,sej,gcpu)') args = parser.parse_args() cmd = args.cmd if cmd not in cmds: showcommands() exit(0) if not os.path.exists("logs"): os.mkdir("logs") st2 = Stage2(args) if st2.connect(): if not st2.preinit(): exit(1) if cmd == "rpmb": if args.filename is None: filename = os.path.join("logs", "rpmb") else: filename = args.filename start = getint(args.start) length = getint(args.length) st2.rpmb(start, length, filename, args.reverse) elif cmd == "preloader": if args.filename is None: filename = os.path.join("logs", "preloader") else: filename = args.filename start = getint(args.start) length = getint(args.length) st2.preloader(start, length, filename=filename) elif cmd == "boot2": if args.filename is None: filename = os.path.join("logs", "boot2") else: filename = args.filename start = getint(args.start) length = getint(args.length) st2.boot2(start, length, filename=filename) elif cmd == "memread": if args.start is None: print("Option --start is needed") exit(0) if args.length is None: print("Option --length is needed") exit(0) start = getint(args.start) length = getint(args.length) st2.memread(start, length, args.filename) elif cmd == "memwrite": if args.start is None: print("Option --start is needed") exit(0) if args.data is None: print("Option --data is needed") exit(0) start = getint(args.start) if st2.memwrite(start, args.data, args.filename): print(f"Successfully wrote data to {hex(start)}.") else: print(f"Failed to write data to {hex(start)}.") elif cmd == "keys": if args.mode is None: print("Option --mode is needed") exit(0) if args.mode == "sej": data=st2.memread(st2.hwcrypto.meid_addr,16) # if not args.otp: # print("Option --otp is needed") # exit(0) elif args.mode == "sej_aes_decrypt" or args.mode == "sej_aes_encrypt": if not args.data: print("Option --data is needed") exit(0) data = bytes.fromhex(args.data) else: data = b"" # otp_hisense=bytes.fromhex("486973656E736500000000000000000000000000000000000000000000000000") # st2.jump(0x223449) st2.keys(data=data, mode=args.mode, otp=args.otp) elif cmd == "reboot": st2.reboot() st2.close() if __name__ == "__main__": main()