edl/edlclient/Library/firehose.py

1726 lines
77 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2021-08-05 09:04:49 +02:00
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
2021-08-05 09:04:49 +02:00
import binascii
import io
import os.path
2021-08-05 09:04:49 +02:00
import platform
import time
import json
from struct import unpack
from binascii import hexlify
from queue import Queue
from threading import Thread
from edlclient.Library.Modules.nothing import nothing
2021-10-25 17:57:14 +02:00
from edlclient.Library.utils import *
2024-03-12 00:33:42 -04:00
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE, MAX_PRIORITY, PART_ATT_PRIORITY_BIT
from edlclient.Library.gpt import PART_ATT_PRIORITY_VAL, PART_ATT_ACTIVE_VAL, PART_ATT_MAX_RETRY_COUNT_VAL, PART_ATT_SUCCESSFUL_VAL, PART_ATT_UNBOOTABLE_VAL
2021-10-25 17:57:14 +02:00
from edlclient.Library.sparse import QCSparse
2021-12-05 14:32:21 +01:00
from edlclient.Library.utils import progress
2023-04-07 16:50:58 +02:00
from queue import Queue
from threading import Thread
rq = Queue()
def writedata(filename, rq):
pos = 0
with open(filename, "wb") as wf:
while True:
data = rq.get()
if data is None:
break
pos += len(data)
wf.write(data)
rq.task_done()
2021-08-05 09:04:49 +02:00
2022-06-28 13:53:44 +02:00
2022-01-26 10:08:33 +01:00
class response:
2022-06-28 13:53:44 +02:00
resp = False
data = b""
error = ""
log = None
def __init__(self, resp=False, data=b"", error: str = "", log: dict = ""):
self.resp = resp
self.data = data
self.error = error
self.log = log
2022-01-26 10:08:33 +01:00
2021-08-05 09:04:49 +02:00
try:
2021-10-25 17:57:14 +02:00
from edlclient.Library.Modules.init import modules
2021-08-05 09:04:49 +02:00
except ImportError as e:
pass
class nand_partition:
2021-10-25 14:43:33 +02:00
partentries = {}
2021-08-05 09:04:49 +02:00
def __init__(self, parent, printer=None):
if printer is None:
self.printer = print
else:
self.printer = printer
2021-10-25 14:43:33 +02:00
self.partentries = {}
2021-08-05 09:04:49 +02:00
self.partitiontblsector = None
self.parent = parent
self.storage_info = {}
2022-08-27 16:10:16 +02:00
self.totalsectors = None
2021-08-05 09:04:49 +02:00
def parse(self, partdata):
2021-10-25 14:43:33 +02:00
self.partentries = {}
2021-08-05 09:04:49 +02:00
class partf:
sector = 0
sectors = 0
name = ""
attr1 = 0
attr2 = 0
attr3 = 0
which_flash = 0
magic1, magic2, version, numparts = unpack("<IIII", partdata[0:0x10])
2022-01-26 10:08:33 +01:00
if magic1 == 0x55EE73AA and magic2 == 0xE35EBDDB:
2021-08-05 09:04:49 +02:00
data = partdata[0x10:]
2022-01-26 10:08:33 +01:00
for i in range(numparts):
2021-08-05 09:04:49 +02:00
name, offset, length, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
np = partf()
2022-06-28 13:53:44 +02:00
if name[:2] == b"0:":
name = name[2:]
2022-01-26 10:08:33 +01:00
np.name = name.rstrip(b"\x00").decode('utf-8').lower()
if self.parent.cfg.block_size == 0:
np.sector = offset
np.sectors = length
else:
np.sector = offset * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES
np.sectors = (length & 0xFFFF) * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES
2021-08-05 09:04:49 +02:00
np.attr1 = attr1
np.attr2 = attr2
np.attr3 = attr3
np.which_flash = which_flash
2021-12-05 14:32:21 +01:00
self.partentries[np.name] = np
2022-06-28 13:53:44 +02:00
if self.parent.cfg.block_size != 0 and self.parent.cfg.total_blocks != 0:
2022-01-26 10:08:33 +01:00
self.totalsectors = (self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES) * \
self.parent.cfg.total_blocks
else:
sectors = 0
for part in self.partentries:
if self.partentries[part].sector >= sectors:
sectors += self.partentries[part].sectors
self.totalsectors = sectors
2021-08-05 09:04:49 +02:00
return True
return False
def print(self):
2022-01-26 10:08:33 +01:00
self.printer("Name Offset\t\tLength\t\tAttr\t\t\tFlash")
2021-08-05 09:04:49 +02:00
self.printer("-------------------------------------------------------------")
2021-10-25 14:43:33 +02:00
for selpart in self.partentries:
partition = self.partentries[selpart]
2021-08-05 09:04:49 +02:00
name = partition.name
for i in range(0x10 - len(partition.name)):
name += " "
offset = partition.sector * self.parent.cfg.SECTOR_SIZE_IN_BYTES
length = partition.sectors * self.parent.cfg.SECTOR_SIZE_IN_BYTES
attr1 = partition.attr1
attr2 = partition.attr2
attr3 = partition.attr3
which_flash = partition.which_flash
self.printer(
f"{name}\t%08X\t%08X\t{hex(attr1)}/{hex(attr2)}/{hex(attr3)}\t{which_flash}" % (offset, length))
def writefile(wf, q, stop):
while True:
data = q.get()
if len(data) > 0:
wf.write(data)
q.task_done()
if stop() and q.empty():
break
class asyncwriter():
def __init__(self, wf):
self.writequeue = Queue()
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,))
self.worker.setDaemon(True)
self.stopthreads = False
self.worker.start()
def write(self, data):
self.writequeue.put_nowait(data)
def stop(self):
self.stopthreads = True
self.writequeue.join()
class firehose(metaclass=LogBase):
class cfg:
TargetName = ""
Version = ""
ZLPAwareHost = 1
SkipStorageInit = 0
SkipWrite = 0
MaxPayloadSizeToTargetInBytes = 1048576
MaxPayloadSizeFromTargetInBytes = 8192
MaxXMLSizeInBytes = 4096
bit64 = True
total_blocks = 0
2022-01-26 10:08:33 +01:00
num_physical = 0
2021-08-05 09:04:49 +02:00
block_size = 0
SECTOR_SIZE_IN_BYTES = 0
MemoryName = "eMMC"
prod_name = "Unknown"
maxlun = 99
def __init__(self, cdc, xml, cfg, loglevel, devicemodel, serial, skipresponse, luns, args):
self.cdc = cdc
self.lasterror = b""
self.loglevel = loglevel
self.args = args
self.xml = xml
self.cfg = cfg
self.prog = 0
self.progtime = 0
self.progpos = 0
self.pk = None
self.modules = None
self.serial = serial
self.devicemodel = devicemodel
self.skipresponse = skipresponse
self.luns = luns
self.supported_functions = []
self.lunsizes = {}
self.info = self.__logger.info
self.error = self.__logger.error
self.debug = self.__logger.debug
self.warning = self.__logger.warning
self.__logger.setLevel(loglevel)
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
self.nandparttbl = None
self.nandpart = nand_partition(parent=self, printer=print)
2024-03-09 15:46:48 -05:00
def detect_partition(self, arguments, partitionname, send_full=False):
2024-03-09 14:12:02 -05:00
if arguments is None:
arguments = {
"--gpt-num-part-entries" : 0,
"--gpt-part-entry-size" : 0,
"--gpt-part-entry-start-lba" : 0
}
2021-08-05 09:04:49 +02:00
fpartitions = {}
for lun in self.luns:
lunname = "Lun" + str(lun)
fpartitions[lunname] = []
data, guid_gpt = self.get_gpt(lun, int(arguments["--gpt-num-part-entries"]),
int(arguments["--gpt-part-entry-size"]),
int(arguments["--gpt-part-entry-start-lba"]))
if guid_gpt is None:
break
else:
2021-10-25 14:43:33 +02:00
if partitionname in guid_gpt.partentries:
2024-03-24 22:50:30 -04:00
return [True, lun, data, guid_gpt] if send_full else [True, lun, guid_gpt.partentries[partitionname]]
2021-10-25 14:43:33 +02:00
for part in guid_gpt.partentries:
fpartitions[lunname].append(part)
2021-08-05 09:04:49 +02:00
return [False, fpartitions]
def getstatus(self, resp):
if "value" in resp:
value = resp["value"]
2022-01-26 10:08:33 +01:00
if value == "ACK" or value == "true":
2021-08-05 09:04:49 +02:00
return True
else:
return False
return True
def decoder(self, data):
if isinstance(data, bytes) or isinstance(data, bytearray):
if data[:5] == b"<?xml":
try:
rdata = ""
for line in data.split(b"\n"):
try:
rdata += line.decode('utf-8') + "\n"
except Exception as err:
self.debug(str(err))
rdata += hexlify(line).decode('utf-8') + "\n"
return rdata
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
return data
2022-01-26 10:08:33 +01:00
def xmlsend(self, data, skipresponse=False) -> response:
self.cdc.flush()
self.cdc.xmlread = True
2021-08-05 09:04:49 +02:00
if isinstance(data, bytes) or isinstance(data, bytearray):
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
else:
self.cdc.write(bytes(data, 'utf-8')[:self.cfg.MaxXMLSizeInBytes])
rdata = bytearray()
counter = 0
2022-01-26 10:08:33 +01:00
timeout = 3
2021-08-05 09:04:49 +02:00
if not skipresponse:
2021-12-19 00:28:54 +01:00
while b"<response value" not in rdata:
2021-08-05 09:04:49 +02:00
try:
2022-01-26 10:08:33 +01:00
tmp = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if tmp == b"" in rdata:
counter += 1
time.sleep(0.05)
if counter > timeout:
break
rdata += tmp
except Exception as err:
self.error(err)
2022-06-28 13:53:44 +02:00
return response(resp=False, error=str(err))
2021-08-05 09:04:49 +02:00
try:
if b"raw hex token" in rdata:
rdata = rdata
try:
resp = self.xml.getresponse(rdata)
2022-01-26 10:08:33 +01:00
status = self.getstatus(resp)
if "rawmode" in resp:
2022-06-28 13:53:44 +02:00
if resp["rawmode"] == "false":
2022-01-26 10:08:33 +01:00
if status:
log = self.xml.getlog(rdata)
2023-05-30 16:26:57 +02:00
return response(resp=status, data=rdata, log=log)
2022-01-26 10:08:33 +01:00
else:
error = self.xml.getlog(rdata)
return response(resp=status, error=error, data=resp, log=error)
else:
if status:
if b"log value=" in rdata:
log = self.xml.getlog(rdata)
return response(resp=resp, data=rdata, log=log)
return response(resp=status, data=rdata)
2021-08-05 09:04:49 +02:00
except Exception as e: # pylint: disable=broad-except
rdata = bytes(self.decoder(rdata), 'utf-8')
resp = self.xml.getresponse(rdata)
status = self.getstatus(resp)
2022-01-26 10:08:33 +01:00
if status:
return response(resp=True, data=resp)
else:
2022-06-28 13:53:44 +02:00
error = ""
2022-01-26 10:08:33 +01:00
if b"<log value" in rdata:
2022-06-28 13:53:44 +02:00
error = self.xml.getlog(rdata)
2022-01-26 10:08:33 +01:00
return response(resp=False, error=error, data=resp)
2021-08-05 09:04:49 +02:00
except Exception as err:
self.debug(str(err))
if isinstance(rdata, bytes) or isinstance(rdata, bytearray):
try:
self.debug("Error on getting xml response:" + rdata.decode('utf-8'))
except Exception as err:
self.debug("Error on getting xml response:" + hexlify(rdata).decode('utf-8') +
", Error: " + str(err))
elif isinstance(rdata, str):
self.debug("Error on getting xml response:" + rdata)
2022-06-28 13:53:44 +02:00
return response(resp=False, error=rdata)
2021-08-05 09:04:49 +02:00
else:
2022-06-28 13:53:44 +02:00
return response(resp=True, data=rdata)
2021-08-05 09:04:49 +02:00
2022-01-26 10:08:33 +01:00
def cmd_reset(self, mode="reset"):
if mode is None:
mode = "reset"
2022-06-28 13:53:44 +02:00
data = "<?xml version=\"1.0\" ?><data><power value=\"" + mode + "\"/></data>"
2021-08-05 09:04:49 +02:00
val = self.xmlsend(data)
try:
v = None
while v != b'':
2022-01-26 10:08:33 +01:00
v = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if v != b'':
resp = self.xml.getlog(v)[0]
else:
break
print(resp)
except Exception as err:
self.error(str(err))
pass
2022-01-26 10:08:33 +01:00
if val.resp:
2021-08-05 09:04:49 +02:00
self.info("Reset succeeded.")
return True
else:
2022-06-28 13:53:44 +02:00
self.error("Reset failed: " + val.error)
2021-08-05 09:04:49 +02:00
return False
def cmd_xml(self, filename):
with open(filename, 'rb') as rf:
data = rf.read()
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
self.info("Command succeeded." + str(val.data))
return val.data
2021-08-05 09:04:49 +02:00
else:
2022-01-26 10:08:33 +01:00
self.error("Command failed:" + str(val.error))
return val.error
2021-08-05 09:04:49 +02:00
def cmd_nop(self):
data = "<?xml version=\"1.0\" ?><data><nop /></data>"
2022-06-28 13:53:44 +02:00
resp = self.xmlsend(data, True)
2023-01-20 10:58:40 +00:00
self.debug(resp.data.hex())
2021-08-05 09:04:49 +02:00
info = b""
tmp = None
while tmp != b"":
2022-01-26 10:08:33 +01:00
tmp = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if tmp == b"":
break
info += tmp
if info != b"":
self.info("Nop succeeded.")
return self.xml.getlog(info)
else:
self.error("Nop failed.")
return False
def cmd_getsha256digest(self, physical_partition_number, start_sector, num_partition_sectors):
data = f"<?xml version=\"1.0\" ?><data><getsha256digest" + \
f" SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
res = self.xml.getlog(val.data)
2021-08-05 09:04:49 +02:00
for line in res:
self.info(line)
if "Digest " in res:
return res.split("Digest ")[1]
else:
return res
else:
2022-06-28 13:53:44 +02:00
self.error("GetSha256Digest failed: " + val.error)
2021-08-05 09:04:49 +02:00
return False
def cmd_setbootablestoragedrive(self, partition_number):
data = f"<?xml version=\"1.0\" ?><data>\n<setbootablestoragedrive value=\"{str(partition_number)}\" /></data>"
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
2021-08-05 09:04:49 +02:00
self.info("Setbootablestoragedrive succeeded.")
return True
else:
2022-01-26 10:08:33 +01:00
self.error("Setbootablestoragedrive failed: " + val.error)
2021-08-05 09:04:49 +02:00
return False
2022-06-28 13:53:44 +02:00
def cmd_send(self, content, responsexml=True):
2021-08-05 09:04:49 +02:00
data = f"<?xml version=\"1.0\" ?><data>\n<{content} /></data>"
2022-06-28 13:53:44 +02:00
if responsexml:
2021-08-05 09:04:49 +02:00
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
return val.data
2021-08-05 09:04:49 +02:00
else:
self.error(f"{content} failed.")
2022-01-26 10:08:33 +01:00
self.error(f"{val.error}")
return val.error
2021-08-05 09:04:49 +02:00
else:
self.xmlsend(data, True)
return True
def cmd_patch(self, physical_partition_number, start_sector, byte_offset, value, size_in_bytes, display=True):
"""
<patch SECTOR_SIZE_IN_BYTES="512" byte_offset="16" filename="DISK" physical_partition_number="0"
size_in_bytes="4" start_sector="NUM_DISK_SECTORS-1." value="0" what="Zero Out Header CRC in Backup Header."/>
"""
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<patch SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" byte_offset=\"{byte_offset}\"" + \
f" filename=\"DISK\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" size_in_bytes=\"{size_in_bytes}\" " + \
f" start_sector=\"{start_sector}\" " + \
f" value=\"{value}\" "
if self.modules is not None:
data += self.modules.addpatch()
data += f"/>\n</data>"
rsp = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if rsp.resp:
2021-08-05 09:04:49 +02:00
if display:
self.info(f"Patch:\n--------------------\n")
2022-01-26 10:08:33 +01:00
self.info(rsp.data)
2021-08-05 09:04:49 +02:00
return True
else:
2022-01-26 10:08:33 +01:00
self.error(f"Error:{rsp.error}")
2021-08-05 09:04:49 +02:00
return False
def wait_for_data(self):
tmp = bytearray()
timeout = 0
while b'response value' not in tmp:
2022-01-26 10:08:33 +01:00
res = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if res == b'':
timeout += 1
if timeout == 4:
break
time.sleep(0.1)
tmp += res
return tmp
def cmd_program(self, physical_partition_number, start_sector, filename, display=True):
total = os.stat(filename).st_size
sparse = QCSparse(filename, self.loglevel)
sparseformat = False
if sparse.readheader():
sparseformat = True
total = sparse.getsize()
bytestowrite = total
2021-12-05 14:32:21 +01:00
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
2021-08-05 09:04:49 +02:00
with open(filename, "rb") as rf:
# Make sure we fill data up to the sector size
num_partition_sectors = total // self.cfg.SECTOR_SIZE_IN_BYTES
if (total % self.cfg.SECTOR_SIZE_IN_BYTES) != 0:
num_partition_sectors += 1
if display:
self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Write", pos=0, total=total, display=display)
2022-01-26 10:08:33 +01:00
if rsp.resp:
2021-08-05 09:04:49 +02:00
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
if sparseformat:
wdata = sparse.read(wlen)
else:
wdata = rf.read(wlen)
bytestowrite -= wlen
if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \
self.cfg.SECTOR_SIZE_IN_BYTES
wdata += b"\x00" * (filllen - wlen)
self.cdc.write(wdata)
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display)
2021-08-05 09:04:49 +02:00
self.cdc.write(b'')
# time.sleep(0.2)
wd = self.wait_for_data()
log = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in log:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
return True
def cmd_program_buffer(self, physical_partition_number, start_sector, wfdata, display=True):
bytestowrite = len(wfdata)
total = bytestowrite
# Make sure we fill data up to the sector size
num_partition_sectors = bytestowrite // self.cfg.SECTOR_SIZE_IN_BYTES
if (bytestowrite % self.cfg.SECTOR_SIZE_IN_BYTES) != 0:
num_partition_sectors += 1
if display:
self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
2021-12-05 14:32:21 +01:00
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
progbar.show_progress(prefix="Write", pos=0, total=total, display=display)
2022-01-26 10:08:33 +01:00
if rsp.resp:
2021-08-05 09:04:49 +02:00
pos = 0
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
2021-08-27 23:03:46 +02:00
wrdata = wfdata[pos:pos + wlen]
2021-08-05 09:04:49 +02:00
pos += wlen
bytestowrite -= wlen
if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \
self.cfg.SECTOR_SIZE_IN_BYTES
2021-08-27 23:03:46 +02:00
wrdata += b"\x00" * (filllen - wlen)
2021-08-05 09:04:49 +02:00
2021-08-27 23:03:46 +02:00
self.cdc.write(wrdata)
2021-08-05 09:04:49 +02:00
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display)
2021-08-05 09:04:49 +02:00
self.cdc.write(b'')
# time.sleep(0.2)
wd = self.wait_for_data()
log = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in log:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
2022-01-26 10:08:33 +01:00
else:
self.error(f"Error:{rsp.error}")
2021-08-05 09:04:49 +02:00
return True
def cmd_erase(self, physical_partition_number, start_sector, num_partition_sectors, display=True):
if display:
self.info(f"\nErasing from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
empty = b"\x00" * self.cfg.MaxPayloadSizeToTargetInBytes
pos = 0
bytestowrite = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
2021-12-05 14:32:21 +01:00
progbar = progress(self.cfg.MaxPayloadSizeToTargetInBytes)
progbar.show_progress(prefix="Erase", pos=0, total=total, display=display)
2022-01-26 10:08:33 +01:00
if rsp.resp:
2021-08-05 09:04:49 +02:00
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
self.cdc.write(empty[:wlen])
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Erase", pos=total - bytestowrite, total=total, display=display)
2021-08-05 09:04:49 +02:00
bytestowrite -= wlen
pos += wlen
self.cdc.write(b'')
res = self.wait_for_data()
info = self.xml.getlog(res)
rsp = self.xml.getresponse(res)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in info:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
2022-01-26 10:08:33 +01:00
else:
self.error(f"Error:{rsp.error}")
return False
2021-08-05 09:04:49 +02:00
return True
def cmd_read(self, physical_partition_number, start_sector, num_partition_sectors, filename, display=True):
2023-04-07 16:50:58 +02:00
global rq
2021-08-05 09:04:49 +02:00
self.lasterror = b""
2021-12-05 14:32:21 +01:00
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
2021-08-05 09:04:49 +02:00
if display:
self.info(
f"\nReading from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
2023-04-07 16:50:58 +02:00
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
2021-08-05 09:04:49 +02:00
2023-04-07 16:50:58 +02:00
rsp = self.xmlsend(data, self.skipresponse)
self.cdc.xmlread = False
time.sleep(0.01)
if not rsp.resp:
if display:
self.error(rsp.error)
return b""
else:
bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = bytestoread
show_progress = progbar.show_progress
usb_read = self.cdc.read
progbar.show_progress(prefix="Read", pos=0, total=total, display=display)
worker = Thread(target=writedata, args=(filename, rq), daemon=True)
worker.start()
while bytestoread > 0:
if self.cdc.is_serial:
maxsize = self.cfg.MaxPayloadSizeFromTargetInBytes
else:
maxsize = 5 * 1024 * 1024
size = min(maxsize, bytestoread)
data = usb_read(size)
if len(data) > 0:
rq.put(data)
bytestoread -= len(data)
show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
rq.put(None)
worker.join(60)
self.cdc.xmlread = True
wd = self.wait_for_data()
info = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
if bytestoread!=0:
2021-08-05 09:04:49 +02:00
self.error(f"Error:")
for line in info:
self.error(line)
self.lasterror += bytes(line + "\n", "utf-8")
2023-04-07 16:50:58 +02:00
return False
else:
if display:
self.error(f"Error:{rsp[2]}")
return False
return True
2021-08-05 09:04:49 +02:00
def cmd_read_buffer(self, physical_partition_number, start_sector, num_partition_sectors, display=True):
self.lasterror = b""
prog = 0
if display:
self.info(
f"\nReading from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
2021-12-05 14:32:21 +01:00
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
2021-08-05 09:04:49 +02:00
rsp = self.xmlsend(data, self.skipresponse)
2022-01-26 10:08:33 +01:00
self.cdc.xmlread = False
2021-08-05 09:04:49 +02:00
resData = bytearray()
2022-01-26 10:08:33 +01:00
if not rsp.resp:
if display:
self.error(rsp.error)
return rsp
else:
2021-08-05 09:04:49 +02:00
bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = bytestoread
if display:
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
2021-08-05 09:04:49 +02:00
while bytestoread > 0:
2022-01-26 10:08:33 +01:00
tmp = self.cdc.read(min(self.cdc.maxsize, bytestoread))
2021-12-05 14:32:21 +01:00
size = len(tmp)
2021-08-05 09:04:49 +02:00
bytestoread -= size
resData.extend(tmp)
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
2022-01-26 10:08:33 +01:00
self.cdc.xmlread = True
2021-08-05 09:04:49 +02:00
wd = self.wait_for_data()
info = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in info:
self.error(line)
2022-06-28 13:53:44 +02:00
return response(resp=False, data=resData, error=info)
2022-01-26 10:08:33 +01:00
elif "rawmode" in rsp:
if rsp["rawmode"] == "false":
2022-06-28 13:53:44 +02:00
return response(resp=True, data=resData)
2021-08-05 09:04:49 +02:00
else:
if len(rsp) > 1:
if b"Failed to open the UFS Device" in rsp[2]:
self.error(f"Error:{rsp[2]}")
2022-01-26 10:08:33 +01:00
self.lasterror = rsp[2]
2022-06-28 13:53:44 +02:00
return response(resp=False, data=resData, error=rsp[2])
if rsp["value"] != "ACK":
2021-08-05 09:04:49 +02:00
self.lasterror = rsp[2]
if display and prog != 100:
2021-12-05 14:32:21 +01:00
progbar.show_progress(prefix="Read", pos=total, total=total, display=display)
2022-06-28 13:53:44 +02:00
resp = rsp["value"] == "ACK"
return response(resp=resp, data=resData, error=rsp[2]) # Do not remove, needed for oneplus
2021-08-05 09:04:49 +02:00
2024-03-25 01:32:34 -04:00
def get_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba, start_sector=1):
2021-08-05 09:04:49 +02:00
try:
2024-03-24 22:50:30 -04:00
resp = self.cmd_read_buffer(lun, 0, 1, False)
2021-08-05 09:04:49 +02:00
except Exception as err:
self.debug(str(err))
self.skipresponse = True
2024-03-24 22:50:30 -04:00
resp = self.cmd_read_buffer(lun, 0, 1, False)
2021-08-05 09:04:49 +02:00
2022-06-28 13:53:44 +02:00
if not resp.resp:
2022-01-26 10:08:33 +01:00
for line in resp.error:
self.error(line)
2021-08-05 09:04:49 +02:00
return None, None
2022-01-26 10:08:33 +01:00
data = resp.data
2021-08-05 09:04:49 +02:00
magic = unpack("<I", data[0:4])[0]
2024-03-24 23:02:59 -04:00
data += self.cmd_read_buffer(lun, start_sector, 1, False).data
2021-08-05 09:04:49 +02:00
if magic == 0x844bdcd1:
2022-01-26 10:08:33 +01:00
self.info("Nand storage detected.")
self.info("Scanning for partition table ...")
progbar = progress(1)
2021-08-05 09:04:49 +02:00
if self.nandpart.partitiontblsector is None:
2022-06-28 13:53:44 +02:00
sector = 0x280
2022-01-26 10:08:33 +01:00
progbar.show_progress(prefix="Scanning", pos=sector, total=1024, display=True)
2024-03-24 23:02:59 -04:00
resp = self.cmd_read_buffer(0, sector, 1, False)
2022-01-26 10:08:33 +01:00
if resp.resp:
2022-06-28 13:53:44 +02:00
if resp.data[0:8] in [b"\xac\x9f\x56\xfe\x7a\x12\x7f\xcd", b"\xAA\x73\xEE\x55\xDB\xBD\x5E\xE3"]:
2022-01-26 10:08:33 +01:00
progbar.show_progress(prefix="Scanning", pos=1024, total=1024, display=True)
self.nandpart.partitiontblsector = sector
self.info(f"Found partition table at sector {sector} :)")
else:
self.error("Error on reading partition table data")
return None, None
2021-08-05 09:04:49 +02:00
if self.nandpart.partitiontblsector is not None:
2024-03-24 23:02:59 -04:00
resp = self.cmd_read_buffer(0, self.nandpart.partitiontblsector + 1, 2, False)
2022-01-26 10:08:33 +01:00
if resp.resp:
if self.nandpart.parse(resp.data):
return resp.data, self.nandpart
else:
self.error("Couldn't find partition table, but command \"rs\" might still work !")
sys.exit(0)
2021-08-05 09:04:49 +02:00
return None, None
else:
2022-01-26 10:08:33 +01:00
data = resp.data
2021-08-05 09:04:49 +02:00
guid_gpt = gpt(
num_part_entries=gpt_num_part_entries,
part_entry_size=gpt_part_entry_size,
part_entry_start_lba=gpt_part_entry_start_lba,
loglevel=self.__logger.level
)
try:
2024-03-25 01:32:34 -04:00
sectorsize = self.cfg.SECTOR_SIZE_IN_BYTES
header = guid_gpt.parseheader(data, sectorsize)
2021-08-05 09:04:49 +02:00
if header.signature == b"EFI PART":
2024-03-25 01:32:34 -04:00
part_table_size = header.num_part_entries * header.part_entry_size
sectors = part_table_size // self.cfg.SECTOR_SIZE_IN_BYTES
if part_table_size % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
2021-08-05 09:04:49 +02:00
sectors += 1
if sectors == 0:
return None, None
2021-12-19 00:28:54 +01:00
if sectors > 64:
sectors = 64
2024-03-25 01:32:34 -04:00
data += self.cmd_read_buffer(lun, header.part_entry_start_lba, sectors, False).data
2024-03-24 23:02:59 -04:00
if data == b"":
return None, None
2024-03-25 01:32:34 -04:00
guid_gpt.parse(data, self.cfg.SECTOR_SIZE_IN_BYTES)
return data, guid_gpt
2021-08-05 09:04:49 +02:00
else:
return None, None
except Exception as err:
self.debug(str(err))
return None, None
def get_backup_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba):
2022-01-26 10:08:33 +01:00
resp = self.cmd_read_buffer(lun, 0, 2, False)
if not resp.resp:
self.error("Error on reading backup gpt")
2021-08-05 09:04:49 +02:00
return None
guid_gpt = gpt(
num_part_entries=gpt_num_part_entries,
part_entry_size=gpt_part_entry_size,
part_entry_start_lba=gpt_part_entry_start_lba,
loglevel=self.__logger.level
)
2022-01-26 10:08:33 +01:00
header = guid_gpt.parseheader(resp.data, self.cfg.SECTOR_SIZE_IN_BYTES)
2021-08-05 09:04:49 +02:00
if "backup_lba" in header:
sectors = header.first_usable_lba - 1
data = self.cmd_read_buffer(lun, header.backup_lba, sectors, False)
if data == b"":
return None
return data
else:
return None
def calc_offset(self, sector, offset):
sector = sector + (offset // self.cfg.SECTOR_SIZE_IN_BYTES)
offset = offset % self.cfg.SECTOR_SIZE_IN_BYTES
return sector, offset
def getluns(self, argument):
if argument["--lun"] is not None:
return [int(argument["--lun"])]
luns = []
if self.cfg.MemoryName.lower() == "ufs":
for i in range(0, self.cfg.maxlun):
luns.append(i)
else:
luns = [0]
return luns
def configure(self, lvl):
if self.cfg.SECTOR_SIZE_IN_BYTES == 0:
if self.cfg.MemoryName.lower() == "emmc":
self.cfg.SECTOR_SIZE_IN_BYTES = 512
else:
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
2022-01-26 10:08:33 +01:00
connectcmd = f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>" + \
2021-08-05 09:04:49 +02:00
f"<configure MemoryName=\"{self.cfg.MemoryName}\" " + \
2022-01-26 10:08:33 +01:00
f"Verbose=\"0\" " + \
f"AlwaysValidate=\"0\" " + \
f"MaxDigestTableSizeInBytes=\"2048\" " + \
f"MaxPayloadSizeToTargetInBytes=\"{str(self.cfg.MaxPayloadSizeToTargetInBytes)}\" " + \
2021-08-05 09:04:49 +02:00
f"ZLPAwareHost=\"{str(self.cfg.ZLPAwareHost)}\" " + \
f"SkipStorageInit=\"{str(int(self.cfg.SkipStorageInit))}\" " + \
2022-01-26 10:08:33 +01:00
f"SkipWrite=\"{str(int(self.cfg.SkipWrite))}\"/>" + \
2021-08-05 09:04:49 +02:00
"</data>"
'''
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data><response value=\"ACK\" MinVersionSupported=\"1\"" \
"MemoryName=\"eMMC\" MaxPayloadSizeFromTargetInBytes=\"4096\" MaxPayloadSizeToTargetInBytes=\"1048576\" " \
2024-03-24 22:50:30 -04:00
"MaxPayloadSizeToTargetInBytesSupported=\"1048576\" MaxXMLSizeInBytes=\"4096\" Version=\"1\"
2021-08-05 09:04:49 +02:00
TargetName=\"8953\" />" \
"</data>"
'''
rsp = self.xmlsend(connectcmd)
2022-01-26 10:08:33 +01:00
if not rsp.resp:
2022-06-28 13:53:44 +02:00
if rsp.error == "":
2022-03-23 12:45:46 +01:00
try:
if "MemoryName" in rsp.data:
2022-06-28 13:53:44 +02:00
self.cfg.MemoryName = rsp.data["MemoryName"]
2022-03-23 12:45:46 +01:00
except TypeError:
self.warning("!DEBUG! rsp.data: '%s'" % (rsp.data,))
return self.configure(lvl + 1)
2022-01-26 10:08:33 +01:00
if "MaxPayloadSizeFromTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"])
if "MaxPayloadSizeToTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"])
if "MaxPayloadSizeToTargetInBytesSupported" in rsp.data:
2022-06-28 13:53:44 +02:00
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
rsp.data["MaxPayloadSizeToTargetInBytesSupported"])
2022-01-26 10:08:33 +01:00
if "TargetName" in rsp.data:
self.cfg.TargetName = rsp.data["TargetName"]
return self.configure(lvl + 1)
for line in rsp.error:
if "Not support configure MemoryName eMMC" in line:
self.info("eMMC is not supported by the firehose loader. Trying UFS instead.")
self.cfg.MemoryName = "UFS"
return self.configure(lvl + 1)
elif "Only nop and sig tag can be" in line:
2021-08-05 09:04:49 +02:00
self.info("Xiaomi EDL Auth detected.")
try:
self.modules = modules(fh=self, serial=self.serial,
2021-12-05 14:32:21 +01:00
supported_functions=self.supported_functions,
loglevel=self.__logger.level,
devicemodel=self.devicemodel, args=self.args)
2021-08-05 09:04:49 +02:00
except Exception as err: # pylint: disable=broad-except
self.modules = None
if self.modules.edlauth():
rsp = self.xmlsend(connectcmd)
2022-01-26 10:08:33 +01:00
return rsp.resp
else:
self.error("Error on EDL Authentification")
return False
2022-08-27 16:10:16 +02:00
elif "MaxPayloadSizeToTargetInBytes" in rsp.data:
2022-01-26 10:08:33 +01:00
try:
self.cfg.MemoryName = rsp.data["MemoryName"]
self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"])
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
rsp.data["MaxPayloadSizeToTargetInBytesSupported"])
if "MaxXMLSizeInBytes" in rsp.data:
self.cfg.MaxXMLSizeInBytes = int(rsp.data["MaxXMLSizeInBytes"])
else:
self.cfg.MaxXMLSizeInBytes = 4096
if "MaxPayloadSizeFromTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"])
else:
self.cfg.MaxPayloadSizeFromTargetInBytes = 4096
if "TargetName" in rsp.data:
self.cfg.TargetName = rsp.data["TargetName"]
else:
self.cfg.TargetName = "Unknown"
if "MSM" not in self.cfg.TargetName:
self.cfg.TargetName = "MSM" + self.cfg.TargetName
if "Version" in rsp.data:
self.cfg.Version = rsp.data["Version"]
else:
self.cfg.Version = "Unknown"
if lvl == 0:
return self.configure(lvl + 1)
else:
self.error(f"Error:{rsp}")
sys.exit()
except Exception as e:
pass
elif "ERROR" in line or "WARN" in line:
if "ERROR" in line:
self.error(line)
sys.exit()
elif "WARN" in line:
self.warning(line)
else:
info = self.cdc.read(timeout=1)
2022-06-28 13:53:44 +02:00
if isinstance(rsp.resp, dict):
2022-01-26 10:08:33 +01:00
field = rsp.resp
if "MemoryName" not in field:
2021-08-05 09:04:49 +02:00
# print(rsp[1])
2022-01-26 10:08:33 +01:00
field["MemoryName"] = "eMMC"
if "MaxXMLSizeInBytes" not in field:
field["MaxXMLSizeInBytes"] = "4096"
2021-08-05 09:04:49 +02:00
self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes")
2022-01-26 10:08:33 +01:00
if "MaxPayloadSizeToTargetInBytes" not in field:
field["MaxPayloadSizeToTargetInBytes"] = "1038576"
if "MaxPayloadSizeToTargetInBytesSupported" not in field:
field["MaxPayloadSizeToTargetInBytesSupported"] = "1038576"
if field["MemoryName"].lower() != self.cfg.MemoryName.lower():
2021-08-05 09:04:49 +02:00
self.warning("Memory type was set as " + self.cfg.MemoryName + " but device reported it is " +
2022-01-26 10:08:33 +01:00
field["MemoryName"] + " instead.")
self.cfg.MemoryName = field["MemoryName"]
if "MaxPayloadSizeToTargetInBytes" in field:
self.cfg.MaxPayloadSizeToTargetInBytes = int(field["MaxPayloadSizeToTargetInBytes"])
else:
self.cfg.MaxPayloadSizeToTargetInBytes = 1048576
if "MaxPayloadSizeToTargetInBytesSupported" in field:
2022-06-28 13:53:44 +02:00
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
field["MaxPayloadSizeToTargetInBytesSupported"])
2022-01-26 10:08:33 +01:00
else:
self.cfg.MaxPayloadSizeToTargetInBytesSupported = 1048576
if "MaxXMLSizeInBytes" in field:
self.cfg.MaxXMLSizeInBytes = int(field["MaxXMLSizeInBytes"])
else:
self.cfg.MaxXMLSizeInBytes = 4096
if "MaxPayloadSizeFromTargetInBytes" in field:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(field["MaxPayloadSizeFromTargetInBytes"])
2021-08-05 09:04:49 +02:00
else:
self.cfg.MaxPayloadSizeFromTargetInBytes = self.cfg.MaxXMLSizeInBytes
self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes")
2022-01-26 10:08:33 +01:00
if "TargetName" in field:
self.cfg.TargetName = field["TargetName"]
2021-08-05 09:04:49 +02:00
if "MSM" not in self.cfg.TargetName:
self.cfg.TargetName = "MSM" + self.cfg.TargetName
else:
self.cfg.TargetName = "Unknown"
self.warning("Couldn't detect TargetName")
2022-01-26 10:08:33 +01:00
if "Version" in field:
self.cfg.Version = field["Version"]
2021-08-05 09:04:49 +02:00
else:
self.cfg.Version = 0
self.warning("Couldn't detect Version")
2022-01-26 10:08:33 +01:00
self.info(f"TargetName={self.cfg.TargetName}")
self.info(f"MemoryName={self.cfg.MemoryName}")
self.info(f"Version={self.cfg.Version}")
self.info("Trying to read first storage sector...")
rsp = self.cmd_read_buffer(0, 1, 1, False)
self.info("Running configure...")
if not rsp.resp and self.args["--memory"] is None:
for line in rsp.error:
if "Failed to set the IO options" in line:
self.warning(
"Memory type eMMC doesn't seem to match (Failed to init). Trying to use NAND instead.")
self.cfg.MemoryName = "nand"
return self.configure(0)
elif "Failed to open the SDCC Device" in line:
self.warning(
"Memory type eMMC doesn't seem to match (Failed to init). Trying to use UFS instead.")
self.cfg.MemoryName = "UFS"
return self.configure(0)
elif "Failed to initialize (open whole lun) UFS Device slot" in line:
self.warning(
"Memory type UFS doesn't seem to match (Failed to init). Trying to use eMMC instead.")
self.cfg.MemoryName = "eMMC"
return self.configure(0)
2022-06-28 13:53:44 +02:00
elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=4096 must be equal to disk sector size 512" in line \
2022-01-26 10:08:33 +01:00
or "different from device sector size (512)" in line:
self.cfg.SECTOR_SIZE_IN_BYTES = 512
return self.configure(0)
2022-06-28 13:53:44 +02:00
elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=512 must be equal to disk sector size 4096" in line \
2022-01-26 10:08:33 +01:00
or "different from device sector size (4096)" in line:
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
return self.configure(0)
self.parse_storage()
for function in self.supported_functions:
if function == "checkntfeature":
if type(self.devicemodel)==list:
self.devicemodel=self.devicemodel[0]
self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial,
supported_functions=self.supported_functions,
loglevel=self.loglevel)
if self.nothing is not None:
self.nothing.ntprojectverify()
2022-01-26 10:08:33 +01:00
self.luns = self.getluns(self.args)
return True
2021-08-05 09:04:49 +02:00
def getlunsize(self, lun):
if lun not in self.lunsizes:
try:
data, guid_gpt = self.get_gpt(lun, int(self.args["--gpt-num-part-entries"]),
int(self.args["--gpt-part-entry-size"]),
int(self.args["--gpt-part-entry-start-lba"]))
self.lunsizes[lun] = guid_gpt.totalsectors
2022-06-28 13:53:44 +02:00
except Exception as err:
self.error(err)
2021-08-05 09:04:49 +02:00
return -1
else:
return self.lunsizes[lun]
return guid_gpt.totalsectors
2021-10-25 14:43:33 +02:00
def get_supported_functions(self):
supfunc = False
info = self.cmd_nop()
if not info:
self.info("No supported functions detected, configuring qc generic commands")
self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive',
'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo',
'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml']
else:
self.supported_functions = []
for line in info:
if "chip serial num" in line.lower():
self.info(line)
try:
serial = line.split("0x")[1][:-1]
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
serial = line.split(": ")[2]
self.serial = int(serial.split(" ")[0])
if supfunc and "end of supported functions" not in line.lower():
rs = line.replace("\n", "")
if rs != "":
rs = rs.replace("INFO: ", "")
self.supported_functions.append(rs)
if "supported functions" in line.lower():
supfunc = True
if len(self.supported_functions) > 1:
info = "Supported Functions: "
for line in self.supported_functions:
info += line + ","
self.info(info[:-1])
2022-01-26 10:08:33 +01:00
data = self.cdc.read(timeout=None)
2021-10-25 14:43:33 +02:00
try:
self.info(data.decode('utf-8'))
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
if not self.supported_functions:
self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive',
'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo',
'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml']
2021-08-05 09:04:49 +02:00
def connect(self):
v = b'-1'
if platform.system() == 'Windows':
2021-08-19 22:52:20 +02:00
self.cdc.timeout = 50
2021-08-18 13:21:51 -07:00
elif platform.system() == 'Darwin':
# must ensure the timeout is enough to fill the buffer we alloc
# which is 1MB, othwise some data are dropped in the underlying usb libraries
self.cdc.timeout = 50
2021-08-05 09:04:49 +02:00
else:
2021-08-19 22:52:20 +02:00
self.cdc.timeout = 50
2021-08-05 09:04:49 +02:00
info = []
while v != b'':
try:
v = self.cdc.read(timeout=None)
2022-01-26 10:08:33 +01:00
if (b"response" in v and b"</data>" in v) or v == b'':
2021-08-05 09:04:49 +02:00
break
data = self.xml.getlog(v)
if len(data) > 0:
info.append(data[0])
if not info:
break
except Exception as err: # pylint: disable=broad-except
pass
2021-10-25 14:43:33 +02:00
2021-08-05 09:04:49 +02:00
if info == [] or (len(info) > 0 and 'ERROR' in info[0]):
if len(info) > 0:
self.debug(info[0])
2022-06-28 13:53:44 +02:00
if len(info) > 0:
2021-10-25 14:43:33 +02:00
supfunc = False
2021-08-05 09:04:49 +02:00
for line in info:
2022-01-26 10:08:33 +01:00
self.info(line)
2021-08-05 09:04:49 +02:00
if "chip serial num" in line.lower():
try:
serial = line.split("0x")[1][:-1]
if ")" in serial:
serial=serial[:serial.rfind(")")]
2021-08-05 09:04:49 +02:00
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
serial = line.split(": ")[2]
self.serial = int(serial.split(" ")[0])
if supfunc and "end of supported functions" not in line.lower():
rs = line.replace("\n", "")
if rs != "":
rs = rs.replace("INFO: ", "")
self.supported_functions.append(rs)
if "supported functions" in line.lower():
supfunc = True
2022-01-26 10:08:33 +01:00
if "program" in line.lower():
2022-06-28 13:53:44 +02:00
idx = line.find("Functions: ")
if idx != -1:
v = line[idx + 11:].split(" ")
2022-01-26 10:08:33 +01:00
for val in v:
2022-06-28 13:53:44 +02:00
if val != "":
2022-01-26 10:08:33 +01:00
self.supported_functions.append(val)
supfunc = False
2021-10-25 14:43:33 +02:00
try:
if os.path.exists(self.cfg.programmer):
data = open(self.cfg.programmer, "rb").read()
for cmd in [b"demacia", b"setprojmodel", b"setswprojmodel", b"setprocstart", b"SetNetType",
b"checkntfeature"]:
2021-10-25 14:43:33 +02:00
if cmd in data:
self.supported_functions.append(cmd.decode('utf-8'))
2021-12-05 14:32:21 +01:00
state = {
"supported_functions": self.supported_functions,
"programmer": self.cfg.programmer,
"serial": self.serial
2021-10-25 14:43:33 +02:00
}
if os.path.exists("edl_config.json"):
data = json.loads(open("edl_config.json","rb").read().decode('utf-8'))
if "serial" in data and data["serial"]!=state["serial"]:
open("edl_config.json", "w").write(json.dumps(state))
else:
self.supported_functions = data["supported_functions"]
self.cfg.programmer = data["programmer"]
else:
open("edl_config.json", "w").write(json.dumps(state))
if "001920e101cf0000_fa2836525c2aad8a_fhprg.bin" in self.cfg.programmer:
self.devicemodel = '20111'
elif "000b80e100020000_467f3020c4cc788d_fhprg.bin" in self.cfg.programmer:
self.devicemodel = '22111'
2021-10-25 14:43:33 +02:00
except:
pass
2021-08-05 09:04:49 +02:00
2022-01-26 10:08:33 +01:00
elif self.serial is None or self.supported_functions is []:
try:
if os.path.exists("edl_config.json"):
pinfo = json.loads(open("edl_config.json", "rb").read())
if not self.supported_functions:
if "supported_functions" in pinfo:
self.supported_functions = pinfo["supported_functions"]
if self.serial is None:
if "serial" in pinfo:
self.serial = pinfo["serial"]
else:
self.get_supported_functions()
except:
self.get_supported_functions()
pass
2021-12-05 14:32:21 +01:00
# rsp = self.xmlsend(data, self.skipresponse)
2021-08-05 09:04:49 +02:00
return self.supported_functions
2022-01-26 10:08:33 +01:00
def parse_storage(self):
storageinfo = self.cmd_getstorageinfo()
2022-06-28 13:53:44 +02:00
if storageinfo is None or storageinfo.resp and len(storageinfo.data) == 0:
2022-01-26 10:08:33 +01:00
return False
info = storageinfo.data
if "UFS Inquiry Command Output" in info:
self.cfg.prod_name = info["UFS Inquiry Command Output"]
self.info(info)
if "UFS Erase Block Size" in info:
self.cfg.block_size = int(info["UFS Erase Block Size"], 16)
self.info(info)
self.cfg.MemoryName = "UFS"
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
if "UFS Boot Partition Enabled" in info:
self.info(info["UFS Boot Partition Enabled"])
if "UFS Total Active LU" in info:
self.cfg.maxlun = int(info["UFS Total Active LU"], 16)
if "SECTOR_SIZE_IN_BYTES" in info:
self.cfg.SECTOR_SIZE_IN_BYTES = int(info["SECTOR_SIZE_IN_BYTES"])
if "num_physical_partitions" in info:
self.cfg.num_physical = int(info["num_physical_partitions"])
return True
2021-08-05 09:04:49 +02:00
# OEM Stuff here below --------------------------------------------------
def cmd_writeimei(self, imei):
if len(imei) != 16:
self.info("IMEI must be 16 digits")
return False
data = "<?xml version=\"1.0\" ?><data><writeIMEI len=\"16\"/></data>"
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
2021-08-05 09:04:49 +02:00
self.info("writeIMEI succeeded.")
return True
else:
self.error("writeIMEI failed.")
return False
def cmd_getstorageinfo(self):
data = "<?xml version=\"1.0\" ?><data><getstorageinfo physical_partition_number=\"0\"/></data>"
val = self.xmlsend(data)
2022-06-28 13:53:44 +02:00
if val.data == '' and val.log == '' and val.resp:
2022-01-26 10:08:33 +01:00
return None
2022-06-28 13:53:44 +02:00
if isinstance(val.data, dict):
2022-01-26 10:08:33 +01:00
if "bNumberLu" in val.data:
self.cfg.maxlun = int(val.data["bNumberLu"])
if val.resp:
if val.log is not None:
2022-06-28 13:53:44 +02:00
res = {}
2022-01-26 10:08:33 +01:00
for value in val.log:
2022-06-28 13:53:44 +02:00
v = value.split("=")
if len(v) > 1:
res[v[0]] = v[1]
2022-01-26 10:08:33 +01:00
else:
if "\"storage_info\"" in value:
try:
info = value.replace("INFO:", "")
si = json.loads(info)["storage_info"]
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
continue
self.info("Storage report:")
for sii in si:
self.info(f"{sii}:{si[sii]}")
if "total_blocks" in si:
self.cfg.total_blocks = si["total_blocks"]
if "num_physical" in si:
self.cfg.num_physical = si["num_physical"]
2022-08-21 22:06:46 +02:00
self.cfg.maxlun = self.cfg.num_physical
2022-01-26 10:08:33 +01:00
if "block_size" in si:
self.cfg.block_size = si["block_size"]
if "page_size" in si:
self.cfg.SECTOR_SIZE_IN_BYTES = si["page_size"]
if "mem_type" in si:
self.cfg.MemoryName = si["mem_type"]
if "prod_name" in si:
self.cfg.prod_name = si["prod_name"]
else:
2022-06-28 13:53:44 +02:00
v = value.split(":")
if len(v) > 1:
res[v[0]] = v[1].lstrip(" ")
2022-01-26 10:08:33 +01:00
return response(resp=val.resp, data=res)
return response(resp=val.resp, data=val.data)
2021-08-05 09:04:49 +02:00
else:
2022-01-26 10:08:33 +01:00
if val.error:
for v in val.error:
if "Failed to open the SDCC Device" in v:
2022-06-28 13:53:44 +02:00
self.cfg.MemoryName = "ufs"
2022-01-26 10:08:33 +01:00
self.configure(0)
return self.cmd_getstorageinfo()
2021-08-05 09:04:49 +02:00
self.warning("GetStorageInfo command isn't supported.")
return None
2024-03-24 22:50:30 -04:00
def cmd_setactiveslot(self, slot: str):
2024-03-21 01:04:49 -04:00
# flags: 0x3a for inactive and 0x6f for active boot partition
2024-03-10 00:21:11 -05:00
def set_flags(flags, active, is_boot):
2024-03-09 14:12:02 -05:00
new_flags = flags
if active:
2024-03-10 00:21:11 -05:00
if is_boot:
2024-03-21 00:43:00 -04:00
#new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL)
#new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL)
new_flags = 0x6f << (AB_FLAG_OFFSET*8)
2024-03-10 00:21:11 -05:00
else:
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8)
2024-03-09 14:12:02 -05:00
else:
2024-03-10 00:21:11 -05:00
if is_boot:
2024-03-21 00:43:00 -04:00
#new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL)
#new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT)
new_flags = 0x3a << (AB_FLAG_OFFSET*8)
2024-03-10 00:21:11 -05:00
else:
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8))
2024-03-09 14:12:02 -05:00
return new_flags
2024-03-24 22:50:30 -04:00
2024-03-26 00:34:10 -04:00
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, slot_b_status, is_boot):
part_entry_size = guid_gpt_a.header.part_entry_size
2024-03-09 14:12:02 -05:00
2024-03-24 22:50:30 -04:00
rf_a = BytesIO(gpt_data_a)
rf_b = BytesIO(gpt_data_b)
2024-03-10 00:21:11 -05:00
2024-03-26 00:34:10 -04:00
entryoffset_a = partition_a.entryoffset - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
rf_a.seek(entryoffset_a)
rf_b.seek(entryoffset_b)
2024-03-09 14:12:02 -05:00
2024-03-10 00:21:11 -05:00
sdata_a = rf_a.read(part_entry_size)
sdata_b = rf_b.read(part_entry_size)
2024-03-09 14:12:02 -05:00
2024-03-10 00:21:11 -05:00
partentry_a = gpt.gpt_partition(sdata_a)
partentry_b = gpt.gpt_partition(sdata_b)
2024-03-09 14:12:02 -05:00
2024-03-10 00:21:11 -05:00
partentry_a.flags = set_flags(partentry_a.flags, slot_a_status, is_boot)
partentry_b.flags = set_flags(partentry_b.flags, slot_b_status, is_boot)
partentry_a.type, partentry_b.type = partentry_b.type, partentry_a.type
2024-03-09 14:12:02 -05:00
2024-03-10 00:21:11 -05:00
pdata_a, pdata_b = partentry_a.create(), partentry_b.create()
return pdata_a, partition_a.entryoffset, pdata_b, partition_b.entryoffset
2024-03-09 14:12:02 -05:00
2024-03-25 01:32:34 -04:00
def cmd_patch_multiple(lun, start_sector, byte_offset, patch_data):
offset = 0
2024-04-06 03:40:55 -04:00
size_each_patch = 8 if len(patch_data) % 8 == 0 else 4
unpack_fmt = "<I" if size_each_patch == 4 else "<Q"
2024-03-25 01:32:34 -04:00
write_size = len(patch_data)
for i in range(0, write_size, size_each_patch):
2024-04-06 03:40:55 -04:00
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset+size_each_patch])[0])
2024-03-26 00:34:10 -04:00
self.cmd_patch( lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, True)
2024-03-25 01:32:34 -04:00
offset += size_each_patch
return True
def update_gpt_info(guid_gpt_a, guid_gpt_b, partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b, slot_a_status, slot_b_status, lun_a, lun_b
):
part_a = guid_gpt_a.partentries[partitionname_a]
part_b = guid_gpt_b.partentries[partitionname_b]
2024-03-26 00:34:10 -04:00
2024-03-25 01:32:34 -04:00
is_boot = False
if partitionname_a == "boot_a":
is_boot = True
pdata_a, poffset_a, pdata_b, poffset_b = patch_helper(
gpt_data_a, gpt_data_b,
2024-03-26 00:34:10 -04:00
guid_gpt_a, guid_gpt_b,
part_a, part_b,
2024-03-25 01:32:34 -04:00
slot_a_status, slot_b_status,
is_boot
)
if gpt_data_a and gpt_data_b:
2024-03-25 01:36:10 -04:00
entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
2024-03-26 00:34:10 -04:00
gpt_data_a[entryoffset_a : entryoffset_a + len(pdata_a)] = pdata_a
2024-03-25 01:36:10 -04:00
new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
2024-03-26 00:34:10 -04:00
2024-03-25 01:36:10 -04:00
entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
2024-03-26 00:34:10 -04:00
gpt_data_b[entryoffset_b : entryoffset_b + len(pdata_b)] = pdata_b
2024-03-25 01:36:10 -04:00
new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
2024-03-25 01:32:34 -04:00
start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
byte_offset_patch_a = poffset_a % self.cfg.SECTOR_SIZE_IN_BYTES
2024-03-25 01:36:10 -04:00
cmd_patch_multiple(lun_a, start_sector_patch_a, byte_offset_patch_a, pdata_a)
2024-03-25 01:32:34 -04:00
if lun_a != lun_b:
start_sector_hdr_a = guid_gpt_a.header.current_lba
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size]
cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a)
start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES
cmd_patch_multiple(lun_b, start_sector_patch_b, byte_offset_patch_b, pdata_b)
start_sector_hdr_b = guid_gpt_b.header.current_lba
headeroffset_b = guid_gpt_b.sectorsize
new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size]
cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b)
return True
2024-03-26 00:34:10 -04:00
return False
2024-03-25 01:32:34 -04:00
2024-03-26 04:05:02 -04:00
def ensure_gpt_hdr_consistency(guid_gpt, backup_guid_gpt, gpt_data, backup_gpt_data):
2024-03-26 00:34:10 -04:00
headeroffset = guid_gpt.sectorsize
prim_corrupted, backup_corrupted = False, False
prim_hdr = gpt_data[headeroffset : headeroffset + guid_gpt.header.header_size]
2024-03-26 03:58:13 -04:00
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset : headeroffset + guid_gpt.header.header_size]
2024-03-26 00:34:10 -04:00
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
prim_corrupted = prim_hdr_crc != test_hdr_crc or prim_part_table_crc != test_part_table_crc
2024-03-26 03:58:13 -04:00
backup_hdr = backup_gpt_data[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
2024-03-26 00:34:10 -04:00
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
backup_corrupted = backup_hdr_crc != test_hdr_crc or backup_part_table_crc != test_part_table_crc
2024-03-26 03:58:13 -04:00
prim_backup_consistent = prim_part_table_crc == backup_part_table_crc
if prim_corrupted or not prim_backup_consistent:
2024-03-26 00:34:10 -04:00
if backup_corrupted:
2024-03-26 03:58:13 -04:00
self.error("both are gpt headers are corrupted, cannot recover")
return False, None, None
gpt_data[2*guid_gpt.sectorsize:] = backup_gpt_data[2*backup_guid_gpt.sectorsize:]
gpt_data = guid_gpt.fix_gpt_crc(gpt_data)
elif backup_corrupted or not prim_backup_consistent:
backup_gpt_data[2*backup_guid_gpt.sectorsize:] = gpt_data[2*guid_gpt.sectorsize:]
backup_gpt_data = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)
return True, gpt_data, backup_gpt_data
2024-03-25 01:32:34 -04:00
2022-06-28 13:53:44 +02:00
if slot.lower() not in ["a", "b"]:
2022-01-26 10:08:33 +01:00
self.error("Only slots a or b are accepted. Aborting.")
return False
2024-03-09 14:12:02 -05:00
slot_a_status = None
2022-01-26 10:08:33 +01:00
if slot == "a":
2024-03-09 14:12:02 -05:00
slot_a_status = True
2022-01-26 10:08:33 +01:00
elif slot == "b":
2024-03-09 14:12:02 -05:00
slot_a_status = False
2024-03-09 15:46:48 -05:00
slot_b_status = not slot_a_status
2022-01-26 10:08:33 +01:00
fpartitions = {}
2024-03-24 22:50:30 -04:00
try:
2024-03-12 01:01:58 -04:00
for lun_a in self.luns:
lunname = "Lun" + str(lun_a)
2024-03-06 01:16:21 -05:00
fpartitions[lunname] = []
2024-03-26 00:34:10 -04:00
check_gpt_hdr = False
2024-03-24 22:50:30 -04:00
gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0))
2024-03-26 00:34:10 -04:00
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
2024-03-09 15:46:48 -05:00
if guid_gpt_a is None:
2024-03-06 01:16:21 -05:00
break
else:
2024-03-09 15:46:48 -05:00
for partitionname_a in guid_gpt_a.partentries:
2024-03-09 14:12:02 -05:00
slot = partitionname_a.lower()[-2:]
2024-04-17 05:23:13 -04:00
partition_a = backup_guid_gpt_a.partentries[partitionname_a]
2024-03-09 14:12:02 -05:00
if slot == "_a":
2024-04-17 02:42:52 -04:00
active_a = ((partition_a.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if (active_a and slot_a_status) or (not active_a and slot_b_status):
return True
2024-03-09 14:12:02 -05:00
partitionname_b = partitionname_a[:-1] + "b"
2024-03-12 01:01:58 -04:00
if partitionname_b in guid_gpt_a.partentries:
lun_b = lun_a
2024-03-24 22:50:30 -04:00
gpt_data_b = gpt_data_a
2024-03-12 01:01:58 -04:00
guid_gpt_b = guid_gpt_a
2024-03-26 00:34:10 -04:00
backup_gpt_data_b = backup_gpt_data_a
backup_guid_gpt_b = backup_guid_gpt_a
2024-03-12 01:01:58 -04:00
else:
2024-03-25 01:32:34 -04:00
resp = self.detect_partition(arguments=None,
partitionname=partitionname_b,
send_full=True)
2024-03-12 01:01:58 -04:00
if not resp[0]:
self.error(f"Cannot find partition {partitionname_b}")
return False
2024-03-24 22:50:30 -04:00
_, lun_b, gpt_data_b, guid_gpt_b = resp
2024-03-26 00:34:10 -04:00
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0 , 0, guid_gpt_b.header.backup_lba)
2024-04-06 14:08:42 -04:00
if not check_gpt_hdr and partitionname_a[:3] != "xbl": # xbl partition don't need check consistency
2024-03-26 04:05:02 -04:00
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a, backup_guid_gpt_a, gpt_data_a, backup_gpt_data_a)
2024-03-26 03:58:13 -04:00
if not sts:
2024-03-26 00:34:10 -04:00
return False
if lun_a != lun_b:
2024-03-26 04:05:02 -04:00
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b, backup_guid_gpt_b, gpt_data_b, backup_gpt_data_b)
2024-03-26 03:58:13 -04:00
if not sts:
2024-03-26 00:34:10 -04:00
return False
check_gpt_hdr = True
2024-03-09 15:46:48 -05:00
2024-03-25 01:32:34 -04:00
update_gpt_info(guid_gpt_a, guid_gpt_b,
partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b,
slot_a_status, slot_b_status,
2024-03-26 00:34:10 -04:00
lun_a, lun_b)
2024-03-24 22:50:30 -04:00
2024-03-26 03:58:13 -04:00
# TODO: this updates the backup gpt header, but is it needed, since it is updated when xbl loads
#update_gpt_info(backup_guid_gpt_a, backup_guid_gpt_b,
# partitionname_a, partitionname_b,
# backup_gpt_data_a, backup_gpt_data_b,
# slot_a_status, slot_b_status,
# lun_a, lun_b)
2024-03-24 22:50:30 -04:00
2024-03-06 01:16:21 -05:00
except Exception as err:
self.error(str(err))
return False
return True
2022-01-26 10:08:33 +01:00
2024-03-24 22:50:30 -04:00
2024-03-05 23:46:43 -05:00
2021-10-25 14:43:33 +02:00
def cmd_test(self, cmd):
2021-12-05 14:32:21 +01:00
token = "1234"
pk = "1234"
data = "<?xml version=\"1.0\" ?>\n<data>\n<" + cmd + " token=\"" + token + "\" pk=\"" + pk + "\" />\n</data>"
2021-10-25 14:43:33 +02:00
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
2021-10-25 14:43:33 +02:00
if b"raw hex token" in val[2]:
return True
if b"opcmd is not enabled" in val[2]:
return True
return False
2021-08-05 09:04:49 +02:00
def cmd_getstorageinfo_string(self):
data = "<?xml version=\"1.0\" ?><data><getstorageinfo /></data>"
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
2021-08-05 09:04:49 +02:00
self.info(f"GetStorageInfo:\n--------------------\n")
2022-01-26 10:08:33 +01:00
data = self.xml.getlog(val.data)
2021-08-05 09:04:49 +02:00
for line in data:
self.info(line)
return True
else:
self.warning("GetStorageInfo command isn't supported.")
return False
def cmd_poke(self, address, data, filename="", info=False):
rf = None
if filename != "":
rf = open(filename, "rb")
SizeInBytes = os.stat(filename).st_size
else:
SizeInBytes = len(data)
if info:
self.info(f"Poke: Address({hex(address)}),Size({hex(SizeInBytes)})")
'''
<?xml version="1.0" ?><data><poke address64="1048576" SizeInBytes="90112" value="0x22 0x00 0x00"/></data>
'''
maxsize = 8
lengthtowrite = SizeInBytes
if lengthtowrite < maxsize:
maxsize = lengthtowrite
pos = 0
old = 0
datawritten = 0
mode = 0
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
while lengthtowrite > 0:
if rf is not None:
content = hex(int(hexlify(rf.read(maxsize)).decode('utf-8'), 16))
else:
content = 0
if lengthtowrite < maxsize:
maxsize = lengthtowrite
for i in range(0, maxsize):
content = (content << 8) + int(
hexlify(data[pos + maxsize - i - 1:pos + maxsize - i]).decode('utf-8'), 16)
# content=hex(int(hexlify(data[pos:pos+maxsize]).decode('utf-8'),16))
content = hex(content)
if mode == 0:
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"size_in_bytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
else:
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
try:
self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes])
2022-06-28 13:53:44 +02:00
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
2021-08-05 09:04:49 +02:00
pass
2022-01-26 10:08:33 +01:00
addrinfo = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
2022-01-26 10:08:33 +01:00
tmp += self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes])
2022-01-26 10:08:33 +01:00
addrinfo = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if (b'<response' in addrinfo and 'NAK' in addrinfo) or b"Invalid parameters" in addrinfo:
self.error(f"Error:{addrinfo}")
return False
if b"address" in addrinfo and b"can\'t" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
2022-01-26 10:08:33 +01:00
tmp += self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
self.error(f"Error:{addrinfo}")
return False
2022-01-26 10:08:33 +01:00
addrinfo = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if b'<response' in addrinfo and b'NAK' in addrinfo:
print(f"Error:{addrinfo}")
return False
pos += maxsize
datawritten += maxsize
lengthtowrite -= maxsize
if info:
prog = round(float(datawritten) / float(SizeInBytes) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
if info:
self.info("Done writing.")
return True
def cmd_peek(self, address, SizeInBytes, filename="", info=False):
if info:
self.info(f"Peek: Address({hex(address)}),Size({hex(SizeInBytes)})")
wf = None
if filename != "":
wf = open(filename, "wb")
'''
<?xml version="1.0" ?><data><peek address64="1048576" SizeInBytes="90112" /></data>
'''
data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{address}\" " + \
f"size_in_bytes=\"{SizeInBytes}\" /></data>\n"
'''
2024-03-24 22:50:30 -04:00
<?xml version="1.0" encoding="UTF-8" ?><data><log value="Using address 00100000" /></data>
<?xml version="1.0" encoding="UTF-8" ?><data><log value="0x22 0x00 0x00 0xEA 0x70 0x00 0x00 0xEA 0x74 0x00
0x00 0xEA 0x78 0x00 0x00 0xEA 0x7C 0x00 0x00 0xEA 0x80 0x00 0x00 0xEA 0x84 0x00 0x00 0xEA 0x88 0x00 0x00
0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA
0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE
0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF
2021-08-05 09:04:49 +02:00
0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF " /></data>
'''
try:
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
2022-01-26 10:08:33 +01:00
addrinfo = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
2022-01-26 10:08:33 +01:00
tmp += self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{hex(address)}\" " + \
f"SizeInBytes=\"{hex(SizeInBytes)}\" /></data>"
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
2022-01-26 10:08:33 +01:00
addrinfo = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if (b'<response' in addrinfo and 'NAK' in addrinfo) or b"Invalid parameters" in addrinfo:
self.error(f"Error:{addrinfo}")
return False
if b"address" in addrinfo and b"can\'t" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
2022-01-26 10:08:33 +01:00
tmp += self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
self.error(f"Error:{addrinfo}")
return False
resp = b""
dataread = 0
old = 0
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
while True:
2022-01-26 10:08:33 +01:00
tmp = self.cdc.read(timeout=None)
2021-08-05 09:04:49 +02:00
if b'<response' in tmp or b"ERROR" in tmp:
break
rdata = self.xml.getlog(tmp)[0].replace("0x", "").replace(" ", "")
tmp2 = b""
try:
tmp2 = binascii.unhexlify(rdata)
except: # pylint: disable=broad-except
print(rdata)
exit(0)
dataread += len(tmp2)
if wf is not None:
wf.write(tmp2)
else:
resp += tmp2
if info:
prog = round(float(dataread) / float(SizeInBytes) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
if wf is not None:
wf.close()
if b'<response' in tmp and b'ACK' in tmp:
if info:
self.info(f"Bytes from {hex(address)}, bytes read {hex(dataread)}, written to {filename}.")
return True
else:
self.error(f"Error:{addrinfo}")
return False
else:
return resp
def cmd_memcpy(self, destaddress, sourceaddress, size):
data = self.cmd_peek(sourceaddress, size)
if data != b"" and data:
if self.cmd_poke(destaddress, data):
return True
return False
def cmd_rawxml(self, data, response=True):
if response:
val = self.xmlsend(data)
2022-01-26 10:08:33 +01:00
if val.resp:
2021-08-05 09:04:49 +02:00
self.info(f"{data} succeeded.")
2022-01-26 10:08:33 +01:00
return val.data
2021-08-05 09:04:49 +02:00
else:
self.error(f"{data} failed.")
2022-01-26 10:08:33 +01:00
self.error(f"{val.error}")
2021-08-05 09:04:49 +02:00
return False
else:
self.xmlsend(data, False)
2021-08-19 22:52:20 +02:00
return True