From 084ab71db2ed6e924e6160130d27365a6f3ebb9f Mon Sep 17 00:00:00 2001 From: ColdWindScholar <3590361911@qq.com> Date: Mon, 10 Jun 2024 01:06:20 +0800 Subject: [PATCH] Reformat code Fix A bug in enableadb --- Example/tcpclient.py | 15 ++- edl | 21 ++-- edlclient/Library/Connection/devicehandler.py | 2 - edlclient/Library/Connection/seriallib.py | 33 +++-- edlclient/Library/Connection/usblib.py | 25 ++-- edlclient/Library/Modules/generic.py | 4 +- edlclient/Library/Modules/init.py | 5 +- edlclient/Library/Modules/nothing.py | 13 +- edlclient/Library/Modules/oneplus.py | 8 +- edlclient/Library/Modules/oneplus_param.py | 26 ++-- edlclient/Library/Modules/xiaomi.py | 2 +- edlclient/Library/cryptutils.py | 4 +- edlclient/Library/firehose.py | 114 +++++++++--------- edlclient/Library/firehose_client.py | 13 +- edlclient/Library/gpt.py | 15 +-- edlclient/Library/hdlc.py | 3 +- edlclient/Library/loader_db.py | 16 ++- edlclient/Library/memparse.py | 6 +- edlclient/Library/pt64.py | 1 + edlclient/Library/sahara.py | 15 +-- edlclient/Library/sahara_defs.py | 29 +++-- edlclient/Library/sparse.py | 4 +- edlclient/Library/streaming.py | 28 ++--- edlclient/Library/streaming_client.py | 6 +- edlclient/Library/streaming_defs.py | 111 ++++++++--------- edlclient/Library/utils.py | 23 ++-- edlclient/Library/xmlparser.py | 4 +- edlclient/Tools/beagle_to_loader | 76 ++++++------ edlclient/Tools/enableadb | 75 ++++++------ edlclient/Tools/fhloaderparse | 82 +++++++------ edlclient/Tools/qc_diag.py | 12 +- edlclient/Tools/sierrakeygen.py | 16 +-- edlclient/Tools/txt_to_loader.py | 88 +++++++------- 33 files changed, 462 insertions(+), 433 deletions(-) diff --git a/Example/tcpclient.py b/Example/tcpclient.py index 70353f8..b981217 100755 --- a/Example/tcpclient.py +++ b/Example/tcpclient.py @@ -1,25 +1,27 @@ #!/usr/bin/env python3 from edl.Library.tcpclient import tcpclient + class client(): def __init__(self): - self.commands=[] + self.commands = [] def send(self): self.tcp = tcpclient(1340) self.tcp.sendcommands(self.commands) - def read(self,src): + def read(self, src): self.commands.append(f"peekqword:{hex(src)}") - def write(self,dest,value): + def write(self, dest, value): self.commands.append(f"pokeqword:{hex(dest)},{hex(value)}") - def memcpy(self,dest,src,size): + def memcpy(self, dest, src, size): self.commands.append(f"memcpy:{hex(dest)},{hex(src)},{hex(size)}") + def main(): - exp=client() + exp = client() exp.commands = [ "send:nop", "r:boot,boot.img", @@ -29,5 +31,6 @@ def main(): ] exp.send() -if __name__=="__main__": + +if __name__ == "__main__": main() diff --git a/edl b/edl index 9f8018e..0e6ff86 100755 --- a/edl +++ b/edl @@ -127,28 +127,29 @@ Options: --resetmode=mode Resetmode for reset (poweroff, reset, edl, etc.) """ +import logging import os +import re +import subprocess import sys import time -import logging -import subprocess -import re + from docopt import docopt + from edlclient.Config.usb_ids import default_ids -from edlclient.Library.utils import LogBase -from edlclient.Library.Connection.usblib import usb_class from edlclient.Library.Connection.seriallib import serial_class -from edlclient.Library.sahara import sahara -from edlclient.Library.streaming_client import streaming_client +from edlclient.Library.Connection.usblib import usb_class from edlclient.Library.firehose_client import firehose_client -from edlclient.Library.streaming import Streaming +from edlclient.Library.sahara import sahara from edlclient.Library.sahara_defs import cmd_t, sahara_mode_t +from edlclient.Library.streaming import Streaming +from edlclient.Library.streaming_client import streaming_client +from edlclient.Library.utils import LogBase from edlclient.Library.utils import is_windows -from binascii import hexlify args = docopt(__doc__, version='3') -print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2023.") +print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2024.") def parse_cmd(rargs): diff --git a/edlclient/Library/Connection/devicehandler.py b/edlclient/Library/Connection/devicehandler.py index ab6d452..bc36ec2 100644 --- a/edlclient/Library/Connection/devicehandler.py +++ b/edlclient/Library/Connection/devicehandler.py @@ -5,8 +5,6 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import serial -import serial.tools.list_ports import inspect import traceback from binascii import hexlify diff --git a/edlclient/Library/Connection/seriallib.py b/edlclient/Library/Connection/seriallib.py index 95bd65b..956b5a4 100755 --- a/edlclient/Library/Connection/seriallib.py +++ b/edlclient/Library/Connection/seriallib.py @@ -5,25 +5,25 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import os.path -import time import sys + if not sys.platform.startswith('win32'): import termios - + def _reset_input_buffer(): return + def _reset_input_buffer_org(self): if not sys.platform.startswith('win32'): return termios.tcflush(self.fd, termios.TCIFLUSH) + import serial import serial.tools.list_ports import inspect -import traceback -from binascii import hexlify + try: from edlclient.Library.utils import * from edlclient.Library.Connection.devicehandler import DeviceClass @@ -38,13 +38,13 @@ class serial_class(DeviceClass): super().__init__(loglevel, portconfig, devclass) self.is_serial = True - def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""): + def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""): if self.connected: self.close() self.connected = False if portname == "": - devices=self.detectdevices() - if len(devices)>0: + devices = self.detectdevices() + if len(devices) > 0: portname = devices[0] if portname != "": self.device = serial.Serial(baudrate=115200, bytesize=serial.EIGHTBITS, @@ -88,13 +88,12 @@ class serial_class(DeviceClass): self.debug("Break set") def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False): - if RTS==1: + if RTS == 1: self.device.setRTS(RTS) - if DTR==1: + if DTR == 1: self.device.setDTR(DTR) self.debug("Linecoding set") - def write(self, command, pktsize=None): if pktsize is None: pktsize = 512 @@ -169,18 +168,18 @@ class serial_class(DeviceClass): epr = self.device.read extend = res.extend if self.xmlread: - info=self.device.read(6) - bytestoread=resplen-len(info) + info = self.device.read(6) + bytestoread = resplen - len(info) extend(info) if b"": + while not b"response " in res or res[-7:] != b"": extend(epr(1)) return res bytestoread = resplen while len(res) < bytestoread: try: - val=epr(bytestoread) - if len(val)==0: + val = epr(bytestoread) + if len(val) == 0: break extend(val) except Exception as e: @@ -218,5 +217,3 @@ class serial_class(DeviceClass): self.device.flush() res = self.usbread(resplen) return res - - diff --git a/edlclient/Library/Connection/usblib.py b/edlclient/Library/Connection/usblib.py index 6fe2de6..3e178b3 100755 --- a/edlclient/Library/Connection/usblib.py +++ b/edlclient/Library/Connection/usblib.py @@ -5,26 +5,25 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import io -import logging - -import usb.core # pyusb -import usb.util -import time -import inspect import array -import usb.backend.libusb0 -from enum import Enum +import inspect +import logging from binascii import hexlify from ctypes import c_void_p, c_int +from enum import Enum + +import usb.backend.libusb0 +import usb.core # pyusb +import usb.util + try: from edlclient.Library.utils import * except: from Library.utils import * if not is_windows(): import usb.backend.libusb1 -from struct import pack, calcsize -import traceback +from struct import pack + try: from edlclient.Library.Connection.devicehandler import DeviceClass except: @@ -211,7 +210,7 @@ class usb_class(DeviceClass): def flush(self): return - def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""): + def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""): if self.connected: self.close() self.connected = False @@ -389,7 +388,7 @@ class usb_class(DeviceClass): extend = res.extend while len(res) < resplen: try: - resplen=epr(buffer,timeout) + resplen = epr(buffer, timeout) extend(buffer[:resplen]) if resplen == self.EP_IN.wMaxPacketSize: break diff --git a/edlclient/Library/Modules/generic.py b/edlclient/Library/Modules/generic.py index adfd756..b8bbdc8 100644 --- a/edlclient/Library/Modules/generic.py +++ b/edlclient/Library/Modules/generic.py @@ -16,7 +16,7 @@ class generic(metaclass=LogBase): self.serial = serial self.args = args self.__logger.setLevel(loglevel) - self.error=self.__logger.error + self.error = self.__logger.error if loglevel == logging.DEBUG: logfilename = "log.txt" fh = logging.FileHandler(logfilename) @@ -27,7 +27,7 @@ class generic(metaclass=LogBase): if res[0]: lun = res[1] rpartition = res[2] - if rpartition.sectors <= (0x8000//self.fh.cfg.SECTOR_SIZE_IN_BYTES): + if rpartition.sectors <= (0x8000 // self.fh.cfg.SECTOR_SIZE_IN_BYTES): offsettopatch = 0x7FFF sector, offset = self.fh.calc_offset(rpartition.sector, offsettopatch) else: diff --git a/edlclient/Library/Modules/init.py b/edlclient/Library/Modules/init.py index 96089ed..5f4fe52 100644 --- a/edlclient/Library/Modules/init.py +++ b/edlclient/Library/Modules/init.py @@ -36,8 +36,9 @@ except ImportError as e: nothing = None pass + class modules(metaclass=LogBase): - def __init__(self, fh, serial:int, supported_functions, loglevel, devicemodel:str, args): + def __init__(self, fh, serial: int, supported_functions, loglevel, devicemodel: str, args): self.fh = fh self.args = args self.serial = serial @@ -132,7 +133,7 @@ class modules(metaclass=LogBase): if paramdata.data == b"": self.error("Error on reading param partition.") return False - wdata = self.ops.enable_ops(paramdata.data, enable,self.devicemodel,self.serial) + wdata = self.ops.enable_ops(paramdata.data, enable, self.devicemodel, self.serial) if wdata is not None: self.ops.run() if self.fh.cmd_program_buffer(lun, rpartition.sector, wdata, False): diff --git a/edlclient/Library/Modules/nothing.py b/edlclient/Library/Modules/nothing.py index 811a89c..cbeb79a 100644 --- a/edlclient/Library/Modules/nothing.py +++ b/edlclient/Library/Modules/nothing.py @@ -31,10 +31,11 @@ class nothing(metaclass=LogBase): if token1 is None: token1 = random.randbytes(32).hex() authresp = token1 + self.projid + ("%x" % self.serial) + self.hashverify - token2 = hashlib.sha256(bytes(authresp,'utf-8')).hexdigest()[:64] + token2 = hashlib.sha256(bytes(authresp, 'utf-8')).hexdigest()[:64] token3 = self.hashverify - return bytes(f"\n \n\n",'utf-8') - + return bytes( + f"\n \n\n", + 'utf-8') def ntprojectverify(self): """ @@ -57,9 +58,9 @@ class nothing(metaclass=LogBase): if __name__ == "__main__": nt = nothing(fh=None, projid="22111", serial=1729931115) - res=nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671") - org=b"\n \n\n" - if res!=org: + res = nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671") + org = b"\n \n\n" + if res != org: print("Error !") print(res) print(nt.generatetoken()) diff --git a/edlclient/Library/Modules/oneplus.py b/edlclient/Library/Modules/oneplus.py index 1054900..6d2216c 100755 --- a/edlclient/Library/Modules/oneplus.py +++ b/edlclient/Library/Modules/oneplus.py @@ -25,6 +25,7 @@ from struct import pack import logging from edlclient.Library.utils import LogBase from edlclient.Library.Modules.oneplus_param import paramtools + try: from edlclient.Library.cryptutils import cryptutils except Exception as e: @@ -128,7 +129,8 @@ deviceconfig = { class oneplus(metaclass=LogBase): - def __init__(self, fh, projid:str="18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0, supported_functions=None, + def __init__(self, fh, projid: str = "18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0, + supported_functions=None, args=None, loglevel=logging.INFO): self.fh = fh self.__logger = self.__logger @@ -203,7 +205,7 @@ class oneplus(metaclass=LogBase): else: assert "Device is not supported" exit(0) - assert "Unknown projid:"+str(projid) + assert "Unknown projid:" + str(projid) return None def run(self): @@ -446,7 +448,7 @@ class oneplus2(metaclass=LogBase): fh = logging.FileHandler(logfilename) self.__logger.addHandler(fh) - def crypt_token(self, data, pk, device_timestamp:int, decrypt=False): + def crypt_token(self, data, pk, device_timestamp: int, decrypt=False): aes = cryptutils().aes() aeskey = b"\x46\xA5\x97\x30\xBB\x0D\x41\xE8" + bytes(pk, 'utf-8') + \ pack(" 4: - if (length % 4): + if length % 4: i += 4 - (length % 4) def parse_data(self, i, itemdata, offset, param, sidindex, encrypted=False): @@ -467,10 +467,10 @@ class paramtools(): name = name + " " if "PWD Hash" in name: items = content.split(" ") - pwdhash = items[0][1:9]+"00000000000000000000000000000000000000000000000000000000" + pwdhash = items[0][1:9] + "00000000000000000000000000000000000000000000000000000000" valid = "True" if items[1] != "-1" else "False" flag = items[2] - date = items[3] + " "+ items[4][:-1] + date = items[3] + " " + items[4][:-1] content = f"{date} ({valid},{flag}): {pwdhash}" ff = f"SID_Index {hex(sidindex)}, Offset {offsetstr}: {name}: {content}" if encrypted: @@ -936,7 +936,7 @@ def main(): filename = args[""] mode = args["--mode"] serial = args["--serial"] - param = paramtools(mode,serial) + param = paramtools(mode, serial) with open(filename, 'rb') as rf: data = rf.read() param.parse_decrypted_fields(data) @@ -948,7 +948,7 @@ def main(): filename = args[""] mode = args["--mode"] serial = args["--serial"] - param = paramtools(mode,serial) + param = paramtools(mode, serial) with open(filename, 'rb') as rf: data = rf.read() with open(filename + ".patched", 'wb') as wf: @@ -965,7 +965,7 @@ def main(): value = int(args[""], 16) mode = args["--mode"] serial = args["--serial"] - param = paramtools(mode,serial) + param = paramtools(mode, serial) with open(filename, 'rb') as rf: data = rf.read() with open(filename + ".patched", 'wb') as wf: @@ -974,7 +974,7 @@ def main(): imei = args[""] mode = 0 serial = None - param = paramtools(mode,serial) + param = paramtools(mode, serial) print("oneplus Factory qr code generator (c) B. Kerler 2019\nGPLv3 License\n----------------------") print("Code : *#*#5646#*#* , *#808#, *#36446337# = com.android.engineeringmode.manualtest.DecryptActivity") results = param.gencode([imei, "YOU_CAN_PASS_NOW"]) diff --git a/edlclient/Library/Modules/xiaomi.py b/edlclient/Library/Modules/xiaomi.py index 53eadee..eab3f3d 100644 --- a/edlclient/Library/Modules/xiaomi.py +++ b/edlclient/Library/Modules/xiaomi.py @@ -43,7 +43,7 @@ class xiaomi(metaclass=LogBase): rsp = self.fh.xmlsend(self.xiaomi_authdata) if rsp.resp: if "value" in rsp.resp: - if rsp.resp["value"]=="ACK": + if rsp.resp["value"] == "ACK": if 'authenticated' in rsp.log[0].lower() and 'true' in rsp.log[0].lower(): return True return False diff --git a/edlclient/Library/cryptutils.py b/edlclient/Library/cryptutils.py index 9f7ff41..e4ef1ce 100755 --- a/edlclient/Library/cryptutils.py +++ b/edlclient/Library/cryptutils.py @@ -425,7 +425,7 @@ class cryptutils: return q def pss_verify(self, e, N, msghash, signature, emBits=1024, salt=None): - if salt == None: + if salt is None: slen = self.digestLen else: slen = len(salt) @@ -482,7 +482,7 @@ class cryptutils: else: return False - class hash(): + class hash: def __init__(self, hashtype="SHA256"): if hashtype == "SHA1": self.hash = self.sha1 diff --git a/edlclient/Library/firehose.py b/edlclient/Library/firehose.py index 799ed96..fb35375 100755 --- a/edlclient/Library/firehose.py +++ b/edlclient/Library/firehose.py @@ -7,27 +7,22 @@ # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! import binascii -import io +import json import os.path import platform -import time -import json -from struct import unpack from binascii import hexlify from queue import Queue from threading import Thread from edlclient.Library.Modules.nothing import nothing -from edlclient.Library.utils import * -from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE, MAX_PRIORITY, PART_ATT_PRIORITY_BIT -from edlclient.Library.gpt import PART_ATT_PRIORITY_VAL, PART_ATT_ACTIVE_VAL, PART_ATT_MAX_RETRY_COUNT_VAL, PART_ATT_SUCCESSFUL_VAL, PART_ATT_UNBOOTABLE_VAL +from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE from edlclient.Library.sparse import QCSparse +from edlclient.Library.utils import * from edlclient.Library.utils import progress -from queue import Queue -from threading import Thread rq = Queue() + def writedata(filename, rq): pos = 0 with open(filename, "wb") as wf: @@ -146,11 +141,10 @@ def writefile(wf, q, stop): break -class asyncwriter(): +class asyncwriter: def __init__(self, wf): self.writequeue = Queue() - self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,)) - self.worker.setDaemon(True) + self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,), daemon=True) self.stopthreads = False self.worker.start() @@ -216,9 +210,9 @@ class firehose(metaclass=LogBase): def detect_partition(self, arguments, partitionname, send_full=False): if arguments is None: arguments = { - "--gpt-num-part-entries" : 0, - "--gpt-part-entry-size" : 0, - "--gpt-part-entry-start-lba" : 0 + "--gpt-num-part-entries": 0, + "--gpt-part-entry-size": 0, + "--gpt-part-entry-start-lba": 0 } fpartitions = {} for lun in self.luns: @@ -231,7 +225,8 @@ class firehose(metaclass=LogBase): break else: if partitionname in guid_gpt.partentries: - return [True, lun, data, guid_gpt] if send_full else [True, lun, guid_gpt.partentries[partitionname]] + return [True, lun, data, guid_gpt] if send_full else [True, lun, + guid_gpt.partentries[partitionname]] for part in guid_gpt.partentries: fpartitions[lunname].append(part) return [False, fpartitions] @@ -682,7 +677,7 @@ class firehose(metaclass=LogBase): rsp = self.xml.getresponse(wd) if "value" in rsp: if rsp["value"] != "ACK": - if bytestoread!=0: + if bytestoread != 0: self.error(f"Error:") for line in info: self.error(line) @@ -1044,8 +1039,8 @@ class firehose(metaclass=LogBase): self.parse_storage() for function in self.supported_functions: if function == "checkntfeature": - if type(self.devicemodel)==list: - self.devicemodel=self.devicemodel[0] + if type(self.devicemodel) == list: + self.devicemodel = self.devicemodel[0] self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial, supported_functions=self.supported_functions, loglevel=self.loglevel) @@ -1148,7 +1143,7 @@ class firehose(metaclass=LogBase): try: serial = line.split("0x")[1][:-1] if ")" in serial: - serial=serial[:serial.rfind(")")] + serial = serial[:serial.rfind(")")] self.serial = int(serial, 16) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) @@ -1182,8 +1177,8 @@ class firehose(metaclass=LogBase): "serial": self.serial } if os.path.exists("edl_config.json"): - data = json.loads(open("edl_config.json","rb").read().decode('utf-8')) - if "serial" in data and data["serial"]!=state["serial"]: + data = json.loads(open("edl_config.json", "rb").read().decode('utf-8')) + if "serial" in data and data["serial"] != state["serial"]: open("edl_config.json", "w").write(json.dumps(state)) else: self.supported_functions = data["supported_functions"] @@ -1318,26 +1313,29 @@ class firehose(metaclass=LogBase): if is_boot: #new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL) #new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL) - new_flags = 0x6f << (AB_FLAG_OFFSET*8) + new_flags = 0x6f << (AB_FLAG_OFFSET * 8) else: - new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8) + new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8) else: if is_boot: #new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL) #new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT) - new_flags = 0x3a << (AB_FLAG_OFFSET*8) + new_flags = 0x3a << (AB_FLAG_OFFSET * 8) else: - new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8)) + new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8)) return new_flags - def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, slot_b_status, is_boot): + def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, + slot_b_status, is_boot): part_entry_size = guid_gpt_a.header.part_entry_size rf_a = BytesIO(gpt_data_a) rf_b = BytesIO(gpt_data_b) - entryoffset_a = partition_a.entryoffset - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize) - entryoffset_b = partition_b.entryoffset - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize) + entryoffset_a = partition_a.entryoffset - ( + (guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize) + entryoffset_b = partition_b.entryoffset - ( + (guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize) rf_a.seek(entryoffset_a) rf_b.seek(entryoffset_b) @@ -1360,8 +1358,8 @@ class firehose(metaclass=LogBase): unpack_fmt = "> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE + active_a = ((partition_a.flags >> ( + AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE if (active_a and slot_a_status) or (not active_a and slot_b_status): return True @@ -1483,14 +1483,22 @@ class firehose(metaclass=LogBase): self.error(f"Cannot find partition {partitionname_b}") return False _, lun_b, gpt_data_b, guid_gpt_b = resp - backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0 , 0, guid_gpt_b.header.backup_lba) + backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0, 0, + guid_gpt_b.header.backup_lba) - if not check_gpt_hdr and partitionname_a[:3] != "xbl": # xbl partition don't need check consistency - sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a, backup_guid_gpt_a, gpt_data_a, backup_gpt_data_a) + if not check_gpt_hdr and partitionname_a[ + :3] != "xbl": # xbl partition don't need check consistency + sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a, + backup_guid_gpt_a, + gpt_data_a, + backup_gpt_data_a) if not sts: return False if lun_a != lun_b: - sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b, backup_guid_gpt_b, gpt_data_b, backup_gpt_data_b) + sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b, + backup_guid_gpt_b, + gpt_data_b, + backup_gpt_data_b) if not sts: return False check_gpt_hdr = True @@ -1513,8 +1521,6 @@ class firehose(metaclass=LogBase): return False return True - - def cmd_test(self, cmd): token = "1234" pk = "1234" diff --git a/edlclient/Library/firehose_client.py b/edlclient/Library/firehose_client.py index b236265..a970271 100644 --- a/edlclient/Library/firehose_client.py +++ b/edlclient/Library/firehose_client.py @@ -351,7 +351,7 @@ class firehose_client(metaclass=LogBase): start_sector=0, num_partition_sectors=1, display=False) if self.get_storage_info(): totalsectors = (self.cfg.block_size * - self.cfg.total_blocks ) // self.cfg.SECTOR_SIZE_IN_BYTES + self.cfg.total_blocks) // self.cfg.SECTOR_SIZE_IN_BYTES if len(luns) > 1: sfilename = filename + f".lun{str(lun)}" @@ -649,7 +649,8 @@ class firehose_client(metaclass=LogBase): prim_guid_gpt = res[3] _, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba) partition = backup_guid_gpt.partentries["boot_a"] - active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE + active = ((partition.flags >> ( + AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE if active: self.printer("Current active slot: a") return True @@ -659,7 +660,8 @@ class firehose_client(metaclass=LogBase): prim_guid_gpt = res[3] _, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba) partition = backup_guid_gpt.partentries["boot_b"] - active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE + active = ((partition.flags >> ( + AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE if active: self.printer("Current active slot: b") return True @@ -755,14 +757,15 @@ class firehose_client(metaclass=LogBase): filenames = [] if self.firehose.modules is not None: self.firehose.modules.writeprepare() - for fname in filter(os.path.isfile, [ os.path.join(directory, i) for i in os.listdir(directory) ]): + for fname in filter(os.path.isfile, [os.path.join(directory, i) for i in os.listdir(directory)]): filenames.append(fname) for lun in luns: data, guid_gpt = self.firehose.get_gpt(lun, int(options["--gpt-num-part-entries"]), int(options["--gpt-part-entry-size"]), int(options["--gpt-part-entry-start-lba"])) if guid_gpt is None: - self.error("Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`") + self.error( + "Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`") break for filename in filenames: partname = os.path.basename(filename) diff --git a/edlclient/Library/gpt.py b/edlclient/Library/gpt.py index a61a37e..9a9027e 100755 --- a/edlclient/Library/gpt.py +++ b/edlclient/Library/gpt.py @@ -5,18 +5,19 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import os -import sys import argparse -import colorama import copy import logging import logging.config -from enum import Enum -from binascii import hexlify -from struct import calcsize, unpack, pack -from io import BytesIO +import os +import sys from binascii import crc32 +from binascii import hexlify +from enum import Enum +from struct import calcsize, unpack, pack + +import colorama + class ColorFormatter(logging.Formatter): LOG_COLORS = { diff --git a/edlclient/Library/hdlc.py b/edlclient/Library/hdlc.py index b242717..7ea88bd 100755 --- a/edlclient/Library/hdlc.py +++ b/edlclient/Library/hdlc.py @@ -7,9 +7,8 @@ # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! import logging -from binascii import hexlify -from struct import unpack import time +from struct import unpack MAX_PACKET_LEN = 4096 diff --git a/edlclient/Library/loader_db.py b/edlclient/Library/loader_db.py index a1f1430..e4f599c 100644 --- a/edlclient/Library/loader_db.py +++ b/edlclient/Library/loader_db.py @@ -5,13 +5,11 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import binascii -import time +import inspect +import logging import os import sys -import logging -import inspect -from struct import unpack, pack + current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) @@ -22,6 +20,7 @@ except: from Library.utils import read_object, print_progress, rmrf, LogBase from Config.qualcomm_config import sochw, msmids, root_cert_hash + class loader_utils(metaclass=LogBase): def __init__(self, loglevel=logging.INFO): self.__logger = self.__logger @@ -38,7 +37,7 @@ class loader_utils(metaclass=LogBase): self.loaderdb = {} def init_loader_db(self): - for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir,"..","Loaders")): + for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir, "..", "Loaders")): for filename in filenames: fn = os.path.join(dirpath, filename) found = False @@ -52,13 +51,13 @@ class loader_utils(metaclass=LogBase): hwid = filename.split("_")[0].lower() msmid = hwid[:8] try: - int(msmid,16) + int(msmid, 16) except: continue devid = hwid[8:] if devid == '': continue - if len(filename.split("_"))<2: + if len(filename.split("_")) < 2: continue pkhash = filename.split("_")[1].lower() for msmid in self.convertmsmid(msmid): @@ -88,4 +87,3 @@ class loader_utils(metaclass=LogBase): rmsmid = '0' + rmsmid msmiddb.append(rmsmid) return msmiddb - diff --git a/edlclient/Library/memparse.py b/edlclient/Library/memparse.py index 493102c..d94158c 100755 --- a/edlclient/Library/memparse.py +++ b/edlclient/Library/memparse.py @@ -6,11 +6,11 @@ # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import os -import pt64 -import pt import argparse +import pt +import pt64 + def pt64_walk(data, ttbr, tnsz, levels=3): print("Dumping page tables (levels=%d)" % levels) diff --git a/edlclient/Library/pt64.py b/edlclient/Library/pt64.py index 4a82520..2201315 100755 --- a/edlclient/Library/pt64.py +++ b/edlclient/Library/pt64.py @@ -87,6 +87,7 @@ def get_fld(mfld, level): return table_entry4k(mfld, level) return None + class descriptor(object): def get_name(self): pass diff --git a/edlclient/Library/sahara.py b/edlclient/Library/sahara.py index 3a2871c..b474d7d 100755 --- a/edlclient/Library/sahara.py +++ b/edlclient/Library/sahara.py @@ -5,21 +5,22 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import binascii -import time +import inspect +import logging import os import sys -import logging -import inspect +import time from struct import pack + current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) -from edlclient.Library.utils import read_object, print_progress, rmrf, LogBase -from edlclient.Config.qualcomm_config import sochw, msmids, root_cert_hash +from edlclient.Library.utils import print_progress, rmrf, LogBase +from edlclient.Config.qualcomm_config import msmids, root_cert_hash from edlclient.Library.loader_db import loader_utils from edlclient.Library.sahara_defs import ErrorDesc, cmd_t, exec_cmd_t, sahara_mode_t, status_t, \ - CommandHandler, SAHARA_VERSION + CommandHandler + class sahara(metaclass=LogBase): def __init__(self, cdc, loglevel): diff --git a/edlclient/Library/sahara_defs.py b/edlclient/Library/sahara_defs.py index a5310f4..166257f 100644 --- a/edlclient/Library/sahara_defs.py +++ b/edlclient/Library/sahara_defs.py @@ -12,9 +12,11 @@ from io import BytesIO SAHARA_VERSION = 2 SAHARA_MIN_VERSION = 1 + class DataError(Exception): pass + class cmd_t: SAHARA_HELLO_REQ = 0x1 SAHARA_HELLO_RSP = 0x2 @@ -36,6 +38,7 @@ class cmd_t: SAHARA_64BIT_MEMORY_READ_DATA = 0x12 SAHARA_RESET_STATE_MACHINE_ID = 0x13 + class cmd_t_version: SAHARA_HELLO_REQ = 0x1 SAHARA_HELLO_RSP = 1 @@ -57,6 +60,7 @@ class cmd_t_version: SAHARA_64BIT_MEMORY_READ_DATA = 2 SAHARA_RESET_STATE_MACHINE_ID = 2 + class exec_cmd_t: SAHARA_EXEC_CMD_NOP = 0x00 SAHARA_EXEC_CMD_SERIAL_NUM_READ = 0x01 @@ -69,6 +73,7 @@ class exec_cmd_t: SAHARA_EXEC_CMD_GET_COMMAND_ID_LIST = 0x08 SAHARA_EXEC_CMD_GET_TRAINING_DATA = 0x09 + class sahara_mode_t: SAHARA_MODE_IMAGE_TX_PENDING = 0x0 SAHARA_MODE_IMAGE_TX_COMPLETE = 0x1 @@ -169,7 +174,7 @@ ErrorDesc = { class CommandHandler: def pkt_hello_req(self, data): - if len(data)<0xC * 0x4: + if len(data) < 0xC * 0x4: raise DataError st = structhelper_io(BytesIO(data)) @@ -190,7 +195,7 @@ class CommandHandler: return req def pkt_cmd_hdr(self, data): - if len(data)<2*4: + if len(data) < 2 * 4: raise DataError st = structhelper_io(BytesIO(data)) @@ -201,7 +206,7 @@ class CommandHandler: return req def pkt_read_data(self, data): - if len(data)<0x5 * 0x4: + if len(data) < 0x5 * 0x4: raise DataError st = structhelper_io(BytesIO(data)) @@ -215,7 +220,7 @@ class CommandHandler: return req def pkt_read_data_64(self, data): - if len(data)<0x8 + 0x3 * 0x8: + if len(data) < 0x8 + 0x3 * 0x8: raise DataError st = structhelper_io(BytesIO(data)) @@ -229,7 +234,7 @@ class CommandHandler: return req def pkt_memory_debug(self, data): - if len(data)<0x8 + 0x2 * 0x4: + if len(data) < 0x8 + 0x2 * 0x4: raise DataError st = structhelper_io(BytesIO(data)) @@ -242,7 +247,7 @@ class CommandHandler: return req def pkt_memory_debug_64(self, data): - if len(data)<0x8 + 0x2 * 0x8: + if len(data) < 0x8 + 0x2 * 0x8: raise DataError st = structhelper_io(BytesIO(data)) @@ -255,7 +260,7 @@ class CommandHandler: return req def pkt_execute_rsp_cmd(self, data): - if len(data)<0x4 * 0x4: + if len(data) < 0x4 * 0x4: raise DataError st = structhelper_io(BytesIO(data)) @@ -268,7 +273,7 @@ class CommandHandler: return req def pkt_image_end(self, data): - if len(data)<0x4 * 0x4: + if len(data) < 0x4 * 0x4: raise DataError st = structhelper_io(BytesIO(data)) @@ -281,7 +286,7 @@ class CommandHandler: return req def pkt_done(self, data): - if len(data)<0x3 * 4: + if len(data) < 0x3 * 4: raise DataError st = structhelper_io(BytesIO(data)) @@ -293,7 +298,7 @@ class CommandHandler: return req def pkt_info(self, data): - if len(data)<0x3 * 4 + 0x20: + if len(data) < 0x3 * 4 + 0x20: raise DataError st = structhelper_io(BytesIO(data)) @@ -306,7 +311,7 @@ class CommandHandler: return req def parttbl(self, data): - if len(data)<(0x3 * 4) + 20 + 20: + if len(data) < (0x3 * 4) + 20 + 20: raise DataError st = structhelper_io(BytesIO(data)) @@ -320,7 +325,7 @@ class CommandHandler: return req def parttbl_64bit(self, data): - if len(data)<(0x3 * 8) + 20 + 20: + if len(data) < (0x3 * 8) + 20 + 20: raise DataError st = structhelper_io(BytesIO(data)) diff --git a/edlclient/Library/sparse.py b/edlclient/Library/sparse.py index 310d7ea..e6909b9 100755 --- a/edlclient/Library/sparse.py +++ b/edlclient/Library/sparse.py @@ -5,13 +5,13 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! +import inspect import logging -import sys import os import sys -import inspect from queue import Queue from struct import unpack + current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) diff --git a/edlclient/Library/streaming.py b/edlclient/Library/streaming.py index b745f66..0cbe1b9 100755 --- a/edlclient/Library/streaming.py +++ b/edlclient/Library/streaming.py @@ -168,7 +168,7 @@ class Streaming(metaclass=LogBase): def send_section_header(self, name): # 0x1b open muliimage, 0xe for user-defined partition - resp = self.send(b"\x1b\x0e" + bytes("0:"+name, 'utf-8') + b"\x00") + resp = self.send(b"\x1b\x0e" + bytes("0:" + name, 'utf-8') + b"\x00") if resp[0] == 0x1c: return True self.error("Error on sending section header") @@ -186,7 +186,7 @@ class Streaming(metaclass=LogBase): return True return False - def write_flash(self, lba:int=0, partname="", filename="", info=True): + def write_flash(self, lba: int = 0, partname="", filename="", info=True): wbsize = 1024 filesize = os.stat(filename).st_size total = filesize @@ -197,8 +197,8 @@ class Streaming(metaclass=LogBase): adr = lba while filesize > 0: subdata = rf.read(wbsize) - if len(subdata)<1024: - subdata += (1024-len(subdata))*b'\xFF' + if len(subdata) < 1024: + subdata += (1024 - len(subdata)) * b'\xFF' scmd = b"\x07" + pack(""] self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...") if self.streaming.read_sectors(sector, sectors, filename, True): @@ -323,7 +323,7 @@ class streaming_client(metaclass=LogBase): self.error(f"Error: Couldn't find partition file: {partitionfilename}") return False else: - ptable = open(partitionfilename,"rb").read() + ptable = open(partitionfilename, "rb").read() else: self.error("Partition file is needed for writing (--partitionfilename)") sys.exit(1) diff --git a/edlclient/Library/streaming_defs.py b/edlclient/Library/streaming_defs.py index 420a643..baee5d8 100644 --- a/edlclient/Library/streaming_defs.py +++ b/edlclient/Library/streaming_defs.py @@ -6,63 +6,64 @@ # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! class open_mode_type: - OPEN_MODE_NONE = 0x00 # Not opened yet - OPEN_BOOTLOADER = 0x01 # Bootloader Image - OPEN_BOOTABLE = 0x02 # Bootable Image - OPEN_CEFS = 0x03 # CEFS Image - OPEN_MODE_FACTORY = 0x04 # Factory Image + OPEN_MODE_NONE = 0x00 # Not opened yet + OPEN_BOOTLOADER = 0x01 # Bootloader Image + OPEN_BOOTABLE = 0x02 # Bootable Image + OPEN_CEFS = 0x03 # CEFS Image + OPEN_MODE_FACTORY = 0x04 # Factory Image class open_multi_mode_type: - OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet - OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader - OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data - OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader - OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader - OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable - OPEN_MULTI_MODE_APPS = 0x06 # APPS executable - OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader - OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh - OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image - OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader - OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image - OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile - OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image - OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition - OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0 - OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0 - OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0 - OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable - OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image - OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition - OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0 - OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1 - OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1 - OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1 - OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2 - OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3 - OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4 + OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet + OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader + OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data + OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader + OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader + OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable + OPEN_MULTI_MODE_APPS = 0x06 # APPS executable + OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader + OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh + OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image + OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader + OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image + OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile + OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image + OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition + OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0 + OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0 + OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0 + OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable + OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image + OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition + OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0 + OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1 + OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1 + OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1 + OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2 + OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3 + OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4 + class response_code_type: - ACK = 0x00 # Successful - RESERVED_1 = 0x01 # Reserved - NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid. - NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid. - NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd. - NAK_INVALID_CMD = 0x05 # Failure: invalid command - RESERVED_6 = 0x06 # Reserved - NAK_FAILED = 0x07 # Failure: operation did not succeed. - NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong. - NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec - NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match - RESERVED_0xB = 0x0B # Reserved - NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code - NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone - NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported - NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence - NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed - NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits - NAK_NO_SPACE = 0x12 # Failure: Out of space - NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode - NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported - NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported + ACK = 0x00 # Successful + RESERVED_1 = 0x01 # Reserved + NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid. + NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid. + NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd. + NAK_INVALID_CMD = 0x05 # Failure: invalid command + RESERVED_6 = 0x06 # Reserved + NAK_FAILED = 0x07 # Failure: operation did not succeed. + NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong. + NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec + NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match + RESERVED_0xB = 0x0B # Reserved + NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code + NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone + NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported + NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence + NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed + NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits + NAK_NO_SPACE = 0x12 # Failure: Out of space + NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode + NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported + NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported diff --git a/edlclient/Library/utils.py b/edlclient/Library/utils.py index 05bd7fd..d6f0e2e 100755 --- a/edlclient/Library/utils.py +++ b/edlclient/Library/utils.py @@ -5,20 +5,21 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! -import sys +import codecs +import copy +import datetime as dt import logging import logging.config -import codecs -import struct import os import shutil import stat -import colorama -import copy -import datetime as dt +import struct +import sys import time from io import BytesIO -from struct import unpack, pack +from struct import unpack + +import colorama try: from capstone import * @@ -124,7 +125,7 @@ class progress: def calcProcessTime(self, starttime, cur_iter, max_iter): telapsed = time.time() - starttime if telapsed > 0 and cur_iter > 0: - testimated = (telapsed / cur_iter) * (max_iter) + testimated = (telapsed / cur_iter) * max_iter finishtime = starttime + testimated finishtime = dt.datetime.fromtimestamp(finishtime).strftime("%H:%M:%S") # in time lefttime = testimated - telapsed # in seconds @@ -575,7 +576,7 @@ class patchtools: badchars = self.has_bad_uart_chars(data) if not badchars: badchars = self.has_bad_uart_chars(data2) - if not (badchars): + if not badchars: return div div += 4 @@ -685,7 +686,7 @@ class patchtools: continue rt += 1 prep = data[rt:].find(t[i]) - if (prep != 0): + if prep != 0: error = 1 break rt += len(t[i]) @@ -699,7 +700,7 @@ class patchtools: return None -def read_object(data: object, definition: object) -> object: +def read_object(data: object, definition: object) -> dict: """ Unpacks a structure using the given data and definition. """ diff --git a/edlclient/Library/xmlparser.py b/edlclient/Library/xmlparser.py index 3c54f80..ac6bb97 100755 --- a/edlclient/Library/xmlparser.py +++ b/edlclient/Library/xmlparser.py @@ -17,7 +17,7 @@ class xmlparser: continue line = b"= 0: - out.append(itoa64[v&0x3f]) + out.append(itoa64[v & 0x3f]) v >>= 6 @@ -191,7 +192,8 @@ class connection: mode = "AT" break elif device.pid == 0x1403: - print(f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode") + print( + f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode") mode = "Web" self.ZTE_Web() break @@ -358,7 +360,7 @@ class adbtools(metaclass=LogBase): if cn.connected: while True: resp2 = cn.serial.read(8) - if len(resp2)>0: + if len(resp2) > 0: break cn.serial.write(mode) response = cn.serial.read(8) @@ -378,36 +380,33 @@ class adbtools(metaclass=LogBase): res = False if "vendor" in info: if info["vendor"] == "Sierra Wireless" or info["vendor"] == "Netgear": - res=self.SierraWireless(cn, info, enable) + res = self.SierraWireless(cn, info, enable) elif info["vendor"] == "Quectel": print("Sending at switch command") - res=self.Quectel(cn, enable) + res = self.Quectel(cn, enable) elif info["vendor"] == "ZTE": print("Sending switch command via diag") - res=self.ZTE(cn, enable) + res = self.ZTE(cn, enable) elif info["vendor"] == "Simcom": - res=self.Simcom(cn) + res = self.Simcom(cn, enable) elif info["vendor"] == "Fibocom": - res=self.Fibocom(cn, enable) + res = self.Fibocom(cn, enable) elif info["vendor"] == "Alcatel": - res=self.Alcatel(enable) + res = self.Alcatel(enable) elif info["vendor"] == "Samsung": - res=self.Samsung(cn, enable) - if enable: - mode="enabled" - else: - mode="disabled" + res = self.Samsung(cn, enable) + mode = "enabled" if enable else "disabled" if res: - print("ADB successfully "+mode) + print("ADB successfully " + mode) else: - print("ADB couldn't be "+mode) + print("ADB couldn't be " + mode) cn.close() else: print("No device detected") def SierraWireless(self, cn, info, enable): print("Sending at switch command") - kg = SierraKeygen(cn=cn,devicegeneration=None) + kg = SierraKeygen(cn=cn, devicegeneration=None) kg.detectdevicegeneration() if kg.openlock(): if enable: @@ -428,25 +427,25 @@ class adbtools(metaclass=LogBase): print("Successfully enabled PID 68E2") return True - index=-1 - type=-1 - bitmask=-1 - resp=cn.send("AT!USBCOMP?") - if resp!=-1: + index = -1 + type = -1 + bitmask = -1 + resp = cn.send("AT!USBCOMP?") + if resp != -1: print(resp) for val in resp: if "Config Index" in val: - index=val[val.find("Config Index: ")+14:] + index = val[val.find("Config Index: ") + 14:] elif "Config Type" in val: - type=val[val.find("Config Type: ")+14:].replace(" (Generic)","") + type = val[val.find("Config Type: ") + 14:].replace(" (Generic)", "") elif "Interface bitmask" in val: - bitmask=val[val.find("Interface bitmask: ")+19:] + bitmask = val[val.find("Interface bitmask: ") + 19:] if " " in bitmask: - bitmask="0x"+bitmask.split(" ")[0] - if index!=-1 and type!=-1 and bitmask!=1: - index=int(index) - type=int(type) - bitmask=int(bitmask,16) + bitmask = "0x" + bitmask.split(" ")[0] + if index != -1 and type != -1 and bitmask != 1: + index = int(index) + type = int(type) + bitmask = int(bitmask, 16) # AT!USBCOMP=,, # - configuration index to which the composition applies, should be 1 # - 1:Generic, 2:USBIF-MBIM, 3:RNDIS @@ -465,13 +464,13 @@ class adbtools(metaclass=LogBase): # ECM - 0x00080000, # UBIST - 0x00200000 #if enable: - cmd=f"AT!USBCOMP={index},{type},%08X" % 0x0080010E + cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0080010E #else: # cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0000010D - resp=cn.send(cmd) - if resp!=-1: - resp=cn.send("AT!RESET") - if resp!=-1: + resp = cn.send(cmd) + if resp != -1: + resp = cn.send("AT!RESET") + if resp != -1: return True return False return True @@ -600,7 +599,7 @@ def main(): else: enable = False ad.run(port=args.port, enable=enable) - #ad.meta(port=args.port) + # ad.meta(port=args.port) if __name__ == "__main__": diff --git a/edlclient/Tools/fhloaderparse b/edlclient/Tools/fhloaderparse index 9832f1c..3dc7c1c 100755 --- a/edlclient/Tools/fhloaderparse +++ b/edlclient/Tools/fhloaderparse @@ -5,25 +5,28 @@ # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! +import hashlib +import inspect import os import sys -from os import walk -import hashlib -from struct import unpack, pack -from shutil import copyfile -import os, sys, inspect from io import BytesIO -from Library.utils import elf -from Library.loader_db import loader_utils +from os import walk +from shutil import copyfile +from struct import unpack + from Config.qualcomm_config import vendor +from Library.loader_db import loader_utils +from Library.utils import elf + current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) -lu=loader_utils() +lu = loader_utils() + class MBN: def __init__(self, memory): self.imageid, self.flashpartitionversion, self.imagesrc, self.loadaddr, self.imagesz, self.codesz, \ - self.sigptr, self.sigsz, self.certptr, self.certsz = unpack("H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4 casignature2offset = signatureoffset + len1 len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4 - rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0] + rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split( + b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0] idx = signatureoffset signature = {} @@ -171,7 +175,8 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4 casignature2offset = signatureoffset + len1 len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4 - rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0] + rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split( + b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0] sign_info.pk_hash = hashlib.sha256(rootsignature3).hexdigest() idx = signatureoffset @@ -220,7 +225,7 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur def init_loader_db(): loaderdb = {} - loaders=os.path.join(current_dir,"..","..","Loaders") + loaders = os.path.join(current_dir, "..", "..", "Loaders") if not os.path.exists(loaders): loaders = os.path.join(current_dir, "Loaders") if not os.path.exists(loaders): @@ -326,7 +331,7 @@ def main(argv): filelist = [] rt = open(os.path.join(outputdir, argv[1] + ".log"), "w") for filename in file_list: - filesize=os.stat(filename).st_size + filesize = os.stat(filename).st_size elfpos = 0 with open(filename, 'rb') as rhandle: data = rhandle.read() @@ -339,30 +344,30 @@ def main(argv): signinfo.filename = filename signinfo.filesize = os.stat(filename).st_size - while elfpos= 6: # SDM - signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size, hdr1, + signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size, + hdr1, hdr2, hdr3, hdr4) if signinfo is None: continue @@ -390,7 +396,7 @@ def main(argv): else: print("Unknown version for " + filename) continue - if elfpos == -1 and int.from_bytes(data[:4],'little') == 0x844BDCD1: + if elfpos == -1 and int.from_bytes(data[:4], 'little') == 0x844BDCD1: mbn = MBN(mem_section) if mbn.sigsz == 0: print("%s has no signature." % filename) @@ -414,13 +420,13 @@ def main(argv): loaderlists = {} for item in sorted_x: if item.oem_id != '': - oemid=int(item.oem_id,16) + oemid = int(item.oem_id, 16) if oemid in vendor: oeminfo = vendor[oemid] else: - oeminfo=item.oem_id - if len(item.sw_id)<16: - item.sw_id="0"*(16-len(item.sw_id))+item.sw_id + oeminfo = item.oem_id + if len(item.sw_id) < 16: + item.sw_id = "0" * (16 - len(item.sw_id)) + item.sw_id info = f"OEM:{oeminfo}\tMODEL:{item.model_id}\tHWID:{item.hw_id}\tSWID:{item.sw_id}\tSWSIZE:{item.sw_size}\tPK_HASH:{item.pk_hash}\t{item.filename}\t{str(item.filesize)}" if item.oem_version != '': info += "\tOEMVER:" + item.oem_version + "\tQCVER:" + item.qc_version + "\tVAR:" + item.image_variant @@ -444,11 +450,11 @@ def main(argv): if b"peek\x00" in data: auth += "_peek" fna = os.path.join(outputdir, ( - hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()) + hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()) if not os.path.exists(fna): copyfile(item.filename, os.path.join(outputdir, hwid + "_" + ( - loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())) + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())) elif item.filesize > os.stat(fna).st_size: copyfile(item.filename, os.path.join(outputdir, (hwid + "_" + loader_info.pk_hash[ @@ -470,7 +476,7 @@ def main(argv): except: continue else: - print("Unknown :"+item.filename) + print("Unknown :" + item.filename) copyfile(item.filename, os.path.join(outputdir, "Unknown", os.path.basename(item.filename).lower())) for item in filelist: diff --git a/edlclient/Tools/qc_diag.py b/edlclient/Tools/qc_diag.py index c04b1ac..b23c7e5 100755 --- a/edlclient/Tools/qc_diag.py +++ b/edlclient/Tools/qc_diag.py @@ -90,7 +90,7 @@ subnvitem_type = [ ] -class fs_factimage_read_info(): +class fs_factimage_read_info: def_fs_factimage_read_info = [ ("stream_state", "B"), # 0 indicates no more data to be sent, otherwise set to 1 ("info_cluster_sent", "B"), # 0 indicates if info_cluster was not sent, else 1 @@ -117,7 +117,7 @@ class fs_factimage_read_info(): return data -class FactoryHeader(): +class FactoryHeader: def_factory_header = [ ("magic1", "I"), ("magic2", "I"), @@ -160,7 +160,7 @@ class FactoryHeader(): return data -class nvitem(): +class nvitem: item = 0x0 data = b"" status = 0x0 @@ -409,7 +409,7 @@ class qcdiag(metaclass=LogBase): self.cdc.close(True) def send(self, cmd): - if self.hdlc != None: + if self.hdlc is not None: return self.hdlc.send_cmd_np(cmd) def cmd_info(self): @@ -809,7 +809,7 @@ class qcdiag(metaclass=LogBase): return False write_handle.close() - if efserr == False: + if not efserr: print("Successfully read EFS.") return True else: @@ -1408,7 +1408,7 @@ def main(): parser_nvwritesub.add_argument("-debugmode", help="[Option] Enable verbose logging", action="store_true") parser_writeimei = subparser.add_parser("writeimei", help="Write imei") - parser_writeimei.add_argument("imei", metavar=(""), help="[Option] IMEI to write", default="") + parser_writeimei.add_argument("imei", metavar="", help="[Option] IMEI to write", default="") parser_writeimei.add_argument("-vid", metavar="", help="[Option] Specify vid", default="") parser_writeimei.add_argument("-pid", metavar="", help="[Option] Specify pid", default="") parser_writeimei.add_argument("-interface", metavar="", help="[Option] Specify interface number, default=0)", diff --git a/edlclient/Tools/sierrakeygen.py b/edlclient/Tools/sierrakeygen.py index 98e312a..00f32df 100755 --- a/edlclient/Tools/sierrakeygen.py +++ b/edlclient/Tools/sierrakeygen.py @@ -17,6 +17,7 @@ import logging.config import logging.handlers import colorama + class ColorFormatter(logging.Formatter): LOG_COLORS = { logging.ERROR: colorama.Fore.RED, @@ -153,8 +154,8 @@ infotable = { "SDX65": ["MR6400", "MR6500", "MR6110", "MR6150", "MR6450", "MR6550"] } - # 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850, - # AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14 +# 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850, +# AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14 keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C, 0xCE, 0x84, 0x90, 0xBC, 0x7F, 0xED, # 1 MC8775_H2.0.8.19 AC340U, OPENMEP default 0x61, 0x94, 0xCE, 0xA7, 0xB0, 0xEA, 0x4F, 0x0A, 0x73, 0xC5, 0xC3, 0xA6, 0x5E, 0xEC, 0x1C, 0xE2, @@ -210,7 +211,8 @@ keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C 0x46, 0x30, 0x33, 0x43, 0x44, 0x36, 0x42, 0x34, 0x41, 0x32, 0x31, 0x32, 0x30, 0x35, 0x39, 0x37 ]) -class SierraGenerator(): + +class SierraGenerator: tbl = bytearray() rtbl = bytearray() devicegeneration = None @@ -265,7 +267,7 @@ class SierraGenerator(): {"challenge": "BE96CBBEE0829BCA", "devicegeneration": "MDM9200", "response": "EEDBF8BFF8DAE346"}, {"challenge": "20E253156762DACE", "devicegeneration": "SDX55", "response": "03940D7067145323"}, {"challenge": "2387885E7D290FEE", "devicegeneration": "MDM9x15A", "response": "DC3E51897BAA9C1E"}, - {"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response":"1253C1B1E447B697"} + {"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response": "1253C1B1E447B697"} ] for test in test_table: challenge = test["challenge"] @@ -425,7 +427,7 @@ class connection: def readreply(self): info = [] if self.serial is not None: - while (True): + while True: tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '') if "OK" in info: return info @@ -464,7 +466,7 @@ class SierraKeygen(metaclass=LogBase): def __init__(self, cn, devicegeneration=None): self.cn = cn self.keygen = SierraGenerator() - if devicegeneration == None: + if devicegeneration is None: self.detectdevicegeneration() else: self.devicegeneration = devicegeneration @@ -528,7 +530,7 @@ class SierraKeygen(metaclass=LogBase): devicegeneration = "SDX55" else: # MR6400 NTGX65_10.04.13.03 devicegeneration = "SDX65" - # MR6550 NTGX65_12.01.31.00 + # MR6550 NTGX65_12.01.31.00 devicegeneration = "SDX65" else: devicegeneration = "" diff --git a/edlclient/Tools/txt_to_loader.py b/edlclient/Tools/txt_to_loader.py index e469cd9..d4b0644 100755 --- a/edlclient/Tools/txt_to_loader.py +++ b/edlclient/Tools/txt_to_loader.py @@ -7,51 +7,53 @@ # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! # TXT to EDL Loader (c) B.Kerler 2023 -import os,sys +import os, sys from struct import unpack + def main(): - if len(sys.argv)<2: - print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]") - sys.exit(0) - with open(sys.argv[1],"rb") as rf: - data=bytearray() - for line in rf.readlines(): - if line[0]==0x20: - tt=line.split(b" ")[:-1] - tt=tt[1:17] - xx=b"".join(tt) - data.extend(bytes.fromhex(xx.decode('utf-8'))) + if len(sys.argv) < 2: + print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]") + sys.exit(0) + with open(sys.argv[1], "rb") as rf: + data = bytearray() + for line in rf.readlines(): + if line[0] == 0x20: + tt = line.split(b" ")[:-1] + tt = tt[1:17] + xx = b"".join(tt) + data.extend(bytes.fromhex(xx.decode('utf-8'))) - outdata=bytearray() - i=0 - seq=b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00" - with open(sys.argv[2], "wb") as wf: - while True: - idx=data.find(seq) - if idx==-1: - if i==0: - seq=b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00" - i+=1 - continue - else: - break - else: - cmd=unpack("