#!/usr/bin/env python3 # MTK Flash Client (c) B.Kerler 2018-2021. # Licensed under MIT License import os import sys import logging import time import argparse from binascii import hexlify from struct import unpack, pack from mtkclient.config.usb_ids import default_ids from mtkclient.config.payloads import pathconfig from mtkclient.Library.pltools import PLTools from mtkclient.Library.mtk_preloader import Preloader from mtkclient.Library.mtk_daloader import DAloader from mtkclient.Library.Port import Port from mtkclient.Library.utils import LogBase, logsetup, getint from mtkclient.config.brom_config import Mtk_Config from mtkclient.Library.utils import print_progress def split_by_n(seq, unit_count): """A generator to divide a sequence into chunks of n units.""" while seq: yield seq[:unit_count] seq = seq[unit_count:] class Mtk(metaclass=LogBase): def __init__(self, args, loader, loglevel=logging.INFO, vid=-1, pid=-1, interface=0, preinit=True): self.config = Mtk_Config(loglevel=loglevel) self.args = args self.loader = loader self.vid = vid self.pid = pid self.interface = interface self.pathconfig = pathconfig() self.__logger = logsetup(self, self.__logger, loglevel) da_address = self.args.da_addr if da_address is not None: self.config.chipconfig.da_payload_addr = getint(da_address) brom_address = self.args.brom_addr if brom_address is not None: self.config.chipconfig.brom_payload_addr = getint(brom_address) watchdog_address = self.args.wdt if watchdog_address is not None: self.config.chipconfig.watchdog = getint(watchdog_address) uart_address = self.args.uart_addr if uart_address is not None: self.config.chipconfig.uart = getint(uart_address) var1 = self.args.var1 if var1 is not None: self.config.chipconfig.var1 = getint(var1) if preinit: self.init() def patch_preloader_security(self, data): patched = False data = bytearray(data) patches = [ ("B3F5807F01D1", "B3F5807F01D14FF000004FF000007047"), #confirmed : mt6739 c30 ("B3F5807F04BF4FF4807305F011B84FF0FF307047", "B3F5807F04BF4FF480734FF000004FF000007047"), ] i=0 for patchval in patches: pattern = bytes.fromhex(patchval[0]) idx = data.find(pattern) if idx != -1: patch = bytes.fromhex(patchval[1]) data[idx:idx + len(patch)] = patch patched = True break i+=1 if patched: # with open(sys.argv[1]+".patched","wb") as wf: # wf.write(data) # print("Patched !") self.info(f"Patched preloader security: {hex(i)}") else: self.warning(f"Failed to patch preloader security") return data def parse_preloader(self, preloader): with open(preloader, "rb") as rf: magic = unpack("I", mtk.port.usbread(4))[0] if ack == 0xB1B2B3B4: self.info("Successfully loaded stage2") return else: self.error("Error on jumping to pl") return else: self.error("Preloader path doesn't exist :(") return else: if os.path.exists(pl): with open(pl, "rb") as rf: rf.seek(0) dadata = rf.read() if mtk.preloader.init(args=self.args, readsocid=readsocid): if mtk.config.chipconfig.pl_payload_addr is not None: daaddr = mtk.config.chipconfig.pl_payload_addr else: daaddr = 0x40200000 # 0x40001000 if mtk.preloader.send_da(daaddr, len(dadata), 0x100, dadata): self.info(f"Sent da to {hex(daaddr)}, length {hex(len(dadata))}") if mtk.preloader.jump_da(daaddr): self.info(f"Jumped to {hex(daaddr)}.") ack = unpack(">I", mtk.port.usbread(4))[0] if ack == 0xB1B2B3B4: self.info("Successfully loaded stage2") else: self.error("Error on jumping to pl") return else: self.error("Error on sending pl") return self.close() elif cmd == "peek": addr = getint(self.args.offset) length = getint(self.args.length) if self.args.preloader is not None: preloader = self.args.preloader if os.path.exists(preloader): daaddr, dadata = mtk.parse_preloader(preloader) if self.args.filename is None: filename = "" else: filename = self.args.filename if mtk.preloader.init(args=self.args, readsocid=readsocid): if mtk.config.target_config["daa"]: mtk = mtk.bypass_security(args=self.args, vid=vid, pid=pid, interface=interface, readsocid=readsocid, enforcecrash=enforcecrash) if mtk is not None: if self.args.preloader is not None: if mtk.preloader.send_da(daaddr, len(dadata), 0x100, dadata): self.info(f"Sent preloader to {hex(daaddr)}, length {hex(len(dadata))}") if mtk.preloader.jump_da(daaddr): self.info(f"Jumped to pl {hex(daaddr)}.") time.sleep(2) mtk = Mtk(loader=self.args.loader, loglevel=self.__logger.level, vid=vid, pid=pid, interface=interface, args=self.args) res = mtk.preloader.init(args=self.args, readsocid=readsocid) if not res: self.error("Error on loading preloader") return else: self.info("Successfully connected to pl.") # mtk.preloader.get_hw_sw_ver() # status=mtk.preloader.jump_to_partition(b"") # Do not remove ! else: self.error("Error on jumping to pl") return self.info("Starting to read ...") dwords = length // 4 if length % 4: dwords += 1 if filename != "": wf = open(filename, "wb") sdata = b"" print_progress(0, 100, prefix='Progress:', suffix='Starting, addr 0x%08X' % addr, bar_length=50) length = dwords * 4 old = 0 pos = 0 while dwords: size = min(512 // 4, dwords) data = b"".join(int.to_bytes(val, 4, 'little') for val in mtk.preloader.read32(addr + pos, size)) sdata += data if filename != "": wf.write(data) pos += len(data) prog = pos / length * 100 if round(prog, 1) > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete, addr 0x%08X' % (addr + pos), bar_length=50) old = round(prog, 1) dwords = (length - pos) // 4 print_progress(100, 100, prefix='Progress:', suffix='Finished', bar_length=50) if filename == "": print(hexlify(sdata).decode('utf-8')) else: wf.close() self.info(f"Data from {hex(addr)} with size of {hex(length)} was written to " + filename) self.close() elif cmd == "stage": if self.args.filename is None: pc = pathconfig() stage1file = os.path.join(pc.get_payloads_path(), "generic_stage1_payload.bin") else: stage1file = self.args.filename if not os.path.exists(stage1file): self.error(f"Error: {stage1file} doesn't exist !") return False if self.args.stage2addr is None: stage2addr = None else: stage2addr = getint(self.args.stage2addr) if self.args.stage2 is None: stage2file = os.path.join(mtk.pathconfig.get_payloads_path(), "stage2.bin") else: stage2file = self.args.stage2 if not os.path.exists(stage2file): self.error(f"Error: {stage2file} doesn't exist !") return False verifystage2 = self.args.verifystage2 if mtk.preloader.init(args=self.args, readsocid=readsocid): if self.args.crash is not None: mtk = mtk.crasher(args=self.args, readsocid=readsocid, enforcecrash=enforcecrash) if mtk.port.cdc.pid == 0x0003: plt = PLTools(mtk, self.__logger.level) self.info("Uploading stage 1") ptype = "kamakiri2" if self.args.ptype is not None: ptype = self.args.ptype if plt.runpayload(filename=stage1file, ptype=ptype): self.info("Successfully uploaded stage 1, sending stage 2") with open(stage2file, "rb") as rr: stage2data = rr.read() while len(stage2data) % 0x200: stage2data += b"\x00" if stage2addr is None: stage2addr = mtk.config.chipconfig.da_payload_addr if stage2addr is None: stage2addr = 0x201000 # ###### Send stage2 # magic mtk.port.usbwrite(pack(">I", 0xf00dd00d)) # cmd write mtk.port.usbwrite(pack(">I", 0x4000)) # address mtk.port.usbwrite(pack(">I", stage2addr)) # length mtk.port.usbwrite(pack(">I", len(stage2data))) bytestowrite = len(stage2data) pos = 0 while bytestowrite > 0: size = min(bytestowrite, 1) if mtk.port.usbwrite(stage2data[pos:pos + size]): bytestowrite -= size pos += size # mtk.port.usbwrite(b"") time.sleep(0.1) flag = mtk.port.rdword() if flag != 0xD0D0D0D0: self.error(f"Error on sending stage2, size {hex(len(stage2data))}.") self.info(f"Done sending stage2, size {hex(len(stage2data))}.") if verifystage2: self.info("Verifying stage2 data") rdata = b"" mtk.port.usbwrite(pack(">I", 0xf00dd00d)) mtk.port.usbwrite(pack(">I", 0x4002)) mtk.port.usbwrite(pack(">I", stage2addr)) mtk.port.usbwrite(pack(">I", len(stage2data))) bytestoread = len(stage2data) while bytestoread > 0: size = min(bytestoread, 1) rdata += mtk.port.usbread(size) bytestoread -= size flag = mtk.port.rdword() if flag != 0xD0D0D0D0: self.error("Error on reading stage2 data") if rdata != stage2data: self.error("Stage2 data doesn't match") with open("rdata", "wb") as wf: wf.write(rdata) else: self.info("Stage2 verification passed.") # ####### Kick Watchdog # magic # mtk.port.usbwrite(pack("I", 0xf00dd00d)) # cmd jump mtk.port.usbwrite(pack(">I", 0x4001)) # address mtk.port.usbwrite(pack(">I", stage2addr)) self.info("Done jumping stage2 at %08X" % stage2addr) ack = unpack(">I", mtk.port.usbread(4))[0] if ack == 0xB1B2B3B4: self.info("Successfully loaded stage2") self.close() else: mtk.port.close() return False elif cmd == "payload": if mtk.preloader.init(args=self.args, readsocid=readsocid): mtk = mtk.crasher(args=self.args, readsocid=readsocid, enforcecrash=enforcecrash) plt = PLTools(mtk, self.__logger.level) payloadfile = self.args.payload if payloadfile is None: if mtk.config.chipconfig.loader is None: payloadfile = os.path.join(mtk.pathconfig.get_payloads_path(), "generic_patcher_payload.bin") else: payloadfile = os.path.join(mtk.pathconfig.get_payloads_path(), mtk.config.chipconfig.loader) ptype = "" if self.args.ptype is not None: ptype = self.args.ptype plt.runpayload(filename=payloadfile, ptype=ptype) mtk.port.close() self.close() elif cmd == "gettargetconfig": if mtk.preloader.init(args=self.args, readsocid=readsocid): self.info("Getting target info...") mtk.preloader.get_target_config() mtk.port.close() self.close() else: preloader = self.args.preloader if mtk.preloader.init(args=self.args, readsocid=readsocid): if mtk.config.target_config["daa"]: mtk = mtk.bypass_security(args=self.args, vid=vid, pid=pid, interface=interface, readsocid=readsocid, enforcecrash=enforcecrash) self.info("Device is protected.") if mtk is not None: meid = mtk.preloader.get_meid() if meid: self.info("Device is in BROM mode. Trying to dump preloader.") if preloader is None: preloader = self.dump_preloader_ram(mtk) else: self.info("Device is unprotected.") meid = mtk.preloader.get_meid() if meid: self.info("Device is in BROM mode. Trying to dump preloader.") mtk = mtk.bypass_security(args=self.args, vid=vid, pid=pid, interface=interface, readsocid=readsocid, enforcecrash=enforcecrash) preloader = self.dump_preloader_ram(mtk) if not mtk.daloader.upload_da(preloader=preloader): self.error("Error uploading da") return False else: return False if cmd == "gpt": directory = self.args.directory if directory is None: directory = "" sfilename = os.path.join(directory, f"gpt_main.bin") data, guid_gpt = mtk.daloader.get_gpt(self.args) if guid_gpt is None: self.error("Error reading gpt") mtk.daloader.close() exit(1) else: with open(sfilename, "wb") as wf: wf.write(data) print(f"Dumped GPT from to {sfilename}") sfilename = os.path.join(directory, f"gpt_backup.bin") with open(sfilename, "wb") as wf: wf.write(data[mtk.daloader.daconfig.pagesize:]) print(f"Dumped Backup GPT to {sfilename}") mtk.daloader.close() self.close() elif cmd == "printgpt": data, guid_gpt = mtk.daloader.get_gpt(self.args) if guid_gpt is None: self.error("Error reading gpt") else: guid_gpt.print() mtk.daloader.close() self.close() elif cmd == "r": partitionname = self.args.partitionname parttype = self.args.parttype filename = self.args.filename filenames = filename.split(",") partitions = partitionname.split(",") if len(partitions) != len(filenames): self.error("You need to gives as many filenames as given partitions.") mtk.daloader.close() exit(1) if parttype == "user" or parttype is None: i = 0 self.info("Requesting available partitions ....") gpttable = mtk.daloader.get_partition_data(self.args, parttype=parttype) for partition in partitions: partfilename = filenames[i] i += 1 if partition == "gpt": mtk.daloader.readflash(addr=0, length=0x16000, filename=partfilename, parttype=parttype) continue else: rpartition = None for gptentry in gpttable: if gptentry.name.lower() == partition.lower(): rpartition = gptentry break if rpartition is not None: self.info(f"Dumping partition {rpartition.name}") if mtk.daloader.readflash(addr=rpartition.sector * mtk.daloader.daconfig.pagesize, length=rpartition.sectors * mtk.daloader.daconfig.pagesize, filename=partfilename, parttype=parttype): self.info(f"Dumped sector {str(rpartition.sector)} with sector count " + f"{str(rpartition.sectors)} as {partfilename}.") else: self.info(f"Failed to dump sector {str(rpartition.sector)} with sector count " + f"{str(rpartition.sectors)} as {partfilename}.") else: self.error(f"Error: Couldn't detect partition: {partition}\nAvailable partitions:") for rpartition in gpttable: self.info(rpartition.name) else: i = 0 for partfilename in filenames: pos = 0 if mtk.daloader.readflash(addr=pos, length=0xFFFFFFFF, filename=partfilename, parttype=parttype): print(f"Dumped partition {str(partitionname)} as {partfilename}.") else: print(f"Failed to dump partition {str(partitionname)} as {partfilename}.") i += 1 mtk.daloader.close() self.close() elif cmd == "rl": directory = self.args.directory parttype = self.args.parttype if self.args.skip: skip = self.args.skip.split(",") else: skip = [] if not os.path.exists(directory): os.mkdir(directory) data, guid_gpt = mtk.daloader.get_gpt(self.args, parttype=parttype) if guid_gpt is None: self.error("Error reading gpt") else: storedir = directory if not os.path.exists(storedir): os.mkdir(storedir) sfilename = os.path.join(storedir, f"gpt_main.bin") with open(sfilename, "wb") as wf: wf.write(data) sfilename = os.path.join(storedir, f"gpt_backup.bin") with open(sfilename, "wb") as wf: wf.write(data[mtk.daloader.daconfig.pagesize * 2:]) for partition in guid_gpt.partentries: partitionname = partition.name if partition.name in skip: continue filename = os.path.join(storedir, partitionname + ".bin") self.info( f"Dumping partition {str(partition.name)} with sector count {str(partition.sectors)} " + f"as {filename}.") mtk.daloader.readflash(addr=partition.sector * mtk.daloader.daconfig.pagesize, length=partition.sectors * mtk.daloader.daconfig.pagesize, filename=filename, parttype=parttype) mtk.daloader.close() self.close() elif cmd == "rf": filename = self.args.filename parttype = self.args.parttype if mtk.daloader.daconfig.flashtype == "ufs": if parttype == "lu0": length = mtk.daloader.daconfig.flashsize[0] elif parttype == "lu1": length = mtk.daloader.daconfig.flashsize[1] elif parttype == "lu2": length = mtk.daloader.daconfig.flashsize[2] else: length = mtk.daloader.daconfig.flashsize[0] else: length = mtk.daloader.daconfig.flashsize print(f"Dumping sector 0 with flash size {hex(length)} as {filename}.") if mtk.daloader.readflash(addr=0, length=length, filename=filename, parttype=parttype): print(f"Dumped sector 0 with flash size {hex(length)} as {filename}.") else: print(f"Failed to dump sector 0 with flash size {hex(length)} as {filename}.") mtk.daloader.close() self.close() elif cmd == "rs": start = getint(self.args.start_sector) sectors = getint(self.args.sectors) filename = self.args.filename parttype = self.args.parttype if mtk.daloader.readflash(addr=start * mtk.daloader.daconfig.pagesize, length=sectors * mtk.daloader.daconfig.pagesize, filename=filename, parttype=parttype): print(f"Dumped sector {str(start)} with sector count {str(sectors)} as {filename}.") else: print(f"Failed to dump sector {str(start)} with sector count {str(sectors)} as {filename}.") mtk.daloader.close() self.close() elif cmd == "footer": filename = self.args.filename data, guid_gpt = mtk.daloader.get_gpt(self.args) if guid_gpt is None: self.error("Error reading gpt") mtk.daloader.close() exit(1) else: pnames = ["userdata2", "metadata", "userdata", "reserved1", "reserved2", "reserved3"] for partition in guid_gpt.partentries: if partition.name in pnames: print(f"Detected partition: {partition.name}") if partition.name in ["userdata2", "userdata"]: data = mtk.daloader.readflash( addr=( partition.sector + partition.sectors) * mtk.daloader.daconfig.pagesize - 0x4000, length=0x4000, filename="", parttype="user", display=False) else: data = mtk.daloader.readflash(addr=partition.sector * mtk.daloader.daconfig.pagesize, length=0x4000, filename="", parttype="user", display=False) if data == b"": continue val = unpack("