Beautify code, improve progress bar output

This commit is contained in:
Bjoern Kerler 2021-03-01 11:35:37 +01:00
parent a5fdf09c93
commit 3433b60403
14 changed files with 274 additions and 250 deletions

View file

@ -418,12 +418,13 @@ class firehose(metaclass=LogBase):
self.cdc.write(wdata, wlen)
prog = int(float(total-bytestowrite) / float(total) * float(100))
prog = round(float(total-bytestowrite) / float(total) * float(100),1)
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)'
% ((total-bytestowrite) // self.cfg.SECTOR_SIZE_IN_BYTES),
bar_length=50)
old=prog
self.cdc.write(b'', self.cfg.MaxPayloadSizeToTargetInBytes)
# time.sleep(0.2)
@ -490,7 +491,7 @@ class firehose(metaclass=LogBase):
wdata += b"\x00" * (filllen - wlen)
wlen = len(wdata)
self.cdc.write(wdata, wlen)
prog = int(float(pos) / float(total) * float(100))
prog = round(float(pos) / float(total) * float(100),1)
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Written (Sector %d)'
@ -544,7 +545,7 @@ class firehose(metaclass=LogBase):
if bytesToWrite < wlen:
wlen = bytesToWrite
self.cdc.write(empty[0:wlen], self.cfg.MaxPayloadSizeToTargetInBytes)
prog = int(float(pos) / float(total) * float(100))
prog = round(float(pos) / float(total) * float(100),1)
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Erased (Sector %d)'
@ -608,7 +609,7 @@ class firehose(metaclass=LogBase):
bytesToRead -= len(tmp)
wr.write(tmp)
if display:
prog = int(float(total-bytesToRead) / float(total) * float(100))
prog = round(float(total-bytesToRead) / float(total) * float(100),1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Read (Sector %d)'
% ((total-bytesToRead) // self.cfg.SECTOR_SIZE_IN_BYTES),
@ -671,7 +672,7 @@ class firehose(metaclass=LogBase):
bytesToRead -= len(tmp)
dataread += len(tmp)
resData += tmp
prog = int(float(dataread) / float(total) * float(100))
prog = round(float(dataread) / float(total) * float(100),1)
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Read (Sector %d)'
@ -1136,7 +1137,7 @@ class firehose(metaclass=LogBase):
datawritten += maxsize
lengthtowrite -= maxsize
if info:
prog = int(float(datawritten) / float(SizeInBytes) * float(100))
prog = round(float(datawritten) / float(SizeInBytes) * float(100),1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
@ -1210,7 +1211,7 @@ class firehose(metaclass=LogBase):
else:
resp += tmp2
if info:
prog = int(float(dataread) / float(SizeInBytes) * float(100))
prog = round(float(dataread) / float(SizeInBytes) * float(100),1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog

View file

@ -586,6 +586,7 @@ class firehose_client(metaclass=LogBase):
self.printer(f"Error writing {filename} to sector {str(startsector)}.")
return False
else:
if len(res) > 0:
fpartitions = res[1]
self.error(f"Error: Couldn't detect partition: {partitionname}\nAvailable partitions:")
for lun in fpartitions:

View file

@ -119,6 +119,7 @@ class gpt(metaclass=LogBase):
def __init__(self, num_part_entries=0, part_entry_size=0, part_entry_start_lba=0, loglevel=logging.INFO, *args,
**kwargs):
self.num_part_entries = num_part_entries
self.__logger = self.__logger
self.part_entry_size = part_entry_size
self.part_entry_start_lba = part_entry_start_lba
self.totalsectors = None
@ -203,21 +204,15 @@ class gpt(metaclass=LogBase):
return True
def print(self):
print("\nGPT Table:\n-------------")
for partition in self.partentries:
print("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}".format(
partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize,
partition.flags, partition.unique, partition.type))
print("\nTotal disk size:0x{:016x}, sectors:0x{:016x}".format(self.totalsectors * self.sectorsize,
self.totalsectors))
print(self.tostring())
def tostring(self):
mstr = "\nGPT Table:\n-------------"
mstr = "\nGPT Table:\n-------------\n"
for partition in self.partentries:
mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}".format(
mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:08x}, UUID {}, Type {}\n".format(
partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize,
partition.flags, partition.unique, partition.type))
mstr += ("\nTotal disk size:0x{:016x}, sectors:0x{:016x}".format(self.totalsectors * self.sectorsize,
mstr += ("\nTotal disk size:0x{:016x}, sectors:0x{:016x}\n".format(self.totalsectors * self.sectorsize,
self.totalsectors))
return mstr

View file

@ -40,49 +40,41 @@ crcTbl = (
0x7bc7, 0x6a4e, 0x58d5, 0x495c, 0x3de3, 0x2c6a, 0x1ef1, 0x0f78)
class hdlc:
def __init__(self, cdc):
self.cdc = cdc
self.programmer = None
self.timeout = 1500
def serial16(self, data):
out = bytearray()
out.append((data >> 8) & 0xFF)
out.append(data & 0xFF)
return out
def serial16le(self, data):
def serial16le(data):
out = bytearray()
out.append(data & 0xFF)
out.append((data >> 8) & 0xFF)
return out
def serial32(self, data):
def serial16(data):
out = bytearray()
out += self.serial16((data >> 16) & 0xFFFF)
out += self.serial16(data & 0xFFFF)
out.append((data >> 8) & 0xFF)
out.append(data & 0xFF)
return out
def serial32le(self, data):
def serial32le(data):
out = bytearray()
out += self.serial16le(data & 0xFFFF)
out += self.serial16le((data >> 16) & 0xFFFF)
out += serial16le(data & 0xFFFF)
out += serial16le((data >> 16) & 0xFFFF)
return out
def crc16(self, iv, data):
def crc16(iv, data):
for byte in data:
iv = ((iv >> 8) & 0xFFFF) ^ crcTbl[(iv ^ byte) & 0xFF]
return ~iv & 0xFFFF
def convert_cmdbuf(self, indata):
crc16val = self.crc16(0xFFFF, indata)
indata.extend(bytearray(self.serial16le(crc16val)))
outdata = self.escape(indata)
outdata.append(0x7E)
return outdata
def escape(self, indata):
def serial32(data):
out = bytearray()
out += serial16((data >> 16) & 0xFFFF)
out += serial16(data & 0xFFFF)
return out
def escape(indata):
outdata = bytearray()
for i in range(0, len(indata)):
buf = indata[i]
@ -96,11 +88,12 @@ class hdlc:
outdata.append(buf)
return outdata
def unescape(self, indata):
escape = False
def unescape(indata):
mescape = False
out = bytearray()
for buf in indata:
if escape:
if mescape:
if buf == 0x5e:
out.append(0x7e)
elif buf == 0x5d:
@ -108,20 +101,35 @@ class hdlc:
else:
logging.error("Fatal error unescaping buffer!")
return None
escape = False
mescape = False
else:
if buf == 0x7d:
escape = True
mescape = True
else:
out.append(buf)
if len(out) == 0:
return None
return out
def receive_reply(self,timeout=None):
def convert_cmdbuf(indata):
crc16val = crc16(0xFFFF, indata)
indata.extend(bytearray(serial16le(crc16val)))
outdata = escape(indata)
outdata.append(0x7E)
return outdata
class hdlc:
def __init__(self, cdc):
self.cdc = cdc
self.programmer = None
self.timeout = 1500
def receive_reply(self, timeout=None):
replybuf = bytearray()
if timeout is None:
timeout=self.timeout
timeout = self.timeout
tmp = self.cdc.read(MAX_PACKET_LEN, timeout)
if tmp == bytearray():
return 0
@ -135,10 +143,10 @@ class hdlc:
if retry > 5:
break
replybuf.extend(tmp)
data = self.unescape(replybuf)
data = unescape(replybuf)
# print(hexlify(data))
if len(data) > 3:
crc16val = self.crc16(0xFFFF, data[:-3])
crc16val = crc16(0xFFFF, data[:-3])
reccrc = int(data[-3]) + (int(data[-2]) << 8)
if crc16val != reccrc:
return -1
@ -146,17 +154,17 @@ class hdlc:
time.sleep(0.01)
data = self.cdc.read(MAX_PACKET_LEN, timeout)
if len(data) > 3:
crc16val = self.crc16(0xFFFF, data[:-3])
crc16val = crc16(0xFFFF, data[:-3])
reccrc = int(data[-3]) + (int(data[-2]) << 8)
if crc16val != reccrc:
return -1
return data
return data[:-3]
def receive_reply_nocrc(self,timeout=None):
def receive_reply_nocrc(self, timeout=None):
replybuf = bytearray()
if timeout is None:
timeout=self.timeout
timeout = self.timeout
tmp = self.cdc.read(MAX_PACKET_LEN, timeout)
if tmp == bytearray():
return 0
@ -170,7 +178,7 @@ class hdlc:
if retry > 5:
break
replybuf.extend(tmp)
data = self.unescape(replybuf)
data = unescape(replybuf)
# print(hexlify(data))
if len(data) > 3:
# crc16val = self.crc16(0xFFFF, data[:-3])
@ -197,9 +205,9 @@ class hdlc:
# FlushFileBuffers(ser)
def send_cmd_base(self, outdata, prefixflag, nocrc=False):
if isinstance(outdata,str):
outdata=bytes(outdata,'utf-8')
packet = self.convert_cmdbuf(bytearray(outdata))
if isinstance(outdata, str):
outdata = bytes(outdata, 'utf-8')
packet = convert_cmdbuf(bytearray(outdata))
if self.send_unframed_buf(packet, prefixflag):
if nocrc:
return self.receive_reply_nocrc()

View file

@ -3,11 +3,12 @@ import pt64
import pt
import argparse
def pt64_walk(data, ttbr, tnsz, levels=3):
print("Dumping page tables (levels=%d)" % levels)
print("First level (ptbase = %016x)" % ttbr)
print("---------------------------------------------")
fl = data[ttbr-ttbr:ttbr-ttbr+0x1000]
fl = data[ttbr - ttbr:ttbr - ttbr + 0x1000]
if levels <= 1:
return
@ -17,24 +18,24 @@ def pt64_walk(data, ttbr, tnsz, levels=3):
print("Second level (ptbase = %016x)" % fle.output)
print("---------------------------------------------")
sl = data[fle.output-ttbr:fle.output-ttbr+0x4000]
sl = data[fle.output - ttbr:fle.output - ttbr + 0x4000]
sl = pt64.parse_pt(sl, va, tnsz, 2)
if levels <= 2:
continue
for (va, sle) in sl:
for (mva, sle) in sl:
if "TABLE" in str(sle):
print("Third level (ptbase = %016x)" % sle.output)
print("---------------------------------------------")
tl = data[sle.output-ttbr:sle.output-ttbr+0x1000]
pt64.parse_pt(tl, va, tnsz, 3)
tl = data[sle.output - ttbr:sle.output - ttbr + 0x1000]
pt64.parse_pt(tl, mva, tnsz, 3)
def pt32_walk(data, ttbr, skip):
print("First level (va = %08x)" % ttbr)
print("---------------------------------------------")
fl = data[ttbr-ttbr:ttbr-ttbr+0x4000]
fl = data[ttbr - ttbr:ttbr - ttbr + 0x4000]
i = 0
for (va, fl) in pt.parse_pt(fl):
@ -45,9 +46,10 @@ def pt32_walk(data, ttbr, skip):
print("")
print("Second level (va = %08x)" % va)
print("---------------------------------------------")
sldata = data[fl.coarse_base-ttbr:fl.coarse_base-ttbr+0x400]
sldata = data[fl.coarse_base - ttbr:fl.coarse_base - ttbr + 0x400]
pt.parse_spt(sldata, va)
def main():
parser = argparse.ArgumentParser(
prog="memparse",
@ -57,15 +59,16 @@ def main():
parser.add_argument('-arch', '--arch', dest='arch', help='architecture=32,64', default="32")
parser.add_argument('-mem', '--mem', dest='mem', help='memoryoffset', default="0x200000")
args = parser.parse_args()
if args.infile=="":
if args.infile == "":
print("You need to add an -in [memorydump filename]")
return
with open(args.infile,"rb") as rf:
data=rf.read()
if args.arch=="32":
pt32_walk(data,int(args.mem,16),False)
with open(args.infile, "rb") as rf:
data = rf.read()
if args.arch == "32":
pt32_walk(data, int(args.mem, 16), False)
else:
pt64_walk(data,int(args.mem,16),0,3)
pt64_walk(data, int(args.mem, 16), 0, 3)
main()

View file

@ -1,6 +1,5 @@
import ctypes
from enum import Enum
from struct import unpack, pack
from Config.qualcomm_config import secgen, secureboottbl
c_uint8 = ctypes.c_uint8
@ -447,7 +446,6 @@ class NandDevice:
flashinfo = {}
tid = nand_toshiba_id_t()
tid.asDword = nandid
small_slc = False
# did,slc_small_device,device_width,density_mbits
if tid.did in tbl:
fdev = tbl[tid.did]

View file

@ -36,35 +36,35 @@ def parse_spt(data, base):
va += 4
def get_fld(fld):
s = fld & 3
def get_fld(mfld):
s = mfld & 3
if s == 0:
return fault_desc(fld)
return fault_desc(mfld)
if s == 1:
return pt_desc(fld)
return pt_desc(mfld)
if s == 2:
return section_desc(fld)
return section_desc(mfld)
if s == 3:
return reserved_desc(fld)
return reserved_desc(mfld)
return None
def get_sld(sld):
s = sld & 3
def get_sld(msld):
s = msld & 3
if s == 1:
return sld_lp(sld)
return sld_lp(msld)
if s > 1:
return sld_xsp(sld)
return sld_xsp(msld)
return "UNSUPPORTED"
class descriptor(object):
def __init__(self, fld):
def __init__(self, mfld):
pass
def get_name(self):

View file

@ -54,14 +54,14 @@ def parse_pt(data, base, tnsz, level=1):
i = 0
entries = []
while i < min(len(data), get_level_size(tnsz, level)):
entry = struct.unpack("<Q", data[i:i + 8])[0]
mentry = struct.unpack("<Q", data[i:i + 8])[0]
f = get_fld(entry, level)
f = get_fld(mentry, level)
if f is None:
i += 8
continue
va = get_va_for_level(base, int(i / 8), level)
if (f != 'UNSUPPORTED' and f.apx == 0 and f.ap == 3 and f.xn == 0):
if f != 'UNSUPPORTED' and f.apx == 0 and f.ap == 3 and f.xn == 0:
print("%016x %s - WX !!" % (va, f))
else:
print("%016x %s" % (va, f))
@ -72,19 +72,19 @@ def parse_pt(data, base, tnsz, level=1):
return entries
def get_fld(fld, level):
s = fld & 3
def get_fld(mfld, level):
s = mfld & 3
if s == 0:
return None
if s == 1:
return block_entry4k(fld, level)
return block_entry4k(mfld, level)
if s == 2:
return None
if s == 3:
return table_entry4k(fld, level)
return table_entry4k(mfld, level)
return None
class descriptor(object):

View file

@ -168,14 +168,16 @@ class sahara(metaclass=LogBase):
devid = hwid[8:]
pkhash = filename.split("_")[1].lower()
for msmid in convertmsmid(msmid):
mhwid = (msmid + devid).lower()
mhwid = msmid + devid
mhwid = mhwid.lower()
if mhwid not in loaderdb:
loaderdb[mhwid] = {}
if pkhash not in loaderdb[mhwid]:
loaderdb[mhwid][pkhash] = fn
else:
loaderdb[mhwid][pkhash].append(fn)
except: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
continue
self.loaderdb = loaderdb
return loaderdb
@ -277,6 +279,12 @@ class sahara(metaclass=LogBase):
def __init__(self, cdc, loglevel):
self.cdc = cdc
self.__logger = self.__logger
self.info = self.__logger.info
self.debug = self.__logger.debug
self.error = self.__logger.error
self.warning = self.__logger.warning
self.id = None
self.loaderdb = None
self.version = 2.1
self.programmer = None
@ -297,10 +305,6 @@ class sahara(metaclass=LogBase):
self.pktsize = None
self.init_loader_db()
self.info = self.__logger.info
self.debug = self.__logger.debug
self.error = self.__logger.error
self.warning = self.__logger.warning
self.__logger.setLevel(loglevel)
if loglevel == logging.DEBUG:
@ -441,6 +445,7 @@ class sahara(metaclass=LogBase):
try:
return unpack("<Q", res[0:0x8])[0]
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
return None
def cmdexec_get_pkhash(self):
@ -535,7 +540,7 @@ class sahara(metaclass=LogBase):
break
# print("Couldn't find a loader for given hwid and pkhash :(")
# exit(0)
elif self.hwidstr != None and self.pkhash != None:
elif self.hwidstr is not None and self.pkhash is not None:
msmid = self.hwidstr[:8]
for hwidstr in self.loaderdb:
if msmid == hwidstr[:8]:
@ -582,7 +587,8 @@ class sahara(metaclass=LogBase):
self.cdc.write(pack("<II", self.cmd.SAHARA_RESET_REQ, 0x8))
try:
cmd, pkt = self.get_rsp()
except: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
return False
if cmd["cmd"] == self.cmd.SAHARA_RESET_RSP:
return True
@ -607,6 +613,7 @@ class sahara(metaclass=LogBase):
try:
self.cdc.read(1, 1)
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
pass
if self.bit64:
if not self.cdc.write(pack("<IIQQ", self.cmd.SAHARA_64BIT_MEMORY_READ, 0x8 + 8 + 8, addr + pos,
@ -620,6 +627,7 @@ class sahara(metaclass=LogBase):
try:
tmp = self.cdc.read(length)
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
return None
length -= len(tmp)
pos += len(tmp)
@ -629,7 +637,7 @@ class sahara(metaclass=LogBase):
else:
data += tmp
if display:
prog = int(float(pos) / float(total) * float(100))
prog = round(float(pos) / float(total) * float(100), 1)
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
@ -676,7 +684,7 @@ class sahara(metaclass=LogBase):
if memory_table_length % pktsize == 0:
if memory_table_length != 0:
print(
f"Reading 64-Bit partition from {hex(memory_table_addr)} with length of " + \
f"Reading 64-Bit partition from {hex(memory_table_addr)} with length of " +
"{hex(memory_table_length)}")
ptbldata = self.read_memory(memory_table_addr, memory_table_length)
num_entries = len(ptbldata) // pktsize
@ -692,7 +700,7 @@ class sahara(metaclass=LogBase):
partitions.append(dict(desc=desc, filename=filename, mem_base=mem_base, length=length,
save_pref=save_pref))
print(
f"{filename}({desc}): Offset {hex(mem_base)}, Length {hex(length)}, " + \
f"{filename}({desc}): Offset {hex(mem_base)}, Length {hex(length)}, " +
"SavePref {hex(save_pref)}")
self.dump_partitions(partitions)
@ -744,7 +752,7 @@ class sahara(metaclass=LogBase):
try:
datalen = len(programmer)
done = False
while datalen > 0 or done == True:
while datalen > 0 or done:
cmd, pkt = self.get_rsp()
if cmd == -1 or pkt == -1:
if self.cmd_done():

View file

@ -1,13 +1,15 @@
#!/usr/bin/env python3
import sys
from struct import unpack
from queue import Queue
import logging
import sys
from queue import Queue
from struct import unpack
try:
from Library.utils import LogBase, print_progress
except Exception as e:
import os, sys, inspect
except ImportError as e:
import os
import sys
import inspect
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
@ -19,6 +21,7 @@ class QCSparse(metaclass=LogBase):
def __init__(self, filename, loglevel):
self.rf = open(filename, 'rb')
self.data = Queue()
self.__logger = self.__logger
self.offset = 0
self.tmpdata = bytearray()
self.__logger.setLevel(loglevel)
@ -227,7 +230,7 @@ if __name__ == "__main__":
wf.write(wdata)
prog = int(float(pos) / float(total) * float(100))
prog = round(float(pos) / float(total) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)'
% (pos // SECTOR_SIZE_IN_BYTES),

View file

@ -2,11 +2,13 @@ from struct import pack
from binascii import unhexlify
from Library.utils import *
from Library.hdlc import *
from Library.nand_config import BadFlags, SettingsOpt, nand_ids, nand_manuf_ids, nandregs, NandDevice
from Library.nand_config import BadFlags, SettingsOpt, nandregs, NandDevice
class Streaming(metaclass=LogBase):
def __init__(self, cdc, sahara, loglevel=logging.INFO):
self.__logger = self.__logger
self.regs = None
self.cdc = cdc
self.hdlc = hdlc(self.cdc)
self.mode = sahara.mode
@ -17,9 +19,9 @@ class Streaming(metaclass=LogBase):
self.nanddevice = None
self.nandbase = 0
self.__logger.setLevel(loglevel)
self.info=self.__logger.info
self.debug=self.__logger.debug
self.error=self.__logger.error
self.info = self.__logger.info
self.debug = self.__logger.debug
self.error = self.__logger.error
self.warning = self.__logger.warning
self.modules = None
self.Qualcomm = 0
@ -43,13 +45,9 @@ class Streaming(metaclass=LogBase):
self.regs.NAND_FLASH_CMD = 0x8000b
self.regs.NAND_EXEC_CMD = self.nanddevice.NAND_CMD_SOFT_RESET
self.nandwait()
dev_cfg0 = self.regs.NAND_DEV0_CFG0
dev_cfg1 = self.regs.NAND_DEV0_CFG1
dev_ecc_cfg = self.regs.NAND_DEV0_ECC_CFG
# dev_ecc1_cfg = self.regs.NAND_DEV1_ECC_CFG
# dev_cfg1_0 = self.regs.NAND_DEV1_CFG0
# dev_cfg1_1 = self.regs.NAND_DEV1_CFG1
# dev_cfg0 = self.regs.NAND_DEV0_CFG0
# dev_cfg1 = self.regs.NAND_DEV0_CFG1
# dev_ecc_cfg = self.regs.NAND_DEV0_ECC_CFG
"""
self.nand_reset()
@ -79,9 +77,12 @@ class Streaming(metaclass=LogBase):
self.regs.NAND_FLASH_CMD = self.nanddevice.NAND_CMD_PAGE_READ
self.regs.NAND_ADDR0 = 0
self.regs.NAND_ADDR1 = 0
self.regs.NAND_DEV0_CFG0 = 0 << self.nanddevice.CW_PER_PAGE | 512 << self.nanddevice.UD_SIZE_BYTES | 5 << self.nanddevice.NUM_ADDR_CYCLES | 0 << self.nanddevice.SPARE_SIZE_BYTES
self.regs.NAND_DEV1_CFG1 = 7 << self.nanddevice.NAND_RECOVERY_CYCLES | 0 << self.nanddevice.CS_ACTIVE_BSY | 17 << self.nanddevice.BAD_BLOCK_BYTE_NUM | \
1 << self.nanddevice.BAD_BLOCK_IN_SPARE_AREA | 2 << self.nanddevice.WR_RD_BSY_GAP | 0 << self.nanddevice.WIDE_FLASH | \
self.regs.NAND_DEV0_CFG0 = 0 << self.nanddevice.CW_PER_PAGE | 512 << self.nanddevice.UD_SIZE_BYTES | \
5 << self.nanddevice.NUM_ADDR_CYCLES | 0 << self.nanddevice.SPARE_SIZE_BYTES
self.regs.NAND_DEV1_CFG1 = 7 << self.nanddevice.NAND_RECOVERY_CYCLES | 0 << self.nanddevice.CS_ACTIVE_BSY | \
17 << self.nanddevice.BAD_BLOCK_BYTE_NUM | \
1 << self.nanddevice.BAD_BLOCK_IN_SPARE_AREA | 2 << self.nanddevice.WR_RD_BSY_GAP | \
0 << self.nanddevice.WIDE_FLASH | \
1 << self.nanddevice.DEV0_CFG1_ECC_DISABLE
self.regs.NAND_EBI2_ECC_BUF_CFG = 1 << self.nanddevice.ECC_CFG_ECC_DISABLE
self.regs.NAND_DEV_CMD_VLD = self.regs.NAND_DEV_CMD_VLD & ~(1 << self.nanddevice.READ_START_VLD)
@ -140,7 +141,7 @@ class Streaming(metaclass=LogBase):
self.mempoke(self.nanddevice.NAND_FLASH_BUFFER + i, 0xffffffff)
def secure_mode(self):
resp = self.send(b"\x17\x01", True)
self.send(b"\x17\x01", True)
return 0
def qclose(self, errmode):
@ -165,7 +166,7 @@ class Streaming(metaclass=LogBase):
def enter_flash_mode(self, ptable=None):
self.secure_mode()
self.qclose(0)
if ptable != None:
if ptable is None:
self.send_ptable(ptable, 0) # 1 for fullflash
def write_flash(self, partname, filename):
@ -211,7 +212,7 @@ class Streaming(metaclass=LogBase):
write_handle.write(data)
sector += sectorstoread
if info:
prog = int(float(sector) / float(sectors) * float(100))
prog = round(float(sector) / float(sectors) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)' % sector,
bar_length=50)
@ -342,7 +343,7 @@ class Streaming(metaclass=LogBase):
return self.memwrite(address, data)
def reset(self):
data = self.send(b"\x0B", True)
self.send(b"\x0B", True)
return True
def nandwait(self):
@ -692,6 +693,7 @@ class Streaming(metaclass=LogBase):
numberOfSectors = 0
sectorSizes = 0
featureBits = 0
hp = hellopacket()
info = b"\x01QCOM fast download protocol host\x03\x23\x23\x23\x20"
resp = self.send(info, True)
@ -742,7 +744,7 @@ class Streaming(metaclass=LogBase):
[0x11, 0x00, 0x12, 0x00, 0xa0, 0xe3, 0x00, 0x00, 0xc1, 0xe5, 0x01, 0x40, 0xa0, 0xe3, 0x1e, 0xff, 0x2f,
0xe1])
resp = self.send(cmdbuf, True)
resp2 = self.hdlc.receive_reply(5)
self.hdlc.receive_reply(5)
i = resp[1]
if i == 0x12:
# if not self.tst_loader():
@ -759,7 +761,7 @@ class Streaming(metaclass=LogBase):
return True
return True
else:
if not b"Invalid" in resp:
if b"Invalid" not in resp:
self.streaming_mode = self.Qualcomm
self.memread = self.qc_memread
self.settings = SettingsOpt(self, 0xFF)
@ -816,8 +818,9 @@ class Streaming(metaclass=LogBase):
markerpos = "spare" if self.nanddevice.BAD_BLOCK_IN_SPARE_AREA else "user"
self.info(
"Defective block marker position: %s+%x" % (markerpos, self.nanddevice.BAD_BLOCK_BYTE_NUM))
self.info("The total size of the flash memory = %u blocks (%i MB)" % (self.settings.MAXBLOCK,
self.settings.MAXBLOCK * self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024))
self.info("The total size of the flash memory = %u blocks (%i MB)" %
(self.settings.MAXBLOCK, self.settings.MAXBLOCK *
self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024))
val = resp[1].flashId.decode('utf-8') if resp[1].flashId[0] != 0x65 else ""
self.info("Flash memory: %s %s, %s" % (self.settings.flash_mfr, val, self.settings.flash_descr))
@ -863,7 +866,7 @@ class Streaming(metaclass=LogBase):
toread -= size
pos += size
if info:
prog = int(float(pos) / float(length) * float(100))
prog = round(float(pos) / float(length) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Offset: %08X)' % (offset + pos),
bar_length=50)
@ -896,7 +899,7 @@ class Streaming(metaclass=LogBase):
badblocks += 1
pos += self.settings.PAGESIZE
if info:
prog = int(float(pos) / float(totallength) * float(100))
prog = round(float(pos) / float(totallength) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
@ -988,7 +991,7 @@ class Streaming(metaclass=LogBase):
"faffff3a0000a0e3000000ea000090e504109de40100c1e5aa00a0e30000c1e50240a0e31eff2fe1efbeadde")
cmd = b"\x11\x00" + search
resp = self.send(cmd, True)
resp2 = self.hdlc.receive_reply(5)
self.hdlc.receive_reply(5)
if b"Power off not supported" in resp:
self.streaming_mode = self.Qualcomm
self.memread = self.qc_memread

View file

@ -10,14 +10,15 @@ from Library.utils import do_tcp_server, LogBase, getint
class streaming_client(metaclass=LogBase):
def __init__(self, arguments, cdc, sahara, loglevel, printer):
self.cdc = cdc
self.__logger = self.__logger
self.sahara = sahara
self.arguments = arguments
self.streaming = Streaming(cdc, sahara, loglevel)
self.printer = printer
self.__logger.setLevel(loglevel)
self.error=self.__logger.error
self.info=self.__logger.info
if loglevel==logging.DEBUG:
self.error = self.__logger.error
self.info = self.__logger.info
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
@ -74,8 +75,8 @@ class streaming_client(metaclass=LogBase):
mode = options["<mode>"]
if self.streaming.connect(mode):
xflag = 0
res=self.streaming.hdlc.receive_reply(5)
if self.streaming.streaming_mode==self.streaming.Patched:
self.streaming.hdlc.receive_reply(5)
if self.streaming.streaming_mode == self.streaming.Patched:
self.streaming.nand_init(xflag)
if cmd == "gpt":
directory = options["<directory>"]
@ -119,18 +120,19 @@ class streaming_client(metaclass=LogBase):
self.error(f"Error: Couldn't detect partition: {partition}\nAvailable partitions:")
self.print_partitions(rpartitions)
elif cmd == "rs":
sector = getint(options["<start_sector>"]) #Page
sector = getint(options["<start_sector>"]) # Page
sectors = getint(options["<sectors>"])
filename = options["<filename>"]
self.printer(f"Dumping at Sector {hex(sector)} with Sectorcount {hex(sectors)}...")
if self.streaming.read_sectors(sector,sectors,filename,True):
if self.streaming.read_sectors(sector, sectors, filename, True):
self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.")
elif cmd == "rf":
sector = 0
sectors = self.streaming.settings.MAXBLOCK*self.streaming.settings.num_pages_per_blk*self.streaming.settings.sectors_per_page
sectors = self.streaming.settings.MAXBLOCK * self.streaming.settings.num_pages_per_blk * \
self.streaming.settings.sectors_per_page
filename = options["<filename>"]
self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...")
if self.streaming.read_sectors(sector,sectors,filename,True):
if self.streaming.read_sectors(sector, sectors, filename, True):
self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.")
elif cmd == "rl":
directory = options["<directory>"]
@ -170,7 +172,7 @@ class streaming_client(metaclass=LogBase):
offset = getint(options["<offset>"])
length = getint(options["<length>"])
filename = options["<filename>"]
if self.streaming.memtofile(offset,length,filename):
if self.streaming.memtofile(offset, length, filename):
self.info(
f"Peek data from offset {hex(offset)} and length {hex(length)} was written to {filename}")
elif cmd == "peekhex":
@ -292,7 +294,7 @@ class streaming_client(metaclass=LogBase):
return False
###############################
elif cmd == "nop":
#resp=self.streaming.send(b"\x7E\x09")
# resp=self.streaming.send(b"\x7E\x09")
self.error("Nop command isn't supported by streaming loader")
return True
elif cmd == "setbootablestoragedrive":
@ -306,7 +308,7 @@ class streaming_client(metaclass=LogBase):
return False
partitionname = options["<partitionname>"]
filename = options["<filename>"]
partitionfilename=""
partitionfilename = ""
if "--partitionfilename" in options:
partitionfilename = options["--partitionfilename"]
if not os.path.exists(partitionfilename):
@ -315,7 +317,7 @@ class streaming_client(metaclass=LogBase):
if not os.path.exists(filename):
self.error(f"Error: Couldn't find file: {filename}")
return False
if partitionfilename=="":
if partitionfilename == "":
rpartitions = self.streaming.get_partitions()
else:
rpartitions = self.streaming.get_partitions(partitionfilename)
@ -328,7 +330,8 @@ class streaming_client(metaclass=LogBase):
# attr2 = spartition["attr2"]
# attr3 = spartition["attr3"]
sectors = int(os.stat(
filename).st_size / self.streaming.settings.num_pages_per_blk / self.streaming.settings.PAGESIZE)
filename).st_size / self.streaming.settings.num_pages_per_blk /
self.streaming.settings.PAGESIZE)
if sectors > length:
self.error(
f"Error: {filename} has {sectors} sectors but partition only has {length}.")

View file

@ -34,6 +34,7 @@ def do_tcp_server(client, arguments, handler):
print('starting up on %s port %s' % server_address)
sock.bind(server_address)
sock.listen(1)
response = None
while True:
print('waiting for a connection')
connection, client_address = sock.accept()

View file

@ -1,7 +1,7 @@
import xml.etree.ElementTree as ET
class xmlparser():
class xmlparser:
def getresponse(self, input):
lines = input.split(b"<?xml")
content = {}