#!/usr/bin/env python3 # -*- coding: utf-8 -*- # (c) B.Kerler 2018-2023 under GPLv3 license # If you use my code, make sure you refer to my name # # !!!!! If you use this code in commercial products, your product is automatically # GPLv3 and has to be open sourced under GPLv3 as well. !!!!! import binascii import io import os.path import platform import time import json from struct import unpack from binascii import hexlify from queue import Queue from threading import Thread from edlclient.Library.Modules.nothing import nothing from edlclient.Library.utils import * from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE, MAX_PRIORITY, PART_ATT_PRIORITY_BIT from edlclient.Library.gpt import PART_ATT_PRIORITY_VAL, PART_ATT_ACTIVE_VAL, PART_ATT_MAX_RETRY_COUNT_VAL, PART_ATT_SUCCESSFUL_VAL, PART_ATT_UNBOOTABLE_VAL from edlclient.Library.sparse import QCSparse from edlclient.Library.utils import progress from queue import Queue from threading import Thread rq = Queue() def writedata(filename, rq): pos = 0 with open(filename, "wb") as wf: while True: data = rq.get() if data is None: break pos += len(data) wf.write(data) rq.task_done() class response: resp = False data = b"" error = "" log = None def __init__(self, resp=False, data=b"", error: str = "", log: dict = ""): self.resp = resp self.data = data self.error = error self.log = log try: from edlclient.Library.Modules.init import modules except ImportError as e: pass class nand_partition: partentries = {} def __init__(self, parent, printer=None): if printer is None: self.printer = print else: self.printer = printer self.partentries = {} self.partitiontblsector = None self.parent = parent self.storage_info = {} self.totalsectors = None def parse(self, partdata): self.partentries = {} class partf: sector = 0 sectors = 0 name = "" attr1 = 0 attr2 = 0 attr3 = 0 which_flash = 0 magic1, magic2, version, numparts = unpack("<IIII", partdata[0:0x10]) if magic1 == 0x55EE73AA and magic2 == 0xE35EBDDB: data = partdata[0x10:] for i in range(numparts): name, offset, length, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB", data[i * 0x1C:(i * 0x1C) + 0x1C]) np = partf() if name[:2] == b"0:": name = name[2:] np.name = name.rstrip(b"\x00").decode('utf-8').lower() if self.parent.cfg.block_size == 0: np.sector = offset np.sectors = length else: np.sector = offset * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES np.sectors = (length & 0xFFFF) * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES np.attr1 = attr1 np.attr2 = attr2 np.attr3 = attr3 np.which_flash = which_flash self.partentries[np.name] = np if self.parent.cfg.block_size != 0 and self.parent.cfg.total_blocks != 0: self.totalsectors = (self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES) * \ self.parent.cfg.total_blocks else: sectors = 0 for part in self.partentries: if self.partentries[part].sector >= sectors: sectors += self.partentries[part].sectors self.totalsectors = sectors return True return False def print(self): self.printer("Name Offset\t\tLength\t\tAttr\t\t\tFlash") self.printer("-------------------------------------------------------------") for selpart in self.partentries: partition = self.partentries[selpart] name = partition.name for i in range(0x10 - len(partition.name)): name += " " offset = partition.sector * self.parent.cfg.SECTOR_SIZE_IN_BYTES length = partition.sectors * self.parent.cfg.SECTOR_SIZE_IN_BYTES attr1 = partition.attr1 attr2 = partition.attr2 attr3 = partition.attr3 which_flash = partition.which_flash self.printer( f"{name}\t%08X\t%08X\t{hex(attr1)}/{hex(attr2)}/{hex(attr3)}\t{which_flash}" % (offset, length)) def writefile(wf, q, stop): while True: data = q.get() if len(data) > 0: wf.write(data) q.task_done() if stop() and q.empty(): break class asyncwriter(): def __init__(self, wf): self.writequeue = Queue() self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,)) self.worker.setDaemon(True) self.stopthreads = False self.worker.start() def write(self, data): self.writequeue.put_nowait(data) def stop(self): self.stopthreads = True self.writequeue.join() class firehose(metaclass=LogBase): class cfg: TargetName = "" Version = "" ZLPAwareHost = 1 SkipStorageInit = 0 SkipWrite = 0 MaxPayloadSizeToTargetInBytes = 1048576 MaxPayloadSizeFromTargetInBytes = 8192 MaxXMLSizeInBytes = 4096 bit64 = True total_blocks = 0 num_physical = 0 block_size = 0 SECTOR_SIZE_IN_BYTES = 0 MemoryName = "eMMC" prod_name = "Unknown" maxlun = 99 def __init__(self, cdc, xml, cfg, loglevel, devicemodel, serial, skipresponse, luns, args): self.cdc = cdc self.lasterror = b"" self.loglevel = loglevel self.args = args self.xml = xml self.cfg = cfg self.prog = 0 self.progtime = 0 self.progpos = 0 self.pk = None self.modules = None self.serial = serial self.devicemodel = devicemodel self.skipresponse = skipresponse self.luns = luns self.supported_functions = [] self.lunsizes = {} self.info = self.__logger.info self.error = self.__logger.error self.debug = self.__logger.debug self.warning = self.__logger.warning self.__logger.setLevel(loglevel) if loglevel == logging.DEBUG: logfilename = "log.txt" fh = logging.FileHandler(logfilename) self.__logger.addHandler(fh) self.nandparttbl = None self.nandpart = nand_partition(parent=self, printer=print) def detect_partition(self, arguments, partitionname, send_full=False): if arguments is None: arguments = { "--gpt-num-part-entries" : 0, "--gpt-part-entry-size" : 0, "--gpt-part-entry-start-lba" : 0 } fpartitions = {} for lun in self.luns: lunname = "Lun" + str(lun) fpartitions[lunname] = [] data, guid_gpt = self.get_gpt(lun, int(arguments["--gpt-num-part-entries"]), int(arguments["--gpt-part-entry-size"]), int(arguments["--gpt-part-entry-start-lba"])) if guid_gpt is None: break else: if partitionname in guid_gpt.partentries: return [True, lun, data, guid_gpt] if send_full else [True, lun, guid_gpt.partentries[partitionname]] for part in guid_gpt.partentries: fpartitions[lunname].append(part) return [False, fpartitions] def getstatus(self, resp): if "value" in resp: value = resp["value"] if value == "ACK" or value == "true": return True else: return False return True def decoder(self, data): if isinstance(data, bytes) or isinstance(data, bytearray): if data[:5] == b"<?xml": try: rdata = "" for line in data.split(b"\n"): try: rdata += line.decode('utf-8') + "\n" except Exception as err: self.debug(str(err)) rdata += hexlify(line).decode('utf-8') + "\n" return rdata except Exception as err: # pylint: disable=broad-except self.debug(str(err)) pass return data def xmlsend(self, data, skipresponse=False) -> response: self.cdc.flush() self.cdc.xmlread = True if isinstance(data, bytes) or isinstance(data, bytearray): self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes]) else: self.cdc.write(bytes(data, 'utf-8')[:self.cfg.MaxXMLSizeInBytes]) rdata = bytearray() counter = 0 timeout = 3 if not skipresponse: while b"<response value" not in rdata: try: tmp = self.cdc.read(timeout=None) if tmp == b"" in rdata: counter += 1 time.sleep(0.05) if counter > timeout: break rdata += tmp except Exception as err: self.error(err) return response(resp=False, error=str(err)) try: if b"raw hex token" in rdata: rdata = rdata try: resp = self.xml.getresponse(rdata) status = self.getstatus(resp) if "rawmode" in resp: if resp["rawmode"] == "false": if status: log = self.xml.getlog(rdata) return response(resp=status, data=rdata, log=log) else: error = self.xml.getlog(rdata) return response(resp=status, error=error, data=resp, log=error) else: if status: if b"log value=" in rdata: log = self.xml.getlog(rdata) return response(resp=resp, data=rdata, log=log) return response(resp=status, data=rdata) except Exception as e: # pylint: disable=broad-except rdata = bytes(self.decoder(rdata), 'utf-8') resp = self.xml.getresponse(rdata) status = self.getstatus(resp) if status: return response(resp=True, data=resp) else: error = "" if b"<log value" in rdata: error = self.xml.getlog(rdata) return response(resp=False, error=error, data=resp) except Exception as err: self.debug(str(err)) if isinstance(rdata, bytes) or isinstance(rdata, bytearray): try: self.debug("Error on getting xml response:" + rdata.decode('utf-8')) except Exception as err: self.debug("Error on getting xml response:" + hexlify(rdata).decode('utf-8') + ", Error: " + str(err)) elif isinstance(rdata, str): self.debug("Error on getting xml response:" + rdata) return response(resp=False, error=rdata) else: return response(resp=True, data=rdata) def cmd_reset(self, mode="reset"): if mode is None: mode = "reset" data = "<?xml version=\"1.0\" ?><data><power value=\"" + mode + "\"/></data>" val = self.xmlsend(data) try: v = None while v != b'': v = self.cdc.read(timeout=None) if v != b'': resp = self.xml.getlog(v)[0] else: break print(resp) except Exception as err: self.error(str(err)) pass if val.resp: self.info("Reset succeeded.") return True else: self.error("Reset failed: " + val.error) return False def cmd_xml(self, filename): with open(filename, 'rb') as rf: data = rf.read() val = self.xmlsend(data) if val.resp: self.info("Command succeeded." + str(val.data)) return val.data else: self.error("Command failed:" + str(val.error)) return val.error def cmd_nop(self): data = "<?xml version=\"1.0\" ?><data><nop /></data>" resp = self.xmlsend(data, True) self.debug(resp.data.hex()) info = b"" tmp = None while tmp != b"": tmp = self.cdc.read(timeout=None) if tmp == b"": break info += tmp if info != b"": self.info("Nop succeeded.") return self.xml.getlog(info) else: self.error("Nop failed.") return False def cmd_getsha256digest(self, physical_partition_number, start_sector, num_partition_sectors): data = f"<?xml version=\"1.0\" ?><data><getsha256digest" + \ f" SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\"/>\n</data>" val = self.xmlsend(data) if val.resp: res = self.xml.getlog(val.data) for line in res: self.info(line) if "Digest " in res: return res.split("Digest ")[1] else: return res else: self.error("GetSha256Digest failed: " + val.error) return False def cmd_setbootablestoragedrive(self, partition_number): data = f"<?xml version=\"1.0\" ?><data>\n<setbootablestoragedrive value=\"{str(partition_number)}\" /></data>" val = self.xmlsend(data) if val.resp: self.info("Setbootablestoragedrive succeeded.") return True else: self.error("Setbootablestoragedrive failed: " + val.error) return False def cmd_send(self, content, responsexml=True): data = f"<?xml version=\"1.0\" ?><data>\n<{content} /></data>" if responsexml: val = self.xmlsend(data) if val.resp: return val.data else: self.error(f"{content} failed.") self.error(f"{val.error}") return val.error else: self.xmlsend(data, True) return True def cmd_patch(self, physical_partition_number, start_sector, byte_offset, value, size_in_bytes, display=True): """ <patch SECTOR_SIZE_IN_BYTES="512" byte_offset="16" filename="DISK" physical_partition_number="0" size_in_bytes="4" start_sector="NUM_DISK_SECTORS-1." value="0" what="Zero Out Header CRC in Backup Header."/> """ data = f"<?xml version=\"1.0\" ?><data>\n" + \ f"<patch SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" byte_offset=\"{byte_offset}\"" + \ f" filename=\"DISK\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" size_in_bytes=\"{size_in_bytes}\" " + \ f" start_sector=\"{start_sector}\" " + \ f" value=\"{value}\" " if self.modules is not None: data += self.modules.addpatch() data += f"/>\n</data>" rsp = self.xmlsend(data) if rsp.resp: if display: self.info(f"Patch:\n--------------------\n") self.info(rsp.data) return True else: self.error(f"Error:{rsp.error}") return False def wait_for_data(self): tmp = bytearray() timeout = 0 while b'response value' not in tmp: res = self.cdc.read(timeout=None) if res == b'': timeout += 1 if timeout == 4: break time.sleep(0.1) tmp += res return tmp def cmd_program(self, physical_partition_number, start_sector, filename, display=True): total = os.stat(filename).st_size sparse = QCSparse(filename, self.loglevel) sparseformat = False if sparse.readheader(): sparseformat = True total = sparse.getsize() bytestowrite = total progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES) with open(filename, "rb") as rf: # Make sure we fill data up to the sector size num_partition_sectors = total // self.cfg.SECTOR_SIZE_IN_BYTES if (total % self.cfg.SECTOR_SIZE_IN_BYTES) != 0: num_partition_sectors += 1 if display: self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " + f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}") data = f"<?xml version=\"1.0\" ?><data>\n" + \ f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\" " if self.modules is not None: data += self.modules.addprogram() data += f"/>\n</data>" rsp = self.xmlsend(data, self.skipresponse) progbar.show_progress(prefix="Write", pos=0, total=total, display=display) if rsp.resp: while bytestowrite > 0: wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes) if sparseformat: wdata = sparse.read(wlen) else: wdata = rf.read(wlen) bytestowrite -= wlen if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0: filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \ self.cfg.SECTOR_SIZE_IN_BYTES wdata += b"\x00" * (filllen - wlen) self.cdc.write(wdata) progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display) self.cdc.write(b'') # time.sleep(0.2) wd = self.wait_for_data() log = self.xml.getlog(wd) rsp = self.xml.getresponse(wd) if "value" in rsp: if rsp["value"] != "ACK": self.error(f"Error:") for line in log: self.error(line) return False else: self.error(f"Error:{rsp}") return False return True def cmd_program_buffer(self, physical_partition_number, start_sector, wfdata, display=True): bytestowrite = len(wfdata) total = bytestowrite # Make sure we fill data up to the sector size num_partition_sectors = bytestowrite // self.cfg.SECTOR_SIZE_IN_BYTES if (bytestowrite % self.cfg.SECTOR_SIZE_IN_BYTES) != 0: num_partition_sectors += 1 if display: self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " + f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}") data = f"<?xml version=\"1.0\" ?><data>\n" + \ f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\" " if self.modules is not None: data += self.modules.addprogram() data += f"/>\n</data>" rsp = self.xmlsend(data, self.skipresponse) progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES) progbar.show_progress(prefix="Write", pos=0, total=total, display=display) if rsp.resp: pos = 0 while bytestowrite > 0: wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes) wrdata = wfdata[pos:pos + wlen] pos += wlen bytestowrite -= wlen if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0: filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \ self.cfg.SECTOR_SIZE_IN_BYTES wrdata += b"\x00" * (filllen - wlen) self.cdc.write(wrdata) progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display) self.cdc.write(b'') # time.sleep(0.2) wd = self.wait_for_data() log = self.xml.getlog(wd) rsp = self.xml.getresponse(wd) if "value" in rsp: if rsp["value"] != "ACK": self.error(f"Error:") for line in log: self.error(line) return False else: self.error(f"Error:{rsp}") return False else: self.error(f"Error:{rsp.error}") return True def cmd_erase(self, physical_partition_number, start_sector, num_partition_sectors, display=True): if display: self.info(f"\nErasing from physical partition {str(physical_partition_number)}, " + f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}") data = f"<?xml version=\"1.0\" ?><data>\n" + \ f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\" " if self.modules is not None: data += self.modules.addprogram() data += f"/>\n</data>" rsp = self.xmlsend(data, self.skipresponse) empty = b"\x00" * self.cfg.MaxPayloadSizeToTargetInBytes pos = 0 bytestowrite = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors total = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors progbar = progress(self.cfg.MaxPayloadSizeToTargetInBytes) progbar.show_progress(prefix="Erase", pos=0, total=total, display=display) if rsp.resp: while bytestowrite > 0: wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes) self.cdc.write(empty[:wlen]) progbar.show_progress(prefix="Erase", pos=total - bytestowrite, total=total, display=display) bytestowrite -= wlen pos += wlen self.cdc.write(b'') res = self.wait_for_data() info = self.xml.getlog(res) rsp = self.xml.getresponse(res) if "value" in rsp: if rsp["value"] != "ACK": self.error(f"Error:") for line in info: self.error(line) return False else: self.error(f"Error:{rsp}") return False else: self.error(f"Error:{rsp.error}") return False return True def cmd_read(self, physical_partition_number, start_sector, num_partition_sectors, filename, display=True): global rq self.lasterror = b"" progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES) if display: self.info( f"\nReading from physical partition {str(physical_partition_number)}, " + f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}") data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\"/>\n</data>" rsp = self.xmlsend(data, self.skipresponse) self.cdc.xmlread = False time.sleep(0.01) if not rsp.resp: if display: self.error(rsp.error) return b"" else: bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors total = bytestoread show_progress = progbar.show_progress usb_read = self.cdc.read progbar.show_progress(prefix="Read", pos=0, total=total, display=display) worker = Thread(target=writedata, args=(filename, rq), daemon=True) worker.start() while bytestoread > 0: if self.cdc.is_serial: maxsize = self.cfg.MaxPayloadSizeFromTargetInBytes else: maxsize = 5 * 1024 * 1024 size = min(maxsize, bytestoread) data = usb_read(size) if len(data) > 0: rq.put(data) bytestoread -= len(data) show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display) rq.put(None) worker.join(60) self.cdc.xmlread = True wd = self.wait_for_data() info = self.xml.getlog(wd) rsp = self.xml.getresponse(wd) if "value" in rsp: if rsp["value"] != "ACK": if bytestoread!=0: self.error(f"Error:") for line in info: self.error(line) self.lasterror += bytes(line + "\n", "utf-8") return False else: if display: self.error(f"Error:{rsp[2]}") return False return True def cmd_read_buffer(self, physical_partition_number, start_sector, num_partition_sectors, display=True): self.lasterror = b"" prog = 0 if display: self.info( f"\nReading from physical partition {str(physical_partition_number)}, " + f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}") print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \ f" num_partition_sectors=\"{num_partition_sectors}\"" + \ f" physical_partition_number=\"{physical_partition_number}\"" + \ f" start_sector=\"{start_sector}\"/>\n</data>" progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES) rsp = self.xmlsend(data, self.skipresponse) self.cdc.xmlread = False resData = bytearray() if not rsp.resp: if display: self.error(rsp.error) return rsp else: bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors total = bytestoread if display: progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display) while bytestoread > 0: tmp = self.cdc.read(min(self.cdc.maxsize, bytestoread)) size = len(tmp) bytestoread -= size resData.extend(tmp) progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display) self.cdc.xmlread = True wd = self.wait_for_data() info = self.xml.getlog(wd) rsp = self.xml.getresponse(wd) if "value" in rsp: if rsp["value"] != "ACK": self.error(f"Error:") for line in info: self.error(line) return response(resp=False, data=resData, error=info) elif "rawmode" in rsp: if rsp["rawmode"] == "false": return response(resp=True, data=resData) else: if len(rsp) > 1: if b"Failed to open the UFS Device" in rsp[2]: self.error(f"Error:{rsp[2]}") self.lasterror = rsp[2] return response(resp=False, data=resData, error=rsp[2]) if rsp["value"] != "ACK": self.lasterror = rsp[2] if display and prog != 100: progbar.show_progress(prefix="Read", pos=total, total=total, display=display) resp = rsp["value"] == "ACK" return response(resp=resp, data=resData, error=rsp[2]) # Do not remove, needed for oneplus # TODO: this should be able to get backup gpt as well def get_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba, start_sector=1): try: resp = self.cmd_read_buffer(lun, 0, 1, False) except Exception as err: self.debug(str(err)) self.skipresponse = True resp = self.cmd_read_buffer(lun, 0, 1, False) if not resp.resp: for line in resp.error: self.error(line) return None, None data = resp.data magic = unpack("<I", data[0:4])[0] data += self.cmd_read_buffer(lun, start_sector, 1, False).data if magic == 0x844bdcd1: self.info("Nand storage detected.") self.info("Scanning for partition table ...") progbar = progress(1) if self.nandpart.partitiontblsector is None: sector = 0x280 progbar.show_progress(prefix="Scanning", pos=sector, total=1024, display=True) resp = self.cmd_read_buffer(0, sector, 1, False) if resp.resp: if resp.data[0:8] in [b"\xac\x9f\x56\xfe\x7a\x12\x7f\xcd", b"\xAA\x73\xEE\x55\xDB\xBD\x5E\xE3"]: progbar.show_progress(prefix="Scanning", pos=1024, total=1024, display=True) self.nandpart.partitiontblsector = sector self.info(f"Found partition table at sector {sector} :)") else: self.error("Error on reading partition table data") return None, None if self.nandpart.partitiontblsector is not None: resp = self.cmd_read_buffer(0, self.nandpart.partitiontblsector + 1, 2, False) if resp.resp: if self.nandpart.parse(resp.data): return resp.data, self.nandpart else: self.error("Couldn't find partition table, but command \"rs\" might still work !") sys.exit(0) return None, None else: data = resp.data guid_gpt = gpt( num_part_entries=gpt_num_part_entries, part_entry_size=gpt_part_entry_size, part_entry_start_lba=gpt_part_entry_start_lba, loglevel=self.__logger.level ) try: sectorsize = self.cfg.SECTOR_SIZE_IN_BYTES header = guid_gpt.parseheader(data, sectorsize) #if not primary: # print(f"signature: {header.signature}") # print(f"revision: {header.revision}") # print(f"header size: {header.header_size}") # print(f"crc32: {header.crc32}") # print(f"reserved: {header.reserved}") # print(f"current_lba: {header.current_lba}") # print(f"backup_lba: {header.backup_lba}") # print(f"first_usable_lba: {header.first_usable_lba}") # print(f"last_usable_lba: {header.last_usable_lba}") # print(f"disk_guid: {header.disk_guid}") # print(f"part_entry_stzrt_lba: {header.part_entry_start_lba}") # print(f"num_part_entries: {header.num_part_entries}") # print(f"part_entry_size: {header.part_entry_size}") # print(f"crc32_part_entries: {header.crc32_part_entries}") # return None, None if header.signature == b"EFI PART": part_table_size = header.num_part_entries * header.part_entry_size sectors = part_table_size // self.cfg.SECTOR_SIZE_IN_BYTES if part_table_size % self.cfg.SECTOR_SIZE_IN_BYTES != 0: sectors += 1 if sectors == 0: return None, None if sectors > 64: sectors = 64 data += self.cmd_read_buffer(lun, header.part_entry_start_lba, sectors, False).data if data == b"": return None, None guid_gpt.parse(data, self.cfg.SECTOR_SIZE_IN_BYTES) return data, guid_gpt else: return None, None except Exception as err: self.debug(str(err)) return None, None def get_backup_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba): resp = self.cmd_read_buffer(lun, 0, 2, False) if not resp.resp: self.error("Error on reading backup gpt") return None guid_gpt = gpt( num_part_entries=gpt_num_part_entries, part_entry_size=gpt_part_entry_size, part_entry_start_lba=gpt_part_entry_start_lba, loglevel=self.__logger.level ) header = guid_gpt.parseheader(resp.data, self.cfg.SECTOR_SIZE_IN_BYTES) if "backup_lba" in header: sectors = header.first_usable_lba - 1 data = self.cmd_read_buffer(lun, header.backup_lba, sectors, False) if data == b"": return None return data else: return None def calc_offset(self, sector, offset): sector = sector + (offset // self.cfg.SECTOR_SIZE_IN_BYTES) offset = offset % self.cfg.SECTOR_SIZE_IN_BYTES return sector, offset def getluns(self, argument): if argument["--lun"] is not None: return [int(argument["--lun"])] luns = [] if self.cfg.MemoryName.lower() == "ufs": for i in range(0, self.cfg.maxlun): luns.append(i) else: luns = [0] return luns def configure(self, lvl): if self.cfg.SECTOR_SIZE_IN_BYTES == 0: if self.cfg.MemoryName.lower() == "emmc": self.cfg.SECTOR_SIZE_IN_BYTES = 512 else: self.cfg.SECTOR_SIZE_IN_BYTES = 4096 connectcmd = f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>" + \ f"<configure MemoryName=\"{self.cfg.MemoryName}\" " + \ f"Verbose=\"0\" " + \ f"AlwaysValidate=\"0\" " + \ f"MaxDigestTableSizeInBytes=\"2048\" " + \ f"MaxPayloadSizeToTargetInBytes=\"{str(self.cfg.MaxPayloadSizeToTargetInBytes)}\" " + \ f"ZLPAwareHost=\"{str(self.cfg.ZLPAwareHost)}\" " + \ f"SkipStorageInit=\"{str(int(self.cfg.SkipStorageInit))}\" " + \ f"SkipWrite=\"{str(int(self.cfg.SkipWrite))}\"/>" + \ "</data>" ''' "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data><response value=\"ACK\" MinVersionSupported=\"1\"" \ "MemoryName=\"eMMC\" MaxPayloadSizeFromTargetInBytes=\"4096\" MaxPayloadSizeToTargetInBytes=\"1048576\" " \ "MaxPayloadSizeToTargetInBytesSupported=\"1048576\" MaxXMLSizeInBytes=\"4096\" Version=\"1\" TargetName=\"8953\" />" \ "</data>" ''' rsp = self.xmlsend(connectcmd) if not rsp.resp: if rsp.error == "": try: if "MemoryName" in rsp.data: self.cfg.MemoryName = rsp.data["MemoryName"] except TypeError: self.warning("!DEBUG! rsp.data: '%s'" % (rsp.data,)) return self.configure(lvl + 1) if "MaxPayloadSizeFromTargetInBytes" in rsp.data: self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"]) if "MaxPayloadSizeToTargetInBytes" in rsp.data: self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"]) if "MaxPayloadSizeToTargetInBytesSupported" in rsp.data: self.cfg.MaxPayloadSizeToTargetInBytesSupported = int( rsp.data["MaxPayloadSizeToTargetInBytesSupported"]) if "TargetName" in rsp.data: self.cfg.TargetName = rsp.data["TargetName"] return self.configure(lvl + 1) for line in rsp.error: if "Not support configure MemoryName eMMC" in line: self.info("eMMC is not supported by the firehose loader. Trying UFS instead.") self.cfg.MemoryName = "UFS" return self.configure(lvl + 1) elif "Only nop and sig tag can be" in line: self.info("Xiaomi EDL Auth detected.") try: self.modules = modules(fh=self, serial=self.serial, supported_functions=self.supported_functions, loglevel=self.__logger.level, devicemodel=self.devicemodel, args=self.args) except Exception as err: # pylint: disable=broad-except self.modules = None if self.modules.edlauth(): rsp = self.xmlsend(connectcmd) return rsp.resp else: self.error("Error on EDL Authentification") return False elif "MaxPayloadSizeToTargetInBytes" in rsp.data: try: self.cfg.MemoryName = rsp.data["MemoryName"] self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"]) self.cfg.MaxPayloadSizeToTargetInBytesSupported = int( rsp.data["MaxPayloadSizeToTargetInBytesSupported"]) if "MaxXMLSizeInBytes" in rsp.data: self.cfg.MaxXMLSizeInBytes = int(rsp.data["MaxXMLSizeInBytes"]) else: self.cfg.MaxXMLSizeInBytes = 4096 if "MaxPayloadSizeFromTargetInBytes" in rsp.data: self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"]) else: self.cfg.MaxPayloadSizeFromTargetInBytes = 4096 if "TargetName" in rsp.data: self.cfg.TargetName = rsp.data["TargetName"] else: self.cfg.TargetName = "Unknown" if "MSM" not in self.cfg.TargetName: self.cfg.TargetName = "MSM" + self.cfg.TargetName if "Version" in rsp.data: self.cfg.Version = rsp.data["Version"] else: self.cfg.Version = "Unknown" if lvl == 0: return self.configure(lvl + 1) else: self.error(f"Error:{rsp}") sys.exit() except Exception as e: pass elif "ERROR" in line or "WARN" in line: if "ERROR" in line: self.error(line) sys.exit() elif "WARN" in line: self.warning(line) else: info = self.cdc.read(timeout=1) if isinstance(rsp.resp, dict): field = rsp.resp if "MemoryName" not in field: # print(rsp[1]) field["MemoryName"] = "eMMC" if "MaxXMLSizeInBytes" not in field: field["MaxXMLSizeInBytes"] = "4096" self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes") if "MaxPayloadSizeToTargetInBytes" not in field: field["MaxPayloadSizeToTargetInBytes"] = "1038576" if "MaxPayloadSizeToTargetInBytesSupported" not in field: field["MaxPayloadSizeToTargetInBytesSupported"] = "1038576" if field["MemoryName"].lower() != self.cfg.MemoryName.lower(): self.warning("Memory type was set as " + self.cfg.MemoryName + " but device reported it is " + field["MemoryName"] + " instead.") self.cfg.MemoryName = field["MemoryName"] if "MaxPayloadSizeToTargetInBytes" in field: self.cfg.MaxPayloadSizeToTargetInBytes = int(field["MaxPayloadSizeToTargetInBytes"]) else: self.cfg.MaxPayloadSizeToTargetInBytes = 1048576 if "MaxPayloadSizeToTargetInBytesSupported" in field: self.cfg.MaxPayloadSizeToTargetInBytesSupported = int( field["MaxPayloadSizeToTargetInBytesSupported"]) else: self.cfg.MaxPayloadSizeToTargetInBytesSupported = 1048576 if "MaxXMLSizeInBytes" in field: self.cfg.MaxXMLSizeInBytes = int(field["MaxXMLSizeInBytes"]) else: self.cfg.MaxXMLSizeInBytes = 4096 if "MaxPayloadSizeFromTargetInBytes" in field: self.cfg.MaxPayloadSizeFromTargetInBytes = int(field["MaxPayloadSizeFromTargetInBytes"]) else: self.cfg.MaxPayloadSizeFromTargetInBytes = self.cfg.MaxXMLSizeInBytes self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes") if "TargetName" in field: self.cfg.TargetName = field["TargetName"] if "MSM" not in self.cfg.TargetName: self.cfg.TargetName = "MSM" + self.cfg.TargetName else: self.cfg.TargetName = "Unknown" self.warning("Couldn't detect TargetName") if "Version" in field: self.cfg.Version = field["Version"] else: self.cfg.Version = 0 self.warning("Couldn't detect Version") self.info(f"TargetName={self.cfg.TargetName}") self.info(f"MemoryName={self.cfg.MemoryName}") self.info(f"Version={self.cfg.Version}") self.info("Trying to read first storage sector...") rsp = self.cmd_read_buffer(0, 1, 1, False) self.info("Running configure...") if not rsp.resp and self.args["--memory"] is None: for line in rsp.error: if "Failed to set the IO options" in line: self.warning( "Memory type eMMC doesn't seem to match (Failed to init). Trying to use NAND instead.") self.cfg.MemoryName = "nand" return self.configure(0) elif "Failed to open the SDCC Device" in line: self.warning( "Memory type eMMC doesn't seem to match (Failed to init). Trying to use UFS instead.") self.cfg.MemoryName = "UFS" return self.configure(0) elif "Failed to initialize (open whole lun) UFS Device slot" in line: self.warning( "Memory type UFS doesn't seem to match (Failed to init). Trying to use eMMC instead.") self.cfg.MemoryName = "eMMC" return self.configure(0) elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=4096 must be equal to disk sector size 512" in line \ or "different from device sector size (512)" in line: self.cfg.SECTOR_SIZE_IN_BYTES = 512 return self.configure(0) elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=512 must be equal to disk sector size 4096" in line \ or "different from device sector size (4096)" in line: self.cfg.SECTOR_SIZE_IN_BYTES = 4096 return self.configure(0) self.parse_storage() for function in self.supported_functions: if function == "checkntfeature": if type(self.devicemodel)==list: self.devicemodel=self.devicemodel[0] self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial, supported_functions=self.supported_functions, loglevel=self.loglevel) if self.nothing is not None: self.nothing.ntprojectverify() self.luns = self.getluns(self.args) return True def getlunsize(self, lun): if lun not in self.lunsizes: try: data, guid_gpt = self.get_gpt(lun, int(self.args["--gpt-num-part-entries"]), int(self.args["--gpt-part-entry-size"]), int(self.args["--gpt-part-entry-start-lba"])) self.lunsizes[lun] = guid_gpt.totalsectors except Exception as err: self.error(err) return -1 else: return self.lunsizes[lun] return guid_gpt.totalsectors def get_supported_functions(self): supfunc = False info = self.cmd_nop() if not info: self.info("No supported functions detected, configuring qc generic commands") self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive', 'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo', 'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml'] else: self.supported_functions = [] for line in info: if "chip serial num" in line.lower(): self.info(line) try: serial = line.split("0x")[1][:-1] self.serial = int(serial, 16) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) serial = line.split(": ")[2] self.serial = int(serial.split(" ")[0]) if supfunc and "end of supported functions" not in line.lower(): rs = line.replace("\n", "") if rs != "": rs = rs.replace("INFO: ", "") self.supported_functions.append(rs) if "supported functions" in line.lower(): supfunc = True if len(self.supported_functions) > 1: info = "Supported Functions: " for line in self.supported_functions: info += line + "," self.info(info[:-1]) data = self.cdc.read(timeout=None) try: self.info(data.decode('utf-8')) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) pass if not self.supported_functions: self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive', 'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo', 'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml'] def connect(self): v = b'-1' if platform.system() == 'Windows': self.cdc.timeout = 50 elif platform.system() == 'Darwin': # must ensure the timeout is enough to fill the buffer we alloc # which is 1MB, othwise some data are dropped in the underlying usb libraries self.cdc.timeout = 50 else: self.cdc.timeout = 50 info = [] while v != b'': try: v = self.cdc.read(timeout=None) if (b"response" in v and b"</data>" in v) or v == b'': break data = self.xml.getlog(v) if len(data) > 0: info.append(data[0]) if not info: break except Exception as err: # pylint: disable=broad-except pass if info == [] or (len(info) > 0 and 'ERROR' in info[0]): if len(info) > 0: self.debug(info[0]) if len(info) > 0: supfunc = False for line in info: self.info(line) if "chip serial num" in line.lower(): try: serial = line.split("0x")[1][:-1] if ")" in serial: serial=serial[:serial.rfind(")")] self.serial = int(serial, 16) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) serial = line.split(": ")[2] self.serial = int(serial.split(" ")[0]) if supfunc and "end of supported functions" not in line.lower(): rs = line.replace("\n", "") if rs != "": rs = rs.replace("INFO: ", "") self.supported_functions.append(rs) if "supported functions" in line.lower(): supfunc = True if "program" in line.lower(): idx = line.find("Functions: ") if idx != -1: v = line[idx + 11:].split(" ") for val in v: if val != "": self.supported_functions.append(val) supfunc = False try: if os.path.exists(self.cfg.programmer): data = open(self.cfg.programmer, "rb").read() for cmd in [b"demacia", b"setprojmodel", b"setswprojmodel", b"setprocstart", b"SetNetType", b"checkntfeature"]: if cmd in data: self.supported_functions.append(cmd.decode('utf-8')) state = { "supported_functions": self.supported_functions, "programmer": self.cfg.programmer, "serial": self.serial } if os.path.exists("edl_config.json"): data = json.loads(open("edl_config.json","rb").read().decode('utf-8')) if "serial" in data and data["serial"]!=state["serial"]: open("edl_config.json", "w").write(json.dumps(state)) else: self.supported_functions = data["supported_functions"] self.cfg.programmer = data["programmer"] else: open("edl_config.json", "w").write(json.dumps(state)) if "001920e101cf0000_fa2836525c2aad8a_fhprg.bin" in self.cfg.programmer: self.devicemodel = '20111' elif "000b80e100020000_467f3020c4cc788d_fhprg.bin" in self.cfg.programmer: self.devicemodel = '22111' except: pass elif self.serial is None or self.supported_functions is []: try: if os.path.exists("edl_config.json"): pinfo = json.loads(open("edl_config.json", "rb").read()) if not self.supported_functions: if "supported_functions" in pinfo: self.supported_functions = pinfo["supported_functions"] if self.serial is None: if "serial" in pinfo: self.serial = pinfo["serial"] else: self.get_supported_functions() except: self.get_supported_functions() pass # rsp = self.xmlsend(data, self.skipresponse) return self.supported_functions def parse_storage(self): storageinfo = self.cmd_getstorageinfo() if storageinfo is None or storageinfo.resp and len(storageinfo.data) == 0: return False info = storageinfo.data if "UFS Inquiry Command Output" in info: self.cfg.prod_name = info["UFS Inquiry Command Output"] self.info(info) if "UFS Erase Block Size" in info: self.cfg.block_size = int(info["UFS Erase Block Size"], 16) self.info(info) self.cfg.MemoryName = "UFS" self.cfg.SECTOR_SIZE_IN_BYTES = 4096 if "UFS Boot Partition Enabled" in info: self.info(info["UFS Boot Partition Enabled"]) if "UFS Total Active LU" in info: self.cfg.maxlun = int(info["UFS Total Active LU"], 16) if "SECTOR_SIZE_IN_BYTES" in info: self.cfg.SECTOR_SIZE_IN_BYTES = int(info["SECTOR_SIZE_IN_BYTES"]) if "num_physical_partitions" in info: self.cfg.num_physical = int(info["num_physical_partitions"]) return True # OEM Stuff here below -------------------------------------------------- def cmd_writeimei(self, imei): if len(imei) != 16: self.info("IMEI must be 16 digits") return False data = "<?xml version=\"1.0\" ?><data><writeIMEI len=\"16\"/></data>" val = self.xmlsend(data) if val.resp: self.info("writeIMEI succeeded.") return True else: self.error("writeIMEI failed.") return False def cmd_getstorageinfo(self): data = "<?xml version=\"1.0\" ?><data><getstorageinfo physical_partition_number=\"0\"/></data>" val = self.xmlsend(data) if val.data == '' and val.log == '' and val.resp: return None if isinstance(val.data, dict): if "bNumberLu" in val.data: self.cfg.maxlun = int(val.data["bNumberLu"]) if val.resp: if val.log is not None: res = {} for value in val.log: v = value.split("=") if len(v) > 1: res[v[0]] = v[1] else: if "\"storage_info\"" in value: try: info = value.replace("INFO:", "") si = json.loads(info)["storage_info"] except Exception as err: # pylint: disable=broad-except self.debug(str(err)) continue self.info("Storage report:") for sii in si: self.info(f"{sii}:{si[sii]}") if "total_blocks" in si: self.cfg.total_blocks = si["total_blocks"] if "num_physical" in si: self.cfg.num_physical = si["num_physical"] self.cfg.maxlun = self.cfg.num_physical if "block_size" in si: self.cfg.block_size = si["block_size"] if "page_size" in si: self.cfg.SECTOR_SIZE_IN_BYTES = si["page_size"] if "mem_type" in si: self.cfg.MemoryName = si["mem_type"] if "prod_name" in si: self.cfg.prod_name = si["prod_name"] else: v = value.split(":") if len(v) > 1: res[v[0]] = v[1].lstrip(" ") return response(resp=val.resp, data=res) return response(resp=val.resp, data=val.data) else: if val.error: for v in val.error: if "Failed to open the SDCC Device" in v: self.cfg.MemoryName = "ufs" self.configure(0) return self.cmd_getstorageinfo() self.warning("GetStorageInfo command isn't supported.") return None # 1. check for integrity of the primary header and integrity of backup header # Yes?: update primary header + update backup header # No? : which one is corrupted? --> update the corrupted one with the other one # 2. how to fix corruption # -> fix header (patch) # -> fix partition table (write) def check_gpt_integrity(self, gptData, guid_gpt): primary_header = guid_gpt.header def cmd_setactiveslot(self, slot: str): # flags: 0x3a for inactive and 0x6f for active boot partition def set_flags(flags, active, is_boot): new_flags = flags if active: if is_boot: #new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL) #new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL) new_flags = 0x6f << (AB_FLAG_OFFSET*8) else: new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8) else: if is_boot: #new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL) #new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT) new_flags = 0x3a << (AB_FLAG_OFFSET*8) else: new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8)) return new_flags def patch_helper(gpt_data_a, gpt_data_b, guid_gpt, partition_a, partition_b, slot_a_status, slot_b_status, is_boot): part_entry_size = guid_gpt.header.part_entry_size rf_a = BytesIO(gpt_data_a) rf_b = BytesIO(gpt_data_b) rf_a.seek(partition_a.entryoffset) rf_b.seek(partition_b.entryoffset) sdata_a = rf_a.read(part_entry_size) sdata_b = rf_b.read(part_entry_size) partentry_a = gpt.gpt_partition(sdata_a) partentry_b = gpt.gpt_partition(sdata_b) partentry_a.flags = set_flags(partentry_a.flags, slot_a_status, is_boot) partentry_b.flags = set_flags(partentry_b.flags, slot_b_status, is_boot) partentry_a.type, partentry_b.type = partentry_b.type, partentry_a.type pdata_a, pdata_b = partentry_a.create(), partentry_b.create() return pdata_a, partition_a.entryoffset, pdata_b, partition_b.entryoffset def cmd_patch_multiple(lun, start_sector, byte_offset, patch_data): offset = 0 size_each_patch = 4 write_size = len(patch_data) for i in range(0, write_size, size_each_patch): pdata_subset = int(unpack("<I", patch_data[offset:offset+size_each_patch])[0]) self.cmd_patch( lun, start_sector, \ byte_offset + offset, \ pdata_subset, \ size_each_patch, True) offset += size_each_patch return True def update_gpt_info(guid_gpt_a, guid_gpt_b, partitionname_a, partitionname_b, gpt_data_a, gpt_data_b, slot_a_status, slot_b_status, lun_a, lun_b ): part_a = guid_gpt_a.partentries[partitionname_a] part_b = guid_gpt_b.partentries[partitionname_b] is_boot = False if partitionname_a == "boot_a": is_boot = True pdata_a, poffset_a, pdata_b, poffset_b = patch_helper( gpt_data_a, gpt_data_b, guid_gpt_a, part_a, part_b, slot_a_status, slot_b_status, is_boot ) if gpt_data_a and gpt_data_b: start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES byte_offset_patch_a = poffset_a % self.cfg.SECTOR_SIZE_IN_BYTES #cmd_patch_multiple(lun_a, start_sector_patch_a, byte_offset_patch_a, pdata_a) if lun_a != lun_b: entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize) gpt_data_a[entryoffset_a: entryoffset_a+len(pdata_a)] = pdata_a new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a) start_sector_hdr_a = guid_gpt_a.header.current_lba headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size] cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a) start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES cmd_patch_multiple(lun_b, start_sector_patch_b, byte_offset_patch_b, pdata_b) entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize) gpt_data_b[entryoffset_b:entryoffset_b + len(pdata_b)] = pdata_b new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b) start_sector_hdr_b = guid_gpt_b.header.current_lba headeroffset_b = guid_gpt_b.sectorsize new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size] cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b) return True return True if slot.lower() not in ["a", "b"]: self.error("Only slots a or b are accepted. Aborting.") return False slot_a_status = None if slot == "a": slot_a_status = True elif slot == "b": slot_a_status = False slot_b_status = not slot_a_status fpartitions = {} try: for lun_a in self.luns: lunname = "Lun" + str(lun_a) fpartitions[lunname] = [] gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0)) #back_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba) if guid_gpt_a is None: break else: for partitionname_a in guid_gpt_a.partentries: slot = partitionname_a.lower()[-2:] if slot == "_a": partitionname_b = partitionname_a[:-1] + "b" if partitionname_b in guid_gpt_a.partentries: lun_b = lun_a gpt_data_b = gpt_data_a guid_gpt_b = guid_gpt_a else: resp = self.detect_partition(arguments=None, partitionname=partitionname_b, send_full=True) if not resp[0]: self.error(f"Cannot find partition {partitionname_b}") return False _, lun_b, gpt_data_b, guid_gpt_b = resp update_gpt_info(guid_gpt_a, guid_gpt_b, partitionname_a, partitionname_b, gpt_data_a, gpt_data_b, slot_a_status, slot_b_status, lun_a, lun_b ) #part_a = guid_gpt_a.partentries[partitionname_a] #part_b = guid_gpt_b.partentries[partitionname_b] #is_boot = False #if partitionname_a == "boot_a": # is_boot = True #pdata_a, poffset_a, pdata_b, poffset_b = patch_helper( # gpt_data_a, gpt_data_b, # guid_gpt_a, part_a, part_b, # slot_a_status, slot_b_status, # is_boot #) #if gpt_data_a and gpt_data_b: # gpt_data_a[poffset_a : poffset_a+len(pdata_a)] = pdata_a # new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a) # gpt_data_b[poffset_b:poffset_b + len(pdata_b)] = pdata_b # new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b) # prim_start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES # prim_byte_offset_patch_a = poffset_a % self.cfg.SECTOR_SIZE_IN_BYTES # prim_start_sector_hdr_a = guid_gpt_a.header.current_lba # prim_start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES # prim_byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES # prim_start_sector_hdr_b = guid_gpt_b.header.current_lba # headeroffset_a = prim_start_sector_hdr_a * guid_gpt_a.sectorsize # prim_new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size] # headeroffset_b = prim_start_sector_hdr_b * guid_gpt_b.sectorsize # prim_new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size] #self.update_gpt_info(prim_new_hdr_a, prim_new_hdr_b, # pdata_a, pdata_b, # lun_a, lun_b, # prim_start_sector_hdr_a, prim_start_sector_hdr_b, # prim_start_sector_patch_a, prim_start_sector_patch_b, # prim_byte_offset_patch_a, prim_byte_offset_patch_b) #self.update_gpt_info(new_hdr_a, new_hdr_b, # pdata_a, pdata_b, # lun_a, lun_b, # prim_start_sector_hdr_a, prim_start_sector_hdr_b, # prim_start_sector_patch_a, prim_start_sector_patch_b, # prim_byte_offset_patch_a, prim_byte_offset_patch_b) #back_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba) #print(f"signature: {backup_guid_gpt_a.header.signature}") #print(f"revision: {backup_guid_gpt_a.header.revision}") #print(f"header size: {backup_guid_gpt_a.header.header_size}") #print(f"crc32: {backup_guid_gpt_a.header.crc32}") #print(f"reserved: {backup_guid_gpt_a.header.reserved}") #print(f"current_lba: {backup_guid_gpt_a.header.current_lba}") #print(f"backup_lba: {backup_guid_gpt_a.header.backup_lba}") #print(f"first_usable_lba: {backup_guid_gpt_a.header.first_usable_lba}") #print(f"last_usable_lba: {backup_guid_gpt_a.header.last_usable_lba}") #print(f"disk_guid: {backup_guid_gpt_a.header.disk_guid}") #print(f"part_entry_stzrt_lba: {backup_guid_gpt_a.header.part_entry_start_lba}") #print(f"num_part_entries: {backup_guid_gpt_a.header.num_part_entries}") #print(f"part_entry_size: {backup_guid_gpt_a.header.part_entry_size}") #print(f"crc32_part_entries: {backup_guid_gpt_a.header.crc32_part_entries}") #return True #if (backup_guid_gpt_a is None): # self.error("error in backup") # return False except Exception as err: self.error(str(err)) return False return True def cmd_test(self, cmd): token = "1234" pk = "1234" data = "<?xml version=\"1.0\" ?>\n<data>\n<" + cmd + " token=\"" + token + "\" pk=\"" + pk + "\" />\n</data>" val = self.xmlsend(data) if val.resp: if b"raw hex token" in val[2]: return True if b"opcmd is not enabled" in val[2]: return True return False def cmd_getstorageinfo_string(self): data = "<?xml version=\"1.0\" ?><data><getstorageinfo /></data>" val = self.xmlsend(data) if val.resp: self.info(f"GetStorageInfo:\n--------------------\n") data = self.xml.getlog(val.data) for line in data: self.info(line) return True else: self.warning("GetStorageInfo command isn't supported.") return False def cmd_poke(self, address, data, filename="", info=False): rf = None if filename != "": rf = open(filename, "rb") SizeInBytes = os.stat(filename).st_size else: SizeInBytes = len(data) if info: self.info(f"Poke: Address({hex(address)}),Size({hex(SizeInBytes)})") ''' <?xml version="1.0" ?><data><poke address64="1048576" SizeInBytes="90112" value="0x22 0x00 0x00"/></data> ''' maxsize = 8 lengthtowrite = SizeInBytes if lengthtowrite < maxsize: maxsize = lengthtowrite pos = 0 old = 0 datawritten = 0 mode = 0 if info: print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50) while lengthtowrite > 0: if rf is not None: content = hex(int(hexlify(rf.read(maxsize)).decode('utf-8'), 16)) else: content = 0 if lengthtowrite < maxsize: maxsize = lengthtowrite for i in range(0, maxsize): content = (content << 8) + int( hexlify(data[pos + maxsize - i - 1:pos + maxsize - i]).decode('utf-8'), 16) # content=hex(int(hexlify(data[pos:pos+maxsize]).decode('utf-8'),16)) content = hex(content) if mode == 0: xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \ f"size_in_bytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n" else: xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \ f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n" try: self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes]) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) pass addrinfo = self.cdc.read(timeout=None) if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo: tmp = b"" while b"NAK" not in tmp and b"ACK" not in tmp: tmp += self.cdc.read(timeout=None) xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \ f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n" self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes]) addrinfo = self.cdc.read(timeout=None) if (b'<response' in addrinfo and 'NAK' in addrinfo) or b"Invalid parameters" in addrinfo: self.error(f"Error:{addrinfo}") return False if b"address" in addrinfo and b"can\'t" in addrinfo: tmp = b"" while b"NAK" not in tmp and b"ACK" not in tmp: tmp += self.cdc.read(timeout=None) self.error(f"Error:{addrinfo}") return False addrinfo = self.cdc.read(timeout=None) if b'<response' in addrinfo and b'NAK' in addrinfo: print(f"Error:{addrinfo}") return False pos += maxsize datawritten += maxsize lengthtowrite -= maxsize if info: prog = round(float(datawritten) / float(SizeInBytes) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) old = prog if info: self.info("Done writing.") return True def cmd_peek(self, address, SizeInBytes, filename="", info=False): if info: self.info(f"Peek: Address({hex(address)}),Size({hex(SizeInBytes)})") wf = None if filename != "": wf = open(filename, "wb") ''' <?xml version="1.0" ?><data><peek address64="1048576" SizeInBytes="90112" /></data> ''' data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{address}\" " + \ f"size_in_bytes=\"{SizeInBytes}\" /></data>\n" ''' <?xml version="1.0" encoding="UTF-8" ?><data><log value="Using address 00100000" /></data> <?xml version="1.0" encoding="UTF-8" ?><data><log value="0x22 0x00 0x00 0xEA 0x70 0x00 0x00 0xEA 0x74 0x00 0x00 0xEA 0x78 0x00 0x00 0xEA 0x7C 0x00 0x00 0xEA 0x80 0x00 0x00 0xEA 0x84 0x00 0x00 0xEA 0x88 0x00 0x00 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF " /></data> ''' try: self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes]) except Exception as err: # pylint: disable=broad-except self.debug(str(err)) pass addrinfo = self.cdc.read(timeout=None) if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo: tmp = b"" while b"NAK" not in tmp and b"ACK" not in tmp: tmp += self.cdc.read(timeout=None) data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{hex(address)}\" " + \ f"SizeInBytes=\"{hex(SizeInBytes)}\" /></data>" self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes]) addrinfo = self.cdc.read(timeout=None) if (b'<response' in addrinfo and 'NAK' in addrinfo) or b"Invalid parameters" in addrinfo: self.error(f"Error:{addrinfo}") return False if b"address" in addrinfo and b"can\'t" in addrinfo: tmp = b"" while b"NAK" not in tmp and b"ACK" not in tmp: tmp += self.cdc.read(timeout=None) self.error(f"Error:{addrinfo}") return False resp = b"" dataread = 0 old = 0 if info: print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50) while True: tmp = self.cdc.read(timeout=None) if b'<response' in tmp or b"ERROR" in tmp: break rdata = self.xml.getlog(tmp)[0].replace("0x", "").replace(" ", "") tmp2 = b"" try: tmp2 = binascii.unhexlify(rdata) except: # pylint: disable=broad-except print(rdata) exit(0) dataread += len(tmp2) if wf is not None: wf.write(tmp2) else: resp += tmp2 if info: prog = round(float(dataread) / float(SizeInBytes) * float(100), 1) if prog > old: print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50) old = prog if wf is not None: wf.close() if b'<response' in tmp and b'ACK' in tmp: if info: self.info(f"Bytes from {hex(address)}, bytes read {hex(dataread)}, written to {filename}.") return True else: self.error(f"Error:{addrinfo}") return False else: return resp def cmd_memcpy(self, destaddress, sourceaddress, size): data = self.cmd_peek(sourceaddress, size) if data != b"" and data: if self.cmd_poke(destaddress, data): return True return False def cmd_rawxml(self, data, response=True): if response: val = self.xmlsend(data) if val.resp: self.info(f"{data} succeeded.") return val.data else: self.error(f"{data} failed.") self.error(f"{val.error}") return False else: self.xmlsend(data, False) return True