From 3433b60403516764ff06dfd48360e008d65f283f Mon Sep 17 00:00:00 2001 From: Bjoern Kerler Date: Mon, 1 Mar 2021 11:35:37 +0100 Subject: [PATCH] Beautify code, improve progress bar output --- Library/firehose.py | 15 ++-- Library/firehose_client.py | 17 ++-- Library/gpt.py | 15 ++-- Library/hdlc.py | 174 +++++++++++++++++++----------------- Library/memparse.py | 39 ++++---- Library/nand_config.py | 56 ++++++------ Library/pt.py | 22 ++--- Library/pt64.py | 14 +-- Library/sahara.py | 64 +++++++------ Library/sparse.py | 15 ++-- Library/streaming.py | 57 ++++++------ Library/streaming_client.py | 33 +++---- Library/utils.py | 1 + Library/xmlparser.py | 2 +- 14 files changed, 274 insertions(+), 250 deletions(-) diff --git a/Library/firehose.py b/Library/firehose.py index 02ca3a8..030c587 100755 --- a/Library/firehose.py +++ b/Library/firehose.py @@ -418,12 +418,13 @@ class firehose(metaclass=LogBase): self.cdc.write(wdata, wlen) - prog = int(float(total-bytestowrite) / float(total) * float(100)) + prog = round(float(total-bytestowrite) / float(total) * float(100),1) if prog > old: if display: print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)' % ((total-bytestowrite) // self.cfg.SECTOR_SIZE_IN_BYTES), bar_length=50) + old=prog self.cdc.write(b'', self.cfg.MaxPayloadSizeToTargetInBytes) # time.sleep(0.2) @@ -490,7 +491,7 @@ class firehose(metaclass=LogBase): wdata += b"\x00" * (filllen - wlen) wlen = len(wdata) self.cdc.write(wdata, wlen) - prog = int(float(pos) / float(total) * float(100)) + prog = round(float(pos) / float(total) * float(100),1) if prog > old: if display: print_progress(prog, 100, prefix='Progress:', suffix='Written (Sector %d)' @@ -544,7 +545,7 @@ class firehose(metaclass=LogBase): if bytesToWrite < wlen: wlen = bytesToWrite self.cdc.write(empty[0:wlen], self.cfg.MaxPayloadSizeToTargetInBytes) - prog = int(float(pos) / float(total) * float(100)) + prog = round(float(pos) / float(total) * float(100),1) if prog > old: if display: print_progress(prog, 100, prefix='Progress:', suffix='Erased (Sector %d)' @@ -608,7 +609,7 @@ class firehose(metaclass=LogBase): bytesToRead -= len(tmp) wr.write(tmp) if display: - prog = int(float(total-bytesToRead) / float(total) * float(100)) + prog = round(float(total-bytesToRead) / float(total) * float(100),1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Read (Sector %d)' % ((total-bytesToRead) // self.cfg.SECTOR_SIZE_IN_BYTES), @@ -671,7 +672,7 @@ class firehose(metaclass=LogBase): bytesToRead -= len(tmp) dataread += len(tmp) resData += tmp - prog = int(float(dataread) / float(total) * float(100)) + prog = round(float(dataread) / float(total) * float(100),1) if prog > old: if display: print_progress(prog, 100, prefix='Progress:', suffix='Read (Sector %d)' @@ -1136,7 +1137,7 @@ class firehose(metaclass=LogBase): datawritten += maxsize lengthtowrite -= maxsize if info: - prog = int(float(datawritten) / float(SizeInBytes) * float(100)) + prog = round(float(datawritten) / float(SizeInBytes) * float(100),1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) old = prog @@ -1210,7 +1211,7 @@ class firehose(metaclass=LogBase): else: resp += tmp2 if info: - prog = int(float(dataread) / float(SizeInBytes) * float(100)) + prog = round(float(dataread) / float(SizeInBytes) * float(100),1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) old = prog diff --git a/Library/firehose_client.py b/Library/firehose_client.py index 1e38070..af350ae 100644 --- a/Library/firehose_client.py +++ b/Library/firehose_client.py @@ -586,14 +586,15 @@ class firehose_client(metaclass=LogBase): self.printer(f"Error writing {filename} to sector {str(startsector)}.") return False else: - fpartitions = res[1] - self.error(f"Error: Couldn't detect partition: {partitionname}\nAvailable partitions:") - for lun in fpartitions: - for partition in fpartitions[lun]: - if self.cfg.MemoryName == "emmc": - self.error("\t" + partition) - else: - self.error(lun + ":\t" + partition) + if len(res) > 0: + fpartitions = res[1] + self.error(f"Error: Couldn't detect partition: {partitionname}\nAvailable partitions:") + for lun in fpartitions: + for partition in fpartitions[lun]: + if self.cfg.MemoryName == "emmc": + self.error("\t" + partition) + else: + self.error(lun + ":\t" + partition) return False elif cmd == "wl": if not self.check_param([""]): diff --git a/Library/gpt.py b/Library/gpt.py index 7af7a29..37ea589 100755 --- a/Library/gpt.py +++ b/Library/gpt.py @@ -119,6 +119,7 @@ class gpt(metaclass=LogBase): def __init__(self, num_part_entries=0, part_entry_size=0, part_entry_start_lba=0, loglevel=logging.INFO, *args, **kwargs): self.num_part_entries = num_part_entries + self.__logger = self.__logger self.part_entry_size = part_entry_size self.part_entry_start_lba = part_entry_start_lba self.totalsectors = None @@ -203,21 +204,15 @@ class gpt(metaclass=LogBase): return True def print(self): - print("\nGPT Table:\n-------------") - for partition in self.partentries: - print("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}".format( - partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize, - partition.flags, partition.unique, partition.type)) - print("\nTotal disk size:0x{:016x}, sectors:0x{:016x}".format(self.totalsectors * self.sectorsize, - self.totalsectors)) + print(self.tostring()) def tostring(self): - mstr = "\nGPT Table:\n-------------" + mstr = "\nGPT Table:\n-------------\n" for partition in self.partentries: - mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}".format( + mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}\n".format( partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize, partition.flags, partition.unique, partition.type)) - mstr += ("\nTotal disk size:0x{:016x}, sectors:0x{:016x}".format(self.totalsectors * self.sectorsize, + mstr += ("\nTotal disk size:0x{:016x}, sectors:0x{:016x}\n".format(self.totalsectors * self.sectorsize, self.totalsectors)) return mstr diff --git a/Library/hdlc.py b/Library/hdlc.py index 8bb5555..debb401 100755 --- a/Library/hdlc.py +++ b/Library/hdlc.py @@ -40,88 +40,96 @@ crcTbl = ( 0x7bc7, 0x6a4e, 0x58d5, 0x495c, 0x3de3, 0x2c6a, 0x1ef1, 0x0f78) +def serial16le(data): + out = bytearray() + out.append(data & 0xFF) + out.append((data >> 8) & 0xFF) + return out + + +def serial16(data): + out = bytearray() + out.append((data >> 8) & 0xFF) + out.append(data & 0xFF) + return out + + +def serial32le(data): + out = bytearray() + out += serial16le(data & 0xFFFF) + out += serial16le((data >> 16) & 0xFFFF) + return out + + +def crc16(iv, data): + for byte in data: + iv = ((iv >> 8) & 0xFFFF) ^ crcTbl[(iv ^ byte) & 0xFF] + return ~iv & 0xFFFF + + +def serial32(data): + out = bytearray() + out += serial16((data >> 16) & 0xFFFF) + out += serial16(data & 0xFFFF) + return out + + +def escape(indata): + outdata = bytearray() + for i in range(0, len(indata)): + buf = indata[i] + if buf == 0x7e: + outdata.append(0x7d) + outdata.append(0x5e) + elif buf == 0x7d: + outdata.append(0x7d) + outdata.append(0x5d) + else: + outdata.append(buf) + return outdata + + +def unescape(indata): + mescape = False + out = bytearray() + for buf in indata: + if mescape: + if buf == 0x5e: + out.append(0x7e) + elif buf == 0x5d: + out.append(0x7d) + else: + logging.error("Fatal error unescaping buffer!") + return None + mescape = False + else: + if buf == 0x7d: + mescape = True + else: + out.append(buf) + if len(out) == 0: + return None + return out + + +def convert_cmdbuf(indata): + crc16val = crc16(0xFFFF, indata) + indata.extend(bytearray(serial16le(crc16val))) + outdata = escape(indata) + outdata.append(0x7E) + return outdata + + class hdlc: def __init__(self, cdc): self.cdc = cdc self.programmer = None self.timeout = 1500 - def serial16(self, data): - out = bytearray() - out.append((data >> 8) & 0xFF) - out.append(data & 0xFF) - return out - - def serial16le(self, data): - out = bytearray() - out.append(data & 0xFF) - out.append((data >> 8) & 0xFF) - return out - - def serial32(self, data): - out = bytearray() - out += self.serial16((data >> 16) & 0xFFFF) - out += self.serial16(data & 0xFFFF) - return out - - def serial32le(self, data): - out = bytearray() - out += self.serial16le(data & 0xFFFF) - out += self.serial16le((data >> 16) & 0xFFFF) - return out - - def crc16(self, iv, data): - for byte in data: - iv = ((iv >> 8) & 0xFFFF) ^ crcTbl[(iv ^ byte) & 0xFF] - return ~iv & 0xFFFF - - def convert_cmdbuf(self, indata): - crc16val = self.crc16(0xFFFF, indata) - indata.extend(bytearray(self.serial16le(crc16val))) - outdata = self.escape(indata) - outdata.append(0x7E) - return outdata - - def escape(self, indata): - outdata = bytearray() - for i in range(0, len(indata)): - buf = indata[i] - if buf == 0x7e: - outdata.append(0x7d) - outdata.append(0x5e) - elif buf == 0x7d: - outdata.append(0x7d) - outdata.append(0x5d) - else: - outdata.append(buf) - return outdata - - def unescape(self, indata): - escape = False - out = bytearray() - for buf in indata: - if escape: - if buf == 0x5e: - out.append(0x7e) - elif buf == 0x5d: - out.append(0x7d) - else: - logging.error("Fatal error unescaping buffer!") - return None - escape = False - else: - if buf == 0x7d: - escape = True - else: - out.append(buf) - if len(out) == 0: - return None - return out - - def receive_reply(self,timeout=None): + def receive_reply(self, timeout=None): replybuf = bytearray() if timeout is None: - timeout=self.timeout + timeout = self.timeout tmp = self.cdc.read(MAX_PACKET_LEN, timeout) if tmp == bytearray(): return 0 @@ -135,10 +143,10 @@ class hdlc: if retry > 5: break replybuf.extend(tmp) - data = self.unescape(replybuf) + data = unescape(replybuf) # print(hexlify(data)) if len(data) > 3: - crc16val = self.crc16(0xFFFF, data[:-3]) + crc16val = crc16(0xFFFF, data[:-3]) reccrc = int(data[-3]) + (int(data[-2]) << 8) if crc16val != reccrc: return -1 @@ -146,17 +154,17 @@ class hdlc: time.sleep(0.01) data = self.cdc.read(MAX_PACKET_LEN, timeout) if len(data) > 3: - crc16val = self.crc16(0xFFFF, data[:-3]) + crc16val = crc16(0xFFFF, data[:-3]) reccrc = int(data[-3]) + (int(data[-2]) << 8) if crc16val != reccrc: return -1 return data return data[:-3] - def receive_reply_nocrc(self,timeout=None): + def receive_reply_nocrc(self, timeout=None): replybuf = bytearray() if timeout is None: - timeout=self.timeout + timeout = self.timeout tmp = self.cdc.read(MAX_PACKET_LEN, timeout) if tmp == bytearray(): return 0 @@ -170,7 +178,7 @@ class hdlc: if retry > 5: break replybuf.extend(tmp) - data = self.unescape(replybuf) + data = unescape(replybuf) # print(hexlify(data)) if len(data) > 3: # crc16val = self.crc16(0xFFFF, data[:-3]) @@ -197,9 +205,9 @@ class hdlc: # FlushFileBuffers(ser) def send_cmd_base(self, outdata, prefixflag, nocrc=False): - if isinstance(outdata,str): - outdata=bytes(outdata,'utf-8') - packet = self.convert_cmdbuf(bytearray(outdata)) + if isinstance(outdata, str): + outdata = bytes(outdata, 'utf-8') + packet = convert_cmdbuf(bytearray(outdata)) if self.send_unframed_buf(packet, prefixflag): if nocrc: return self.receive_reply_nocrc() diff --git a/Library/memparse.py b/Library/memparse.py index 106ef94..1feeb6f 100755 --- a/Library/memparse.py +++ b/Library/memparse.py @@ -3,11 +3,12 @@ import pt64 import pt import argparse + def pt64_walk(data, ttbr, tnsz, levels=3): print("Dumping page tables (levels=%d)" % levels) print("First level (ptbase = %016x)" % ttbr) print("---------------------------------------------") - fl = data[ttbr-ttbr:ttbr-ttbr+0x1000] + fl = data[ttbr - ttbr:ttbr - ttbr + 0x1000] if levels <= 1: return @@ -17,24 +18,24 @@ def pt64_walk(data, ttbr, tnsz, levels=3): print("Second level (ptbase = %016x)" % fle.output) print("---------------------------------------------") - sl = data[fle.output-ttbr:fle.output-ttbr+0x4000] + sl = data[fle.output - ttbr:fle.output - ttbr + 0x4000] sl = pt64.parse_pt(sl, va, tnsz, 2) if levels <= 2: continue - for (va, sle) in sl: + for (mva, sle) in sl: if "TABLE" in str(sle): print("Third level (ptbase = %016x)" % sle.output) print("---------------------------------------------") - tl = data[sle.output-ttbr:sle.output-ttbr+0x1000] - pt64.parse_pt(tl, va, tnsz, 3) + tl = data[sle.output - ttbr:sle.output - ttbr + 0x1000] + pt64.parse_pt(tl, mva, tnsz, 3) def pt32_walk(data, ttbr, skip): print("First level (va = %08x)" % ttbr) print("---------------------------------------------") - fl = data[ttbr-ttbr:ttbr-ttbr+0x4000] + fl = data[ttbr - ttbr:ttbr - ttbr + 0x4000] i = 0 for (va, fl) in pt.parse_pt(fl): @@ -45,27 +46,29 @@ def pt32_walk(data, ttbr, skip): print("") print("Second level (va = %08x)" % va) print("---------------------------------------------") - sldata = data[fl.coarse_base-ttbr:fl.coarse_base-ttbr+0x400] + sldata = data[fl.coarse_base - ttbr:fl.coarse_base - ttbr + 0x400] pt.parse_spt(sldata, va) + def main(): parser = argparse.ArgumentParser( - prog="memparse", - usage="python memparse.py -arch <32,64> -in -mem ", - formatter_class=argparse.RawTextHelpFormatter) + prog="memparse", + usage="python memparse.py -arch <32,64> -in -mem ", + formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('-in', '--in', dest='infile', help='memory dump', default="") parser.add_argument('-arch', '--arch', dest='arch', help='architecture=32,64', default="32") parser.add_argument('-mem', '--mem', dest='mem', help='memoryoffset', default="0x200000") args = parser.parse_args() - if args.infile=="": + if args.infile == "": print("You need to add an -in [memorydump filename]") return - with open(args.infile,"rb") as rf: - data=rf.read() - if args.arch=="32": - pt32_walk(data,int(args.mem,16),False) + with open(args.infile, "rb") as rf: + data = rf.read() + if args.arch == "32": + pt32_walk(data, int(args.mem, 16), False) else: - pt64_walk(data,int(args.mem,16),0,3) - -main() \ No newline at end of file + pt64_walk(data, int(args.mem, 16), 0, 3) + + +main() diff --git a/Library/nand_config.py b/Library/nand_config.py index 6831d80..aa1e3d3 100644 --- a/Library/nand_config.py +++ b/Library/nand_config.py @@ -1,6 +1,5 @@ import ctypes from enum import Enum -from struct import unpack, pack from Config.qualcomm_config import secgen, secureboottbl c_uint8 = ctypes.c_uint8 @@ -274,14 +273,14 @@ class SettingsOpt: self.chipname = "Unknown" if chipset in config_tbl: self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, self.pbl, \ - self.qfprom, self.memtbl = config_tbl[chipset] + self.qfprom, self.memtbl = config_tbl[chipset] self.bad_loader = 0 else: loadername = parent.sahara.programmer.lower() for chipid in config_tbl: if config_tbl[chipid][0] in loadername: self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, \ - self.pbl, self.qfprom, self.memtbl = config_tbl[chipid] + self.pbl, self.qfprom, self.memtbl = config_tbl[chipid] self.bad_loader = 0 if chipset == 0xFF: self.bad_loader = 0 @@ -447,7 +446,6 @@ class NandDevice: flashinfo = {} tid = nand_toshiba_id_t() tid.asDword = nandid - small_slc = False # did,slc_small_device,device_width,density_mbits if tid.did in tbl: fdev = tbl[tid.did] @@ -676,30 +674,30 @@ class NandDevice: # UD_SIZE_BYTES must be 512, 516 or 517. If ECC-Protection 516 for x16Bit-Nand and 517 for x8-bit Nand cfg0 = 0 << self.SET_RD_MODE_AFTER_STATUS \ - | 0 << self.STATUS_BFR_READ \ - | 5 << self.NUM_ADDR_CYCLES \ - | self.settings.SPARE_SIZE_BYTES << self.SPARE_SIZE_BYTES \ - | hw_ecc_bytes << self.ECC_PARITY_SIZE_BYTES_RS \ - | self.settings.UD_SIZE_BYTES << self.UD_SIZE_BYTES \ - | self.settings.CW_PER_PAGE << self.CW_PER_PAGE \ - | 0 << self.DISABLE_STATUS_AFTER_WRITE + | 0 << self.STATUS_BFR_READ \ + | 5 << self.NUM_ADDR_CYCLES \ + | self.settings.SPARE_SIZE_BYTES << self.SPARE_SIZE_BYTES \ + | hw_ecc_bytes << self.ECC_PARITY_SIZE_BYTES_RS \ + | self.settings.UD_SIZE_BYTES << self.UD_SIZE_BYTES \ + | self.settings.CW_PER_PAGE << self.CW_PER_PAGE \ + | 0 << self.DISABLE_STATUS_AFTER_WRITE bad_block_byte = self.settings.BAD_BLOCK_BYTE_NUM wide_bus = self.settings.IsWideFlash bch_disabled = self.settings.args_disable_ecc # option in gui, implemented cfg1 = 0 << self.ECC_MODE_DEV1 \ - | 1 << self.ENABLE_NEW_ECC \ - | 0 << self.DISABLE_ECC_RESET_AFTER_OPDONE \ - | 0 << self.ECC_DECODER_CGC_EN \ - | 0 << self.ECC_ENCODER_CGC_EN \ - | 2 << self.WR_RD_BSY_GAP \ - | 0 << self.BAD_BLOCK_IN_SPARE_AREA \ - | bad_block_byte << self.BAD_BLOCK_BYTE_NUM \ - | 0 << self.CS_ACTIVE_BSY \ - | 7 << self.NAND_RECOVERY_CYCLES \ - | wide_bus << self.WIDE_FLASH \ - | bch_disabled << self.ENABLE_BCH_ECC + | 1 << self.ENABLE_NEW_ECC \ + | 0 << self.DISABLE_ECC_RESET_AFTER_OPDONE \ + | 0 << self.ECC_DECODER_CGC_EN \ + | 0 << self.ECC_ENCODER_CGC_EN \ + | 2 << self.WR_RD_BSY_GAP \ + | 0 << self.BAD_BLOCK_IN_SPARE_AREA \ + | bad_block_byte << self.BAD_BLOCK_BYTE_NUM \ + | 0 << self.CS_ACTIVE_BSY \ + | 7 << self.NAND_RECOVERY_CYCLES \ + | wide_bus << self.WIDE_FLASH \ + | bch_disabled << self.ENABLE_BCH_ECC """ cfg0_raw = (self.settings.CW_PER_PAGE-1) << CW_PER_PAGE \ @@ -716,13 +714,13 @@ class NandDevice: | 1 << DEV0_CFG1_ECC_DISABLE """ ecc_bch_cfg = 1 << self.ECC_FORCE_CLK_OPEN \ - | 0 << self.ECC_DEC_CLK_SHUTDOWN \ - | 0 << self.ECC_ENC_CLK_SHUTDOWN \ - | self.settings.UD_SIZE_BYTES << self.ECC_NUM_DATA_BYTES \ - | self.settings.ECC_PARITY_SIZE_BYTES << self.ECC_PARITY_SIZE_BYTES_BCH \ - | self.settings.ECC_MODE << self.ECC_MODE \ - | 0 << self.ECC_SW_RESET \ - | bch_disabled << self.ECC_CFG_ECC_DISABLE + | 0 << self.ECC_DEC_CLK_SHUTDOWN \ + | 0 << self.ECC_ENC_CLK_SHUTDOWN \ + | self.settings.UD_SIZE_BYTES << self.ECC_NUM_DATA_BYTES \ + | self.settings.ECC_PARITY_SIZE_BYTES << self.ECC_PARITY_SIZE_BYTES_BCH \ + | self.settings.ECC_MODE << self.ECC_MODE \ + | 0 << self.ECC_SW_RESET \ + | bch_disabled << self.ECC_CFG_ECC_DISABLE if self.settings.UD_SIZE_BYTES == 516: ecc_buf_cfg = 0x203 diff --git a/Library/pt.py b/Library/pt.py index af777ed..758c720 100755 --- a/Library/pt.py +++ b/Library/pt.py @@ -36,35 +36,35 @@ def parse_spt(data, base): va += 4 -def get_fld(fld): - s = fld & 3 +def get_fld(mfld): + s = mfld & 3 if s == 0: - return fault_desc(fld) + return fault_desc(mfld) if s == 1: - return pt_desc(fld) + return pt_desc(mfld) if s == 2: - return section_desc(fld) + return section_desc(mfld) if s == 3: - return reserved_desc(fld) + return reserved_desc(mfld) return None -def get_sld(sld): - s = sld & 3 +def get_sld(msld): + s = msld & 3 if s == 1: - return sld_lp(sld) + return sld_lp(msld) if s > 1: - return sld_xsp(sld) + return sld_xsp(msld) return "UNSUPPORTED" class descriptor(object): - def __init__(self, fld): + def __init__(self, mfld): pass def get_name(self): diff --git a/Library/pt64.py b/Library/pt64.py index a5f7692..4a82520 100755 --- a/Library/pt64.py +++ b/Library/pt64.py @@ -54,14 +54,14 @@ def parse_pt(data, base, tnsz, level=1): i = 0 entries = [] while i < min(len(data), get_level_size(tnsz, level)): - entry = struct.unpack(" 0: try: tmp = self.cdc.read(length) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except + self.debug(str(e)) return None length -= len(tmp) pos += len(tmp) @@ -629,7 +637,7 @@ class sahara(metaclass=LogBase): else: data += tmp if display: - prog = int(float(pos) / float(total) * float(100)) + prog = round(float(pos) / float(total) * float(100), 1) if prog > old: if display: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) @@ -676,7 +684,7 @@ class sahara(metaclass=LogBase): if memory_table_length % pktsize == 0: if memory_table_length != 0: print( - f"Reading 64-Bit partition from {hex(memory_table_addr)} with length of " + \ + f"Reading 64-Bit partition from {hex(memory_table_addr)} with length of " + "{hex(memory_table_length)}") ptbldata = self.read_memory(memory_table_addr, memory_table_length) num_entries = len(ptbldata) // pktsize @@ -692,7 +700,7 @@ class sahara(metaclass=LogBase): partitions.append(dict(desc=desc, filename=filename, mem_base=mem_base, length=length, save_pref=save_pref)) print( - f"{filename}({desc}): Offset {hex(mem_base)}, Length {hex(length)}, " + \ + f"{filename}({desc}): Offset {hex(mem_base)}, Length {hex(length)}, " + "SavePref {hex(save_pref)}") self.dump_partitions(partitions) @@ -734,7 +742,7 @@ class sahara(metaclass=LogBase): self.info(f"Uploading loader {self.programmer} ...") with open(self.programmer, "rb") as rf: programmer = rf.read() - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except self.error(str(e)) sys.exit() @@ -744,7 +752,7 @@ class sahara(metaclass=LogBase): try: datalen = len(programmer) done = False - while datalen > 0 or done == True: + while datalen > 0 or done: cmd, pkt = self.get_rsp() if cmd == -1 or pkt == -1: if self.cmd_done(): @@ -791,7 +799,7 @@ class sahara(metaclass=LogBase): self.cmd_done() return self.mode return "" - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except self.error("Unexpected error on uploading, maybe signature of loader wasn't accepted ?\n" + str(e)) return "" diff --git a/Library/sparse.py b/Library/sparse.py index 6a2dc96..4f7faa4 100755 --- a/Library/sparse.py +++ b/Library/sparse.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 -import sys -from struct import unpack -from queue import Queue import logging +import sys +from queue import Queue +from struct import unpack try: from Library.utils import LogBase, print_progress -except Exception as e: - import os, sys, inspect +except ImportError as e: + import os + import sys + import inspect current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parent_dir = os.path.dirname(current_dir) @@ -19,6 +21,7 @@ class QCSparse(metaclass=LogBase): def __init__(self, filename, loglevel): self.rf = open(filename, 'rb') self.data = Queue() + self.__logger = self.__logger self.offset = 0 self.tmpdata = bytearray() self.__logger.setLevel(loglevel) @@ -227,7 +230,7 @@ if __name__ == "__main__": wf.write(wdata) - prog = int(float(pos) / float(total) * float(100)) + prog = round(float(pos) / float(total) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)' % (pos // SECTOR_SIZE_IN_BYTES), diff --git a/Library/streaming.py b/Library/streaming.py index e7915ce..dcba890 100755 --- a/Library/streaming.py +++ b/Library/streaming.py @@ -2,11 +2,13 @@ from struct import pack from binascii import unhexlify from Library.utils import * from Library.hdlc import * -from Library.nand_config import BadFlags, SettingsOpt, nand_ids, nand_manuf_ids, nandregs, NandDevice +from Library.nand_config import BadFlags, SettingsOpt, nandregs, NandDevice class Streaming(metaclass=LogBase): def __init__(self, cdc, sahara, loglevel=logging.INFO): + self.__logger = self.__logger + self.regs = None self.cdc = cdc self.hdlc = hdlc(self.cdc) self.mode = sahara.mode @@ -17,9 +19,9 @@ class Streaming(metaclass=LogBase): self.nanddevice = None self.nandbase = 0 self.__logger.setLevel(loglevel) - self.info=self.__logger.info - self.debug=self.__logger.debug - self.error=self.__logger.error + self.info = self.__logger.info + self.debug = self.__logger.debug + self.error = self.__logger.error self.warning = self.__logger.warning self.modules = None self.Qualcomm = 0 @@ -43,13 +45,9 @@ class Streaming(metaclass=LogBase): self.regs.NAND_FLASH_CMD = 0x8000b self.regs.NAND_EXEC_CMD = self.nanddevice.NAND_CMD_SOFT_RESET self.nandwait() - dev_cfg0 = self.regs.NAND_DEV0_CFG0 - dev_cfg1 = self.regs.NAND_DEV0_CFG1 - dev_ecc_cfg = self.regs.NAND_DEV0_ECC_CFG - - # dev_ecc1_cfg = self.regs.NAND_DEV1_ECC_CFG - # dev_cfg1_0 = self.regs.NAND_DEV1_CFG0 - # dev_cfg1_1 = self.regs.NAND_DEV1_CFG1 + # dev_cfg0 = self.regs.NAND_DEV0_CFG0 + # dev_cfg1 = self.regs.NAND_DEV0_CFG1 + # dev_ecc_cfg = self.regs.NAND_DEV0_ECC_CFG """ self.nand_reset() @@ -79,10 +77,13 @@ class Streaming(metaclass=LogBase): self.regs.NAND_FLASH_CMD = self.nanddevice.NAND_CMD_PAGE_READ self.regs.NAND_ADDR0 = 0 self.regs.NAND_ADDR1 = 0 - self.regs.NAND_DEV0_CFG0 = 0 << self.nanddevice.CW_PER_PAGE | 512 << self.nanddevice.UD_SIZE_BYTES | 5 << self.nanddevice.NUM_ADDR_CYCLES | 0 << self.nanddevice.SPARE_SIZE_BYTES - self.regs.NAND_DEV1_CFG1 = 7 << self.nanddevice.NAND_RECOVERY_CYCLES | 0 << self.nanddevice.CS_ACTIVE_BSY | 17 << self.nanddevice.BAD_BLOCK_BYTE_NUM | \ - 1 << self.nanddevice.BAD_BLOCK_IN_SPARE_AREA | 2 << self.nanddevice.WR_RD_BSY_GAP | 0 << self.nanddevice.WIDE_FLASH | \ - 1 << self.nanddevice.DEV0_CFG1_ECC_DISABLE + self.regs.NAND_DEV0_CFG0 = 0 << self.nanddevice.CW_PER_PAGE | 512 << self.nanddevice.UD_SIZE_BYTES | \ + 5 << self.nanddevice.NUM_ADDR_CYCLES | 0 << self.nanddevice.SPARE_SIZE_BYTES + self.regs.NAND_DEV1_CFG1 = 7 << self.nanddevice.NAND_RECOVERY_CYCLES | 0 << self.nanddevice.CS_ACTIVE_BSY | \ + 17 << self.nanddevice.BAD_BLOCK_BYTE_NUM | \ + 1 << self.nanddevice.BAD_BLOCK_IN_SPARE_AREA | 2 << self.nanddevice.WR_RD_BSY_GAP | \ + 0 << self.nanddevice.WIDE_FLASH | \ + 1 << self.nanddevice.DEV0_CFG1_ECC_DISABLE self.regs.NAND_EBI2_ECC_BUF_CFG = 1 << self.nanddevice.ECC_CFG_ECC_DISABLE self.regs.NAND_DEV_CMD_VLD = self.regs.NAND_DEV_CMD_VLD & ~(1 << self.nanddevice.READ_START_VLD) self.regs.NAND_DEV_CMD1 = (self.regs.NAND_DEV_CMD1 & ~( @@ -140,7 +141,7 @@ class Streaming(metaclass=LogBase): self.mempoke(self.nanddevice.NAND_FLASH_BUFFER + i, 0xffffffff) def secure_mode(self): - resp = self.send(b"\x17\x01", True) + self.send(b"\x17\x01", True) return 0 def qclose(self, errmode): @@ -165,7 +166,7 @@ class Streaming(metaclass=LogBase): def enter_flash_mode(self, ptable=None): self.secure_mode() self.qclose(0) - if ptable != None: + if ptable is None: self.send_ptable(ptable, 0) # 1 for fullflash def write_flash(self, partname, filename): @@ -211,7 +212,7 @@ class Streaming(metaclass=LogBase): write_handle.write(data) sector += sectorstoread if info: - prog = int(float(sector) / float(sectors) * float(100)) + prog = round(float(sector) / float(sectors) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)' % sector, bar_length=50) @@ -342,7 +343,7 @@ class Streaming(metaclass=LogBase): return self.memwrite(address, data) def reset(self): - data = self.send(b"\x0B", True) + self.send(b"\x0B", True) return True def nandwait(self): @@ -692,6 +693,7 @@ class Streaming(metaclass=LogBase): numberOfSectors = 0 sectorSizes = 0 featureBits = 0 + hp = hellopacket() info = b"\x01QCOM fast download protocol host\x03\x23\x23\x23\x20" resp = self.send(info, True) @@ -730,7 +732,7 @@ class Streaming(metaclass=LogBase): self.settings.sectors_per_page=hp.numberOfSectors """ return True, hp - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except self.error(str(e)) return False, hp @@ -742,7 +744,7 @@ class Streaming(metaclass=LogBase): [0x11, 0x00, 0x12, 0x00, 0xa0, 0xe3, 0x00, 0x00, 0xc1, 0xe5, 0x01, 0x40, 0xa0, 0xe3, 0x1e, 0xff, 0x2f, 0xe1]) resp = self.send(cmdbuf, True) - resp2 = self.hdlc.receive_reply(5) + self.hdlc.receive_reply(5) i = resp[1] if i == 0x12: # if not self.tst_loader(): @@ -759,7 +761,7 @@ class Streaming(metaclass=LogBase): return True return True else: - if not b"Invalid" in resp: + if b"Invalid" not in resp: self.streaming_mode = self.Qualcomm self.memread = self.qc_memread self.settings = SettingsOpt(self, 0xFF) @@ -816,8 +818,9 @@ class Streaming(metaclass=LogBase): markerpos = "spare" if self.nanddevice.BAD_BLOCK_IN_SPARE_AREA else "user" self.info( "Defective block marker position: %s+%x" % (markerpos, self.nanddevice.BAD_BLOCK_BYTE_NUM)) - self.info("The total size of the flash memory = %u blocks (%i MB)" % (self.settings.MAXBLOCK, - self.settings.MAXBLOCK * self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024)) + self.info("The total size of the flash memory = %u blocks (%i MB)" % + (self.settings.MAXBLOCK, self.settings.MAXBLOCK * + self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024)) val = resp[1].flashId.decode('utf-8') if resp[1].flashId[0] != 0x65 else "" self.info("Flash memory: %s %s, %s" % (self.settings.flash_mfr, val, self.settings.flash_descr)) @@ -863,7 +866,7 @@ class Streaming(metaclass=LogBase): toread -= size pos += size if info: - prog = int(float(pos) / float(length) * float(100)) + prog = round(float(pos) / float(length) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete (Offset: %08X)' % (offset + pos), bar_length=50) @@ -896,7 +899,7 @@ class Streaming(metaclass=LogBase): badblocks += 1 pos += self.settings.PAGESIZE if info: - prog = int(float(pos) / float(totallength) * float(100)) + prog = round(float(pos) / float(totallength) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) old = prog @@ -988,7 +991,7 @@ class Streaming(metaclass=LogBase): "faffff3a0000a0e3000000ea000090e504109de40100c1e5aa00a0e30000c1e50240a0e31eff2fe1efbeadde") cmd = b"\x11\x00" + search resp = self.send(cmd, True) - resp2 = self.hdlc.receive_reply(5) + self.hdlc.receive_reply(5) if b"Power off not supported" in resp: self.streaming_mode = self.Qualcomm self.memread = self.qc_memread diff --git a/Library/streaming_client.py b/Library/streaming_client.py index b5c66d1..f29c61d 100644 --- a/Library/streaming_client.py +++ b/Library/streaming_client.py @@ -10,14 +10,15 @@ from Library.utils import do_tcp_server, LogBase, getint class streaming_client(metaclass=LogBase): def __init__(self, arguments, cdc, sahara, loglevel, printer): self.cdc = cdc + self.__logger = self.__logger self.sahara = sahara self.arguments = arguments self.streaming = Streaming(cdc, sahara, loglevel) self.printer = printer self.__logger.setLevel(loglevel) - self.error=self.__logger.error - self.info=self.__logger.info - if loglevel==logging.DEBUG: + self.error = self.__logger.error + self.info = self.__logger.info + if loglevel == logging.DEBUG: logfilename = "log.txt" fh = logging.FileHandler(logfilename) self.__logger.addHandler(fh) @@ -74,8 +75,8 @@ class streaming_client(metaclass=LogBase): mode = options[""] if self.streaming.connect(mode): xflag = 0 - res=self.streaming.hdlc.receive_reply(5) - if self.streaming.streaming_mode==self.streaming.Patched: + self.streaming.hdlc.receive_reply(5) + if self.streaming.streaming_mode == self.streaming.Patched: self.streaming.nand_init(xflag) if cmd == "gpt": directory = options[""] @@ -119,18 +120,19 @@ class streaming_client(metaclass=LogBase): self.error(f"Error: Couldn't detect partition: {partition}\nAvailable partitions:") self.print_partitions(rpartitions) elif cmd == "rs": - sector = getint(options[""]) #Page + sector = getint(options[""]) # Page sectors = getint(options[""]) filename = options[""] self.printer(f"Dumping at Sector {hex(sector)} with Sectorcount {hex(sectors)}...") - if self.streaming.read_sectors(sector,sectors,filename,True): + if self.streaming.read_sectors(sector, sectors, filename, True): self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.") elif cmd == "rf": sector = 0 - sectors = self.streaming.settings.MAXBLOCK*self.streaming.settings.num_pages_per_blk*self.streaming.settings.sectors_per_page + sectors = self.streaming.settings.MAXBLOCK * self.streaming.settings.num_pages_per_blk * \ + self.streaming.settings.sectors_per_page filename = options[""] self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...") - if self.streaming.read_sectors(sector,sectors,filename,True): + if self.streaming.read_sectors(sector, sectors, filename, True): self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.") elif cmd == "rl": directory = options[""] @@ -164,13 +166,13 @@ class streaming_client(metaclass=LogBase): # attr3 = spartition["attr3"] partfilename = filename self.info(f"Dumping partition {str(partition)} with block count {str(length)} as " + - f"{filename}.") + f"{filename}.") self.streaming.read_raw(offset, length, self.streaming.settings.UD_SIZE_BYTES, partfilename) elif cmd == "peek": offset = getint(options[""]) length = getint(options[""]) filename = options[""] - if self.streaming.memtofile(offset,length,filename): + if self.streaming.memtofile(offset, length, filename): self.info( f"Peek data from offset {hex(offset)} and length {hex(length)} was written to {filename}") elif cmd == "peekhex": @@ -292,7 +294,7 @@ class streaming_client(metaclass=LogBase): return False ############################### elif cmd == "nop": - #resp=self.streaming.send(b"\x7E\x09") + # resp=self.streaming.send(b"\x7E\x09") self.error("Nop command isn't supported by streaming loader") return True elif cmd == "setbootablestoragedrive": @@ -306,7 +308,7 @@ class streaming_client(metaclass=LogBase): return False partitionname = options[""] filename = options[""] - partitionfilename="" + partitionfilename = "" if "--partitionfilename" in options: partitionfilename = options["--partitionfilename"] if not os.path.exists(partitionfilename): @@ -315,7 +317,7 @@ class streaming_client(metaclass=LogBase): if not os.path.exists(filename): self.error(f"Error: Couldn't find file: {filename}") return False - if partitionfilename=="": + if partitionfilename == "": rpartitions = self.streaming.get_partitions() else: rpartitions = self.streaming.get_partitions(partitionfilename) @@ -328,7 +330,8 @@ class streaming_client(metaclass=LogBase): # attr2 = spartition["attr2"] # attr3 = spartition["attr3"] sectors = int(os.stat( - filename).st_size / self.streaming.settings.num_pages_per_blk / self.streaming.settings.PAGESIZE) + filename).st_size / self.streaming.settings.num_pages_per_blk / + self.streaming.settings.PAGESIZE) if sectors > length: self.error( f"Error: {filename} has {sectors} sectors but partition only has {length}.") diff --git a/Library/utils.py b/Library/utils.py index 5c6d791..caf9ec5 100755 --- a/Library/utils.py +++ b/Library/utils.py @@ -34,6 +34,7 @@ def do_tcp_server(client, arguments, handler): print('starting up on %s port %s' % server_address) sock.bind(server_address) sock.listen(1) + response = None while True: print('waiting for a connection') connection, client_address = sock.accept() diff --git a/Library/xmlparser.py b/Library/xmlparser.py index 33d32a2..5beb73a 100755 --- a/Library/xmlparser.py +++ b/Library/xmlparser.py @@ -1,7 +1,7 @@ import xml.etree.ElementTree as ET -class xmlparser(): +class xmlparser: def getresponse(self, input): lines = input.split(b"