From b26ddc907bd024cc2ff69100d71d893345ce518f Mon Sep 17 00:00:00 2001 From: Bjoern Kerler Date: Sat, 23 Nov 2019 19:06:21 +0100 Subject: [PATCH] New interface, diag fixes --- Library/gpt.py | 8 +- Library/tcpclient.py | 4 +- Library/usb.py | 134 ----------- Library/usblib.py | 6 +- README.md | 43 +++- diag.py | 0 edl.py | 549 +++++++++++++++++++++++-------------------- fhloaderparse.py | 0 requirements.txt | 3 + tcpclient.py | 2 +- 10 files changed, 340 insertions(+), 409 deletions(-) delete mode 100755 Library/usb.py mode change 100644 => 100755 diag.py mode change 100644 => 100755 fhloaderparse.py create mode 100644 requirements.txt diff --git a/Library/gpt.py b/Library/gpt.py index 09caf8a..4664e08 100755 --- a/Library/gpt.py +++ b/Library/gpt.py @@ -112,13 +112,13 @@ class gpt: EFI_VMWARE_VMFS = 0xAA31E02A EFI_VMWARE_RESERVED = 0x9198EFFC - def __init__(self, num_part_entries=None, part_entry_size=None, part_entry_start_lba=None, *args, **kwargs): + def __init__(self, num_part_entries=0, part_entry_size=0, part_entry_start_lba=0, *args, **kwargs): self.num_part_entries = num_part_entries self.part_entry_size = part_entry_size self.part_entry_start_lba = part_entry_start_lba - if num_part_entries is None: + if num_part_entries is 0: self.gpt_header += [('num_part_entries', 'I'),] - if part_entry_size is None: + if part_entry_size is 0: self.gpt_header += [('part_entry_size', 'I'),] @@ -131,7 +131,7 @@ class gpt: if self.header["revision"]!=0x100: print("Unknown GPT revision.") return False - if self.part_entry_start_lba is not None: + if self.part_entry_start_lba is not 0: start = self.part_entry_start_lba else: start=self.header["part_entry_start_lba"]*sectorsize diff --git a/Library/tcpclient.py b/Library/tcpclient.py index 53ee48d..12a6cae 100644 --- a/Library/tcpclient.py +++ b/Library/tcpclient.py @@ -1,9 +1,9 @@ import socket class tcpclient(): - def __init__(self): + def __init__(self, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_address = ("localhost", 1340) + server_address = ("localhost", port) print("connecting to %s port %s" % server_address) self.sock.connect(server_address) diff --git a/Library/usb.py b/Library/usb.py deleted file mode 100755 index 1b03ed0..0000000 --- a/Library/usb.py +++ /dev/null @@ -1,134 +0,0 @@ -import usb.core # pyusb -import usb.util -import time -import errno - -USB_DIR_OUT=0 # to device -USB_DIR_IN=0x80 # to host - -# USB types, the second of three bRequestType fields -USB_TYPE_MASK=(0x03 << 5) -USB_TYPE_STANDARD=(0x00 << 5) -USB_TYPE_CLASS=(0x01 << 5) -USB_TYPE_VENDOR=(0x02 << 5) -USB_TYPE_RESERVED=(0x03 << 5) - -# USB recipients, the third of three bRequestType fields -USB_RECIP_MASK=0x1f -USB_RECIP_DEVICE=0x00 -USB_RECIP_INTERFACE=0x01 -USB_RECIP_ENDPOINT=0x02 -USB_RECIP_OTHER=0x03 -#From Wireless USB 1.0 -USB_RECIP_PORT= 0x04 -USB_RECIP_RPIPE=0x05 - -class usb_class(): - - def log(self,level,msg): - if level>1: - if self.debug==True: - print(msg) - else: - print(msg) - - def __init__(self,vid=0x05c6, pid=0x9008, Debug=False): - self.vid=vid - self.pid=pid - self.debug=Debug - self.connected=False - - def getInterfaceCount(self): - self.device = usb.core.find(idVendor=self.vid, idProduct=self.pid) - if self.device is None: - self.log(2, "Couldn't detect the device. Is it connected ?") - return False - try: - self.device.set_configuration() - except: - pass - self.configuration = self.device.get_active_configuration() - self.log(2, self.configuration) - return self.configuration.bNumInterfaces - - def connect(self, interface=0, EP_IN=-1, EP_OUT=-1): - self.device = usb.core.find(idVendor=self.vid, idProduct=self.pid) - if self.device is None: - self.log(2, "Couldn't detect the device. Is it connected ?") - return False - try: - self.device.set_configuration() - except: - pass - self.configuration = self.device.get_active_configuration() - self.log(2, self.configuration) - if interface>self.configuration.bNumInterfaces: - print("Invalid interface, max number is %d" % self.configuration.bNumInterfaces) - return False - for itf_num in [interface]: - itf = usb.util.find_descriptor(self.configuration, - bInterfaceNumber=itf_num) - try: - if self.device.is_kernel_driver_active(itf_num): - self.log(2, "Detaching kernel driver") - self.device.detach_kernel_driver(itf_num) - except: - self.log(2, "No kernel driver supported.") - - usb.util.claim_interface(self.device, itf_num) - if EP_OUT==-1: - self.EP_OUT = usb.util.find_descriptor(itf, - # match the first OUT endpoint - custom_match= \ - lambda e: \ - usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_OUT) - else: - self.EP_OUT=EP_OUT - if EP_IN==-1: - self.EP_IN = usb.util.find_descriptor(itf, - # match the first OUT endpoint - custom_match= \ - lambda e: \ - usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_IN) - else: - self.EP_IN=EP_IN - - self.connected=True - return True - - def write(self,command,pktsize=64): - pos=0 - if command==b'': - self.device.write(self.EP_OUT, b'') - else: - while pos to see help with all options -- "./edl.py -printgpt 0 -memory ufs" -> to print gpt on lun 0 on device with ufs -- "./edl.py -printgpt 0 -memory emmc" -> to print gpt on device with emmc -- "./edl.py -rf 0 flash.bin -memory emmc" -> to dump whole flash on lun 0 for device with emmc -- "./edl.py -footer 0 footer.bin -memory emmc" -> to dump the crypto footer on lun 0 for Androids -- "./edl.py -w 0 boot boot.img -memory emmc" -> to write boot.img to the "boot" partition on lun 0 on the device with emmc flash -- "./edl.py -memory emmc -server" -> Run TCP/IP server, see tcpclient.py for an example client +- "./edl.py printgpt --memory=ufs --lun=0" -> to print gpt on lun 0 on device with ufs +- "./edl.py printgpt" -> to print gpt on device with emmc +- "./edl.py rf flash.bin" -> to dump whole flash for device with emmc +- "./edl.py rf lun0.bin --memory=ufs --lun=0" -> to dump whole lun 0 for device with ufs +- "./edl.py rl dumps" -> to dump all partitions to directory dumps for device with emmc +- "./edl.py rl dumps --memory=ufs --lun=0" -> to dump all partitions from lun0 to directory dumps for device with ufs +- "./edl.py rs 0 15 data.bin" -> to dump 15 sectors from starting sector 0 to file data.bin for device with emmc +- "./edl.py r boot_a boot.img" -> to dump the partition "boot_a" to the filename boot.img for device with emmc +- "./edl.py footer footer.bin" -> to dump the crypto footer for Androids with emmc flash +- "./edl.py w boot boot.img --memory=ufs --lun=0" -> to write boot.img to the "boot" partition on lun 0 on the device with ufs flash +- "./edl.py w boot boot.img" -> to write boot.img to the "boot" partition on lun 0 on the device with emmc flash +- "./edl.py wl dumps" -> to write all files from "dumps" folder to according partitions to flash +- "./edl.py wf dump.bin" -> to write the rawimage dump.bin to flash +- "./edl.py e misc" -> to erase the partition misc on emmc flash +- "./edl.py server --memory=ufs --tcpport=1340" -> Run TCP/IP server on port 1340, see tcpclient.py for an example client +- "./edl.py peek 0x200000 0x10 mem.bin" -> To dump 0x10 bytes from offset 0x200000 to file mem.bin from memory +- "./edl.py peekhex 0x200000 0x10" -> To dump 0x10 bytes from offset 0x200000 as hex string from memory +- "./edl.py peekqword 0x200000" -> To display a qword (8-bytes) at offset 0x200000 from memory +- "./edl.py pokeqword 0x200000 0x400000" -> To write the q-word value 0x400000 to offset 0x200000 in memory +- "./edl.py poke 0x200000 mem.bin" -> To write the binary file mem.bin to offset 0x200000 in memory +- "./edl.py secureboot" -> To display secureboot fuses (only on EL3 loaders) +- "./edl.py pbl pbl.bin" -> To dump pbl (only on EL3 loaders) +- "./edl.py qfp qfp.bin" -> To dump qfprom fuses (only on EL3 loaders) +- "./edl.py xml run.xml" -> To send a xml file run.xml via firehose +- "./edl.py reset" -> To reboot the phone + Install EDL loaders =============== @@ -40,8 +60,11 @@ Install EDL loaders - Copy all your loaders into the examples directory - "./fhloaderparse.py examples" -> will autodetect and rename loader structure and copy them to the "Loaders" directory -Run Diag +Run Diag (examples) ======== +For Oneplus 6T, enter *#801#* on dialpad, set Engineer Mode and Serial to on and try : +- "./diag.py -vid 0x05c6 -pid 0x676c -interface 0 -info" + Allows to send commands to the qc diag port - "./diag.py -vid 0x1234 -pid 0x5678 -interface 0 -info" -> Send cmd "00" and return info - "./diag.py -vid 0x1234 -pid 0x5678 -interface 0 -spc 303030303030" -> Send spc "303030303030" diff --git a/diag.py b/diag.py old mode 100644 new mode 100755 diff --git a/edl.py b/edl.py index 9636dd3..b9124ec 100755 --- a/edl.py +++ b/edl.py @@ -1,14 +1,107 @@ #!/usr/bin/env python3 -''' -Licensed under MIT License, (c) B. Kerler 2018-2019 -''' +# Qualcomm Sahara / Firehose Client (c) B.Kerler 2018-2019. +# Licensed under MIT License +""" +Usage: + edl.py -h | --help + edl.py [--vid=vid] [--pid=pid] + edl.py [--loader=filename] + edl.py [--debugmode] + edl.py [--gpt-num-part-entries=number] [--gpt-part-entry-size=number] [--gpt-part-entry-start-lba=number] + edl.py [--memory=memtype] [--skipstorageinit] [--maxpayload=bytes] [--sectorsize==bytes] + edl.py server [--tcpport=portnumber] + edl.py printgpt [--lun=lun] + edl.py gpt [--memory=memtype] [--lun=lun] + edl.py r [--memory=memtype] [--lun=lun] + edl.py rl [--memory=memtype] [--lun=lun] + edl.py rf [--memory=memtype] [--lun=lun] + edl.py rs [--lun=lun] + edl.py w [--memory=memtype] [--lun=lun] [--skipwrite] + edl.py wl [--memory=memtype] [--lun=lun] + edl.py wf [--memory=memtype] [--lun=lun] + edl.py ws [--memory=memtype] [--lun=lun] [--skipwrite] + edl.py e [--memory=memtype] [--skipwrite] [--lun=lun] + edl.py es [--memory=memtype] [--lun=lun] [--skipwrite] + edl.py footer [--memory=memtype] [--lun=lun] + edl.py peek + edl.py peekhex + edl.py peekdword + edl.py peekqword + edl.py memtbl + edl.py poke + edl.py pokehex + edl.py pokedword + edl.py pokeqword + edl.py memcpy + edl.py secureboot + edl.py pbl + edl.py qfp + edl.py getstorageinfo + edl.py setbootablestoragedrive + edl.py send + edl.py xml + edl.py reset + edl.py nop + +Description: + server [--tcpport=portnumber] # Run tcp/ip server + printgpt [--lun=lun] # Print GPT Table information + gpt [--memory=memtype] [--lun=lun] # Save gpt table to file + r [--memory=memtype] [--lun=lun] # Read flash to filename + rl [--memory=memtype] [--lun=lun] # Read all partitions from flash to a directory + rf [--memory=memtype] [--lun=lun] # Read whole flash to file + rs [--lun=lun] # Read sectors starting at start_sector to filename + w [--memory=memtype] [--lun=lun] [--skipwrite] # Write filename to partition to flash + wl [--memory=memtype] [--lun=lun] # Write all files from directory to flash + wf [--memory=memtype] [--lun=lun] # Write whole filename to flash + ws [--memory=memtype] [--lun=lun] [--skipwrite] # Write filename to flash at start_sector + e [--memory=memtype] [--skipwrite] [--lun=lun] # Erase partition from flash + es [--memory=memtype] [--lun=lun] [--skipwrite] # Erase sectors at start_sector from flash + footer [--memory=memtype] [--lun=lun] # Read crypto footer from flash + peek # Dump memory at offset with given length to filename + peekhex # Dump memory at offset and given length as hex string + peekdword # Dump DWORD at memory offset + peekqword # Dump QWORD at memory offset + memtbl # Dump memory table to file + poke # Write filename to memory at offset to memory + pokehex # Write hex string data at offset to memory + pokedword # Write DWORD to memory at offset + pokeqword # Write QWORD to memory at offset + memcpy # Copy memory from srcoffset with given size to dstoffset + secureboot # Print secureboot fields from qfprom fuses + pbl # Dump primary bootloader to filename + qfp # Dump QFPROM fuses to filename + getstorageinfo # Print storage info in firehose mode + setbootablestoragedrive # Change bootable storage drive to lun number + send # Send firehose command + xml # Send firehose xml file + reset # Send firehose reset command + nop # Send firehose nop command + +Options: + --loader=filename Use specific EDL loader, disable autodetection [default: None] + --vid=vid Set usb vendor id used for EDL [default: 0x05c6] + --pid=pid Set usb product id used for EDL [default: 0x9008] + --lun=lun Set lun to read from (UFS memory only) [default: 0] + --maxpayload=bytes Set the maximum payload for EDL [default: 0x100000] + --sectorsize=bytes Set default sector size [default: 0x200] + --memory=memtype Set memory type (EMMC or UFS) [default: eMMC] + --skipwrite Do not allow any writes to flash (simulate only) + --skipstorageinit Skip storage initialisation + --debugmode Enable verbose mode + --gpt-num-part-entries=number Set GPT entry count [default: 0] + --gpt-part-entry-size=number Set GPT entry size [default: 0] + --gpt-part-entry-start-lba=number Set GPT entry start lba sector [default: 0] + --tcpport=portnumber Set port for tcp server [default:1340] +""" +print("Qualcomm Sahara / Firehose Client (c) B.Kerler 2018-2019.") + +from docopt import docopt +args = docopt(__doc__, version='EDL 2.0') -import argparse import time -import os from Library.utils import * from Library.usblib import usb_class -from Library.gpt import gpt from Library.sahara import qualcomm_sahara from Library.firehose import qualcomm_firehose from Library.streaming import qualcomm_streaming @@ -245,88 +338,21 @@ def check_cmd(supported_funcs,func): return False def main(): - info='Qualcomm Sahara / Firehose Client (c) B.Kerler 2018-2019.' - parser = argparse.ArgumentParser(description=info) - print("\n"+info+"\n\n") - parser.add_argument('-loader',metavar="none,",help='[Option] Flash programmer to load e.g. prog_emmc_firehose.elf', default='') - parser.add_argument('-vid',metavar="",help='[Option] Specify vid, default=0x05c6)', default="0x05C6") - parser.add_argument('-pid',metavar="", help='[Option] Specify pid, default=0x9008)', default="0x9008") - parser.add_argument('-maxpayload',metavar="",help='[Option] The max bytes to transfer in firehose mode (default=1048576)', type=int, default=1048576) - parser.add_argument('-skipwrite', help='[Option] Do not write actual data to disk (use this for UFS provisioning)', action="store_true") - parser.add_argument('-skipstorageinit', help='[Option] Do not initialize storage device (use this for UFS provisioning)',action="store_true") - parser.add_argument('-memory', metavar="",help='[Option] Memory type (default=UFS)',default='UFS') - parser.add_argument('-sectorsize', metavar="",help='[Option] Define Disk Sector Size (default=512)',type=int,default=512) - #parser.add_argument('-lun', metavar="",help='[Option] Define LUN',type=int,default=0) - #parser.add_argument('-debug', help='[Option] Enable debug output', action="store_true") - parser.add_argument('-debugmode', help='[CMD:Sahara] Switch to Memory Dump mode (Debug only)',action="store_true") - parser.add_argument('-debugread', help='[CMD:Sahara] Read Debug Logs',action="store_true") - parser.add_argument('-dmss', help='[CMD:Sahara] Switch to DMSS Download mode',action="store_true") - parser.add_argument('-streaming', help='[CMD:Sahara] Switch to Streaming Download mode', action="store_true") - - parser.add_argument('-r', metavar=("","",""), help='[CMD:Firehose] Dump partition based on partition name', nargs=3,default=[]) - parser.add_argument('-rl', metavar=("", ""), help='[CMD:Firehose] Dump whole lun/flash partitions to a directory', nargs=2,default=[]) - parser.add_argument('-rf', metavar=("",""),help='[CMD:Firehose] Dump whole lun/flash', nargs=2, default=[]) - parser.add_argument('-rs', metavar=("","","",""), help='[CMD:Firehose] Dump from start sector to end sector to file', nargs=4,default=[]) - parser.add_argument('-w', metavar=("","",""), help='[CMD:Firehose] Write filename to GPT partition', nargs=3, default=[]) - parser.add_argument('-ws', metavar=("","",""), help='[CMD:Firehose] Write filename at sector ', nargs=3, default=[]) - parser.add_argument('-e', metavar=("",""), help='[CMD:Firehose] Erase the entire partition specified',nargs=2, default=[]) - parser.add_argument('-es', metavar=("","",""), help='[CMD:Firehose] Erase disk from start sector for number of sectors',nargs=3,default=[]) - parser.add_argument('-gpt', metavar=",", help='[CMD:Firehose] Dump gpt to file', nargs=2, default=[]) - parser.add_argument('-printgpt', help='[CMD:Firehose] Print gpt', default="") - parser.add_argument('-footer', metavar=("",""), help='[CMD:Firehose] Dump crypto footer', nargs=2, default=[]) - - parser.add_argument('-pbl', metavar=(""),help='[CMD:Firehose] Dump boot rom (pbl)', default="") - parser.add_argument('-qfp', metavar=(""), help='[CMD:Firehose] Dump qfprom', default="") - parser.add_argument('-secureboot', help='[CMD:Firehose] Get secure boot info', action="store_true") - parser.add_argument('-memtbl', metavar=(""), help='[CMD:Firehose] Dump memory table', default="") - - parser.add_argument('-peek', metavar=("","",""),help='[CMD:Firehose] Read memory from offset,length to file', nargs=3, default=[]) - parser.add_argument('-peekhex', metavar=("",""),help='[CMD:Firehose] Read memory from offset, length and display', nargs=2, default=[]) - parser.add_argument('-peekdword', metavar=(""),help='[CMD:Firehose] Read dword (hex) from offset, length and display', nargs=1, default=[]) - parser.add_argument('-peekqword', metavar=(""),help='[CMD:Firehose] Read qword (hex) from offset, length and display', nargs=1, default=[]) - parser.add_argument('-poke', metavar=("", ""),help='[CMD:Firehose] write data at offset from file', nargs=2, default=[]) - parser.add_argument('-pokehex', metavar=("", ""),help='[CMD:Firehose] write data at offset as hexstring to memory', nargs=2, default=[]) - parser.add_argument('-pokedword', metavar=("", ""),help='[CMD:Firehose] write dword (hex) at offset to memory', nargs=2, default=[]) - parser.add_argument('-pokeqword', metavar=("", ""), help='[CMD:Firehose] write qword (hex) at offset to memory', nargs=2, default=[]) - parser.add_argument('-memcpy', metavar=("", "", ""), help='[CMD:Firehose] copy memory offset from src to dst', nargs=3, default=[]) - parser.add_argument('-reset', help='[CMD:Firehose] Reset device', action="store_true") - parser.add_argument('-nop', help='[CMD:Firehose] NOP', action="store_true") - parser.add_argument('-getstorageinfo', help='[CMD:Firehose] Get Storage/Flash Info', action="store_true") - parser.add_argument('-setbootablestoragedrive', metavar="", - help='[CMD:Firehose] Set the physical partition number active for booting',default='') - parser.add_argument('-send', metavar="", help='[CMD:Firehose] Send xml command', default='') - parser.add_argument('-xml', metavar="", help='[CMD:Firehose] XML to run in firehose mode', default='') - parser.add_argument('-gpt-num-part-entries', metavar="", type=int, help='[CMD:Firehose] Number of partitions', default=None) - parser.add_argument('-gpt-part-entry-size', metavar="", type=int, help='[CMD:Firehose] Size of partition entry', default=None) - parser.add_argument('-gpt-part-entry-start-lba', metavar="", type=int, help='[CMD:Firehose] Beginning of partition entries', default=None) - parser.add_argument('-server', help='Run as a TCP/IP Server, listing on port 1336', action="store_true") - - - args = parser.parse_args() - mode="" loop=0 - if args.vid!="": - vid=int(args.vid,16) - if args.pid!="": - pid=int(args.pid,16) + vid=int(args["--vid"],16) + pid=int(args["--pid"],16) cdc = usb_class(vid=vid, pid=pid) sahara = qualcomm_sahara(cdc) - if args.loader=='none': + if args["--loader"]=='None': logger.info("Trying with no loader given ...") sahara.programmer = None - elif (args.loader==""): - logger.info("Trying with loaders in Loader directory ...") - sahara.programmer = None - elif (args.loader!=''): - logger.info(f"Using loader {args.loader} ...") - with open(args.loader, "rb") as rf: - sahara.programmer = rf.read() else: - print("Sorry, you need a firehose loader (-loader) or try without loader \"-loader none\" !") - print("Use with -h for displaying help.") - exit(0) + loader=args["--loader"] + logger.info(f"Using loader {loader} ...") + with open(loader, "rb") as rf: + sahara.programmer = rf.read() logger.info("Waiting for the device") @@ -402,14 +428,14 @@ def doconnect(cdc, loop, mode, resp, sahara): def handle_streaming(args, cdc, sahara): fh = qualcomm_streaming(cdc,sahara) -def do_firehose_server(args,cdc,sahara): +def do_firehose_server(mainargs,cdc,sahara): cfg = qualcomm_firehose.cfg() - cfg.MemoryName = args.memory + cfg.MemoryName = mainargs["--memory"] cfg.ZLPAwareHost = 1 - cfg.SkipStorageInit = args.skipstorageinit - cfg.SkipWrite = args.skipwrite - cfg.MaxPayloadSizeToTargetInBytes = args.maxpayload - cfg.SECTOR_SIZE_IN_BYTES = args.sectorsize + cfg.SkipStorageInit = mainargs["--skipstorageinit"] + cfg.SkipWrite = mainargs["--skipwrite"] + cfg.MaxPayloadSizeToTargetInBytes = int(mainargs["--maxpayload"],16) + cfg.SECTOR_SIZE_IN_BYTES = int(mainargs["--sectorsize"],16) cfg.bit64 = sahara.bit64 fh = qualcomm_firehose(cdc, xmlparser(), cfg) supported_functions = fh.connect(0) @@ -422,7 +448,7 @@ def do_firehose_server(args,cdc,sahara): import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_address = ('localhost',1340) + server_address = ('localhost',int(mainargs["--tcpport"])) print ('starting up on %s port %s' % server_address) sock.bind(server_address) sock.listen(1) @@ -447,12 +473,12 @@ def do_firehose_server(args,cdc,sahara): args=[args] if cmd=="gpt": if len(args) != 1: - response="\n"+"Usage: gpt:" + response="\n"+"Usage: gpt:," connection.sendall(bytes(response,'utf-8')) else: lun=int(args[0]) - fh.cmd_read(lun, 0, 0x4000 // cfg.SECTOR_SIZE_IN_BYTES*4, args.gpt) - response=f"\n"+f"Dumped GPT to {args.gpt}" + fh.cmd_read(lun, 0, 0x4000 // cfg.SECTOR_SIZE_IN_BYTES*4, args[1]) + response=f"\n"+f"Dumped GPT to {args[1]}" connection.sendall(bytes(response,'utf-8')) elif cmd=="printgpt": if len(args) != 1: @@ -460,8 +486,8 @@ def do_firehose_server(args,cdc,sahara): connection.sendall(bytes(response,'utf-8')) else: lun = int(args[0]) - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt != None: response="\n"+guid_gpt.tostring() connection.sendall(bytes(response,'utf-8')) @@ -477,8 +503,8 @@ def do_firehose_server(args,cdc,sahara): lun = int(args[0]) partitionname = args[1] filename = args[2] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error reading GPT Table" connection.sendall(bytes(response, 'utf-8')) @@ -503,8 +529,8 @@ def do_firehose_server(args,cdc,sahara): directory = args[1] if not os.path.exists(directory): os.mkdir(directory) - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error reading GPT Table" connection.sendall(bytes(response, 'utf-8')) @@ -523,13 +549,13 @@ def do_firehose_server(args,cdc,sahara): else: lun=int(args[0]) filename = args[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error: Couldn't reading GPT Table" connection.sendall(bytes(response, 'utf-8')) else: - fh.cmd_read(args.lun, 0, guid_gpt.totalsectors, filename) + fh.cmd_read(lun, 0, guid_gpt.totalsectors, filename) response="\n"+f"Dumped sector 0 with sector count {str(guid_gpt.totalsectors)} as {filename}." connection.sendall(bytes(response,'utf-8')) elif cmd=="pbl": @@ -541,7 +567,7 @@ def do_firehose_server(args,cdc,sahara): response = "\n" + "Peek command isn't supported by edl loader" connection.sendall(bytes(response, 'utf-8')) else: - filename = args.pbl + filename = args[0] if TargetName in infotbl: v = infotbl[TargetName] if len(v[0]) > 0: @@ -632,8 +658,8 @@ def do_firehose_server(args,cdc,sahara): else: lun=int(args[0]) filename = args[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error: Couldn't reading GPT Table" connection.sendall(bytes(response, 'utf-8')) @@ -875,8 +901,9 @@ def do_firehose_server(args,cdc,sahara): response="\n"+f"Error: Couldn't find file: {filename}" connection.sendall(bytes(response,'utf-8')) else: - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), + int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error: Couldn't reading GPT Table" connection.sendall(bytes(response, 'utf-8')) @@ -914,6 +941,24 @@ def do_firehose_server(args,cdc,sahara): else: response="\n"+f"Error on writing {filename} to sector {str(start)}" connection.sendall(bytes(response,'utf-8')) + elif cmd=="wf": + if len(args) != 2: + response="\n"+"Usage: wf:," + connection.sendall(bytes(response,'utf-8')) + else: + lun = int(args[0]) + start = 0 + filename = args[1] + if not os.path.exists(filename): + response="\n"+f"Error: Couldn't find file: {filename}" + connection.sendall(bytes(response,'utf-8')) + else: + if fh.cmd_write(lun, start, filename) == True: + response="\n"+f"Wrote {filename} to sector {str(start)}." + connection.sendall(bytes(response,'utf-8')) + else: + response="\n"+f"Error on writing {filename} to sector {str(start)}" + connection.sendall(bytes(response,'utf-8')) elif cmd=="e": if len(args) != 2: response = "\n" + "Usage: e:," @@ -921,8 +966,8 @@ def do_firehose_server(args,cdc,sahara): else: lun=int(args[0]) partitionname = args[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(mainargs["--gpt-num-part-entries"]), int(mainargs["--gpt-part-entry-size"]), + int(mainargs["--gpt-part-entry-start-lba"])) if guid_gpt == None: response = "\n" + f"Error: Couldn't reading GPT Table" connection.sendall(bytes(response, 'utf-8')) @@ -939,7 +984,7 @@ def do_firehose_server(args,cdc,sahara): connection.sendall(bytes(response,'utf-8')) elif cmd=="es": if len(args) != 3: - response="\n"+"Usage: ws:,," + response="\n"+"Usage: es:,," connection.sendall(bytes(response,'utf-8')) else: lun=int(args[0]) @@ -972,12 +1017,12 @@ def do_firehose_server(args,cdc,sahara): def handle_firehose(args, cdc, sahara): cfg = qualcomm_firehose.cfg() - cfg.MemoryName = args.memory + cfg.MemoryName = args["--memory"] cfg.ZLPAwareHost = 1 - cfg.SkipStorageInit = args.skipstorageinit - cfg.SkipWrite = args.skipwrite - cfg.MaxPayloadSizeToTargetInBytes = args.maxpayload - cfg.SECTOR_SIZE_IN_BYTES = args.sectorsize + cfg.SkipStorageInit = args["--skipstorageinit"] + cfg.SkipWrite = args["--skipwrite"] + cfg.MaxPayloadSizeToTargetInBytes = int(args["--maxpayload"],16) + cfg.SECTOR_SIZE_IN_BYTES = int(args["--sectorsize"],16) cfg.bit64 = sahara.bit64 fh = qualcomm_firehose(cdc, xmlparser(), cfg) supported_functions = fh.connect(0) @@ -987,34 +1032,28 @@ def handle_firehose(args, cdc, sahara): if hwid in msmids: TargetName = msmids[hwid] - if len(args.gpt) != 0: - if len(args.gpt) != 2: - print("Usage: -gpt ") - exit(0) - lun=args.gpt[0] - filename = args.gpt[1] + if args["gpt"]: + lun=int(args[""]) + filename = args[""] fh.cmd_read(lun, 0, 0x4000 // cfg.SECTOR_SIZE_IN_BYTES, filename) print(f"Dumped GPT to {filename}") exit(0) - elif args.printgpt!="": - lun=int(args.printgpt) - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + elif args["printgpt"]: + lun=int(args[""]) + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: guid_gpt.print() exit(0) - elif len(args.r) != 0: - if len(args.r) != 3: - print("Usage: -r ") - exit(0) - lun = int(args.r[0]) - partitionname = args.r[1] - filename = args.r[2] + elif args["r"]: + lun = int(args["--lun"]) + partitionname = args[""] + filename = args[""] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: @@ -1028,14 +1067,11 @@ def handle_firehose(args, cdc, sahara): for partition in guid_gpt.partentries: print(partition.name) exit(0) - elif len(args.rl) != 0: - if len(args.rl) != 2: - print("Usage: -rl ") - exit(0) - lun = int(args.rl[0]) - directory = args.rl[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + elif args["rl"]: + lun = int(args["--lun"]) + directory = args[""] + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: @@ -1048,26 +1084,23 @@ def handle_firehose(args, cdc, sahara): fh.cmd_read(lun, partition.sector, partition.sectors, filename) exit(0) exit(0) - elif len(args.rf) != 0: - if len(args.r) != 2: - print("Usage: -r ") - exit(0) - lun = int(args.rf[0]) - filename = args.rf[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + elif args["rf"]: + lun = int(args["--lun"]) + filename = args[""] + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: fh.cmd_read(lun, 0, guid_gpt.totalsectors, filename) print(f"Dumped sector 0 with sector count {str(guid_gpt.totalsectors)} as {filename}.") exit(0) - elif args.pbl != '': + elif args["pbl"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - filename = args.pbl + filename = args[""] if TargetName in infotbl: v = infotbl[TargetName] if len(v[0]) > 0: @@ -1080,12 +1113,12 @@ def handle_firehose(args, cdc, sahara): logger.error("Unknown target chipset") logger.error("Error on dumping pbl") exit(0) - elif args.qfp != '': + elif args["qfp"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - filename = args.qfp + filename = args[""] if TargetName in infotbl: v = infotbl[TargetName] if len(v[1]) > 0: @@ -1098,7 +1131,7 @@ def handle_firehose(args, cdc, sahara): logger.error("Unknown target chipset") logger.error("Error on dumping qfprom") exit(0) - elif args.secureboot == True: + elif args["secureboot"] == True: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) @@ -1123,12 +1156,12 @@ def handle_firehose(args, cdc, sahara): else: logger.error("Unknown target chipset") exit(0) - elif args.memtbl != '': + elif args["memtbl"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - filename = args.memtbl + filename = args[""] if TargetName in infotbl: v = infotbl[TargetName] if len(v[2]) > 0: @@ -1141,14 +1174,11 @@ def handle_firehose(args, cdc, sahara): logger.error("Unknown target chipset") logger.error("Error on dumping memtbl") exit(0) - elif len(args.footer) != 0: - if len(args.footer) != 2: - print("Usage: -footer ") - exit(0) - lun = int(args.footer[0]) - filename = args.footer[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + elif args["footer"]: + lun = int(args["--lun"]) + filename = args[""] + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: @@ -1168,98 +1198,77 @@ def handle_firehose(args, cdc, sahara): else: logger.error(f"Error: Couldn't detect partition: {partition.name}") exit(0) - elif len(args.rs) != 0: - if len(args.rs) != 4: - print("Usage: -rs ") - exit(0) - lun = int(args.rs[0]) - start = int(args.rs[1]) - sectors = int(args.rs[2]) - filename = args.rs[3] + elif args["rs"]: + lun = int(args["--lun"]) + start = int(args[""]) + sectors = int(args[""]) + filename = args[" ") - exit(0) + elif args["peek"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - offset = int(args.peek[0], 16) - length = int(args.peek[1], 16) - filename = args.peek[2] + offset = int(args[""], 16) + length = int(args[""], 16) + filename = args[" ") - exit(0) + elif args["peekhex"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - offset = int(args.peekhex[0], 16) - length = int(args.peekhex[1], 16) + offset = int(args[""], 16) + length = int(args[""], 16) resp=fh.cmd_peek(offset, length, "",True) print("\n") print(hexlify(resp)) exit(0) - elif len(args.peekqword) != 0: - if len(args.peekqword) != 1: - print("Usage: -peekqword ") - exit(0) + elif args["peekqword"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - offset = int(args.peekqword[0], 16) + offset = int(args[""], 16) resp=fh.cmd_peek(offset, 8, "",True) print("\n") print(hex(unpack("") - exit(0) + elif args["peekdword"]: if not check_cmd(supported_functions,"peek"): logger.error("Peek command isn't supported by edl loader") exit(0) else: - offset = int(args.peekdword[0], 16) + offset = int(args[""], 16) resp=fh.cmd_peek(offset, 4, "",True) print("\n") print(hex(unpack(""] resp=fh.cmd_send(command,True) print("\n") print(resp) exit(0) - elif len(args.poke) != 0: - if len(args.poke) != 2: - print("Usage: -poke ") - exit(0) + elif args["poke"]: if not check_cmd(supported_functions,"poke"): logger.error("Poke command isn't supported by edl loader") exit(0) else: - offset = int(args.poke[0], 16) - filename = unhexlify(args.poke[1]) + offset = int(args[""], 16) + filename = unhexlify(args[""]) fh.cmd_poke(offset, "", filename, True) exit(0) - elif len(args.pokehex) != 0: - if len(args.pokehex) != 2: - print("Usage: -pokehex ") - exit(0) + elif args["pokehex"]: if not check_cmd(supported_functions,"poke"): logger.error("Poke command isn't supported by edl loader") exit(0) else: - offset = int(args.pokehex[0], 16) - data = unhexlify(args.pokehex[1]) + offset = int(args[""], 16) + data = unhexlify(args[""]) fh.cmd_poke(offset, data, "", True) resp = fh.cmd_peek(offset, len(data), "", True) if resp==data: @@ -1267,70 +1276,61 @@ def handle_firehose(args, cdc, sahara): else: print("Sending data failed") exit(0) - elif len(args.pokeqword) != 0: - if len(args.pokeqword) != 2: - print("Usage: -pokeqword ") - exit(0) + elif args["pokeqword"]: if not check_cmd(supported_functions,"poke"): logger.error("Poke command isn't supported by edl loader") exit(0) else: - offset = int(args.pokeqword[0], 16) - data = pack(""], 16) + data = pack(""],16)) fh.cmd_poke(offset, data, "", True) resp = fh.cmd_peek(offset, 8, "", True) print(hex(unpack(" ") - exit(0) + elif args["pokedword"]: if not check_cmd(supported_functions,"poke"): logger.error("Poke command isn't supported by edl loader") exit(0) else: - offset = int(args.pokedword[0], 16) - data = pack(""], 16) + data = pack(""], 16)) fh.cmd_poke(offset, data, "", True) resp = fh.cmd_peek(offset, 4, "", True) print(hex(unpack(""])) exit(0) - elif args.getstorageinfo: + elif args["getstorageinfo"]: if not check_cmd(supported_functions,"getstorageinfo"): logger.error("getstorageinfo command isn't supported by edl loader") exit(0) else: fh.cmd_getstorageinfo() exit(0) - elif len(args.w) != 0: - if len(args.w) != 3: - print("Usage: -w ") - exit(0) - lun = int(args.w[0]) - partitionname = args.w[1] - filename = args.w[2] + elif args["w"]: + lun = int(args["--lun"]) + partitionname = args[""] + filename = args[""] if not os.path.exists(filename): logger.error(f"Error: Couldn't find file: {filename}") exit(0) - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: @@ -1350,13 +1350,44 @@ def handle_firehose(args, cdc, sahara): else: print("Couldn't write partition. Either wrong memorytype given or no gpt partition.") exit(0) - elif len(args.ws) != 0: - if len(args.ws) != 3: - print("Usage: -ws ") + elif args["wl"]: + directory=args[""] + lun = int(args["--lun"]) + if not os.path.exists(directory): + logger.error(f"Error: Couldn't find directory: {directory}") exit(0) - lun = int(args.ws[0]) - start = int(args.ws[1]) - filename = args.ws[2] + filenames = [] + for dirName, subdirList, fileList in os.walk(directory): + for fname in fileList: + filenames.append(os.path.join(dirName, fname)) + + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) + if guid_gpt == None: + logger.error("Error on reading GPT, maybe wrong memoryname given ?") + else: + if "partentries" in dir(guid_gpt): + for filename in filenames: + for partition in guid_gpt.partentries: + partname=filename[filename.rfind("/")+1:] + if ".bin" in partname[-4:]: + partname=partname[:-4] + if partition.name == partname: + sectors = os.stat(filename).st_size // fh.cfg.SECTOR_SIZE_IN_BYTES + if (os.stat(filename).st_size % fh.cfg.SECTOR_SIZE_IN_BYTES) > 0: + sectors += 1 + if sectors > partition.sectors: + logger.error(f"Error: {filename} has {sectors} sectors but partition only has {partition.sectors}.") + exit(0) + print(f"Writing {filename} to partition {str(partition.name)}.") + fh.cmd_write(lun, partition.sector, filename) + else: + print("Couldn't write partition. Either wrong memorytype given or no gpt partition.") + exit(0) + elif args["ws"]: + lun = int(args["--lun"]) + start = int(args[""]) + filename = args[""] if not os.path.exists(filename): logger.error(f"Error: Couldn't find file: {filename}") exit(0) @@ -1365,14 +1396,23 @@ def handle_firehose(args, cdc, sahara): else: logger.error(f"Error on writing {filename} to sector {str(start)}") exit(0) - elif len(args.e) != 0: - if len(args.e) != 2: - print("Usage: -e ") + elif args["wf"]: + lun = int(args["--lun"]) + start = 0 + filename = args[""] + if not os.path.exists(filename): + logger.error(f"Error: Couldn't find file: {filename}") exit(0) - lun = args.e[0] - partitionname = args.e[1] - guid_gpt = fh.get_gpt(lun, args.gpt_num_part_entries, args.gpt_part_entry_size, - args.gpt_part_entry_start_lba) + if fh.cmd_write(lun, start, filename) == True: + print(f"Wrote {filename} to sector {str(start)}.") + else: + logger.error(f"Error on writing {filename} to sector {str(start)}") + exit(0) + elif args["e"]: + lun = int(args["--lun"]) + partitionname = args[""] + guid_gpt = fh.get_gpt(lun, int(args["--gpt-num-part-entries"]), int(args["--gpt-part-entry-size"]), + int(args["--gpt-part-entry-start-lba"])) if guid_gpt == None: logger.error("Error on reading GPT, maybe wrong memoryname given ?") else: @@ -1388,20 +1428,17 @@ def handle_firehose(args, cdc, sahara): exit(0) logger.error(f"Error: Couldn't detect partition: {partitionname}") exit(0) - elif len(args.es) != 0: - if len(args.es) != 3: - print("Usage: -ws ") - exit(0) - lun = int(args.es[0]) - start = int(args.es[1]) - sectors = int(args.es[2]) + elif args["es"]: + lun = int(args["--lun"]) + start = int(args[""]) + sectors = int(args[""]) exit(0) - elif args.server != '': + elif args["server"]: do_firehose_server(args,cdc,sahara) exit(0) else: diff --git a/fhloaderparse.py b/fhloaderparse.py old mode 100644 new mode 100755 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e77ef8e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pyusb +pyserial +docopt diff --git a/tcpclient.py b/tcpclient.py index 673015b..544bbe4 100755 --- a/tcpclient.py +++ b/tcpclient.py @@ -6,7 +6,7 @@ class client(): self.commands=[] def send(self): - self.tcp = tcpclient() + self.tcp = tcpclient(1340) #define Port 1340 self.tcp.sendcommands(self.commands) def read(self,src):