Reformat code

Fix a bug in enableadb
ColdWindScholar 2024-06-10 01:06:20 +08:00
parent 3e4e569cf7
commit 084ab71db2
33 changed files with 462 additions and 433 deletions
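
The "bug in enableadb" is the one-line change in the enableadb hunk near the end of this diff: the Simcom branch of the vendor dispatch called self.Simcom(cn) without the enable flag. A minimal sketch of the corrected branch (self, cn, info and the per-vendor methods are taken from the adbtools class shown in that hunk; this is not the full method):

    # Sketch of the fixed dispatch from the enableadb hunk below.
    if info["vendor"] == "Simcom":
        res = self.Simcom(cn, enable)   # previously self.Simcom(cn) dropped the enable flag
    mode = "enabled" if enable else "disabled"
    print(("ADB successfully " if res else "ADB couldn't be ") + mode)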

View file

@ -1,25 +1,27 @@
#!/usr/bin/env python3
from edl.Library.tcpclient import tcpclient
class client():
def __init__(self):
self.commands=[]
self.commands = []
def send(self):
self.tcp = tcpclient(1340)
self.tcp.sendcommands(self.commands)
def read(self,src):
def read(self, src):
self.commands.append(f"peekqword:{hex(src)}")
def write(self,dest,value):
def write(self, dest, value):
self.commands.append(f"pokeqword:{hex(dest)},{hex(value)}")
def memcpy(self,dest,src,size):
def memcpy(self, dest, src, size):
self.commands.append(f"memcpy:{hex(dest)},{hex(src)},{hex(size)}")
def main():
exp=client()
exp = client()
exp.commands = [
"send:nop",
"r:boot,boot.img",
@ -29,5 +31,6 @@ def main():
]
exp.send()
if __name__=="__main__":
if __name__ == "__main__":
main()
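
For reference, the client class above only queues peek/poke command strings and ships them in one batch over TCP; a minimal usage sketch (the addresses are placeholders, and the edl TCP command server is assumed to already be listening on the hard-coded port 1340):

    # Hypothetical usage of the client class above; addresses are made up.
    exp = client()
    exp.read(0x14691990)                      # queues "peekqword:0x14691990"
    exp.write(0x14691990, 0x1337)             # queues "pokeqword:0x14691990,0x1337"
    exp.memcpy(0x200000, 0x14691990, 0x100)   # queues "memcpy:dest,src,size"
    exp.send()                                # tcpclient(1340) sends all queued commands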

edl
View file

@ -127,28 +127,29 @@ Options:
--resetmode=mode Resetmode for reset (poweroff, reset, edl, etc.)
"""
import logging
import os
import re
import subprocess
import sys
import time
import logging
import subprocess
import re
from docopt import docopt
from edlclient.Config.usb_ids import default_ids
from edlclient.Library.utils import LogBase
from edlclient.Library.Connection.usblib import usb_class
from edlclient.Library.Connection.seriallib import serial_class
from edlclient.Library.sahara import sahara
from edlclient.Library.streaming_client import streaming_client
from edlclient.Library.Connection.usblib import usb_class
from edlclient.Library.firehose_client import firehose_client
from edlclient.Library.streaming import Streaming
from edlclient.Library.sahara import sahara
from edlclient.Library.sahara_defs import cmd_t, sahara_mode_t
from edlclient.Library.streaming import Streaming
from edlclient.Library.streaming_client import streaming_client
from edlclient.Library.utils import LogBase
from edlclient.Library.utils import is_windows
from binascii import hexlify
args = docopt(__doc__, version='3')
print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2023.")
print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2024.")
def parse_cmd(rargs):

View file

@ -5,8 +5,6 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import serial
import serial.tools.list_ports
import inspect
import traceback
from binascii import hexlify

View file

@ -5,25 +5,25 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os.path
import time
import sys
if not sys.platform.startswith('win32'):
import termios
def _reset_input_buffer():
return
def _reset_input_buffer_org(self):
if not sys.platform.startswith('win32'):
return termios.tcflush(self.fd, termios.TCIFLUSH)
import serial
import serial.tools.list_ports
import inspect
import traceback
from binascii import hexlify
try:
from edlclient.Library.utils import *
from edlclient.Library.Connection.devicehandler import DeviceClass
@ -38,13 +38,13 @@ class serial_class(DeviceClass):
super().__init__(loglevel, portconfig, devclass)
self.is_serial = True
def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""):
def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""):
if self.connected:
self.close()
self.connected = False
if portname == "":
devices=self.detectdevices()
if len(devices)>0:
devices = self.detectdevices()
if len(devices) > 0:
portname = devices[0]
if portname != "":
self.device = serial.Serial(baudrate=115200, bytesize=serial.EIGHTBITS,
@ -88,13 +88,12 @@ class serial_class(DeviceClass):
self.debug("Break set")
def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False):
if RTS==1:
if RTS == 1:
self.device.setRTS(RTS)
if DTR==1:
if DTR == 1:
self.device.setDTR(DTR)
self.debug("Linecoding set")
def write(self, command, pktsize=None):
if pktsize is None:
pktsize = 512
@ -169,18 +168,18 @@ class serial_class(DeviceClass):
epr = self.device.read
extend = res.extend
if self.xmlread:
info=self.device.read(6)
bytestoread=resplen-len(info)
info = self.device.read(6)
bytestoread = resplen - len(info)
extend(info)
if b"<?xml " in info:
while not b"response " in res or res[-7:]!=b"</data>":
while not b"response " in res or res[-7:] != b"</data>":
extend(epr(1))
return res
bytestoread = resplen
while len(res) < bytestoread:
try:
val=epr(bytestoread)
if len(val)==0:
val = epr(bytestoread)
if len(val) == 0:
break
extend(val)
except Exception as e:
@ -218,5 +217,3 @@ class serial_class(DeviceClass):
self.device.flush()
res = self.usbread(resplen)
return res

View file

@ -5,26 +5,25 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import io
import logging
import usb.core # pyusb
import usb.util
import time
import inspect
import array
import usb.backend.libusb0
from enum import Enum
import inspect
import logging
from binascii import hexlify
from ctypes import c_void_p, c_int
from enum import Enum
import usb.backend.libusb0
import usb.core # pyusb
import usb.util
try:
from edlclient.Library.utils import *
except:
from Library.utils import *
if not is_windows():
import usb.backend.libusb1
from struct import pack, calcsize
import traceback
from struct import pack
try:
from edlclient.Library.Connection.devicehandler import DeviceClass
except:
@ -211,7 +210,7 @@ class usb_class(DeviceClass):
def flush(self):
return
def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""):
def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""):
if self.connected:
self.close()
self.connected = False
@ -389,7 +388,7 @@ class usb_class(DeviceClass):
extend = res.extend
while len(res) < resplen:
try:
resplen=epr(buffer,timeout)
resplen = epr(buffer, timeout)
extend(buffer[:resplen])
if resplen == self.EP_IN.wMaxPacketSize:
break

View file

@ -16,7 +16,7 @@ class generic(metaclass=LogBase):
self.serial = serial
self.args = args
self.__logger.setLevel(loglevel)
self.error=self.__logger.error
self.error = self.__logger.error
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
@ -27,7 +27,7 @@ class generic(metaclass=LogBase):
if res[0]:
lun = res[1]
rpartition = res[2]
if rpartition.sectors <= (0x8000//self.fh.cfg.SECTOR_SIZE_IN_BYTES):
if rpartition.sectors <= (0x8000 // self.fh.cfg.SECTOR_SIZE_IN_BYTES):
offsettopatch = 0x7FFF
sector, offset = self.fh.calc_offset(rpartition.sector, offsettopatch)
else:

View file

@ -36,8 +36,9 @@ except ImportError as e:
nothing = None
pass
class modules(metaclass=LogBase):
def __init__(self, fh, serial:int, supported_functions, loglevel, devicemodel:str, args):
def __init__(self, fh, serial: int, supported_functions, loglevel, devicemodel: str, args):
self.fh = fh
self.args = args
self.serial = serial
@ -132,7 +133,7 @@ class modules(metaclass=LogBase):
if paramdata.data == b"":
self.error("Error on reading param partition.")
return False
wdata = self.ops.enable_ops(paramdata.data, enable,self.devicemodel,self.serial)
wdata = self.ops.enable_ops(paramdata.data, enable, self.devicemodel, self.serial)
if wdata is not None:
self.ops.run()
if self.fh.cmd_program_buffer(lun, rpartition.sector, wdata, False):

View file

@ -31,10 +31,11 @@ class nothing(metaclass=LogBase):
if token1 is None:
token1 = random.randbytes(32).hex()
authresp = token1 + self.projid + ("%x" % self.serial) + self.hashverify
token2 = hashlib.sha256(bytes(authresp,'utf-8')).hexdigest()[:64]
token2 = hashlib.sha256(bytes(authresp, 'utf-8')).hexdigest()[:64]
token3 = self.hashverify
return bytes(f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"{token1}\" token2=\"{token2}\" token3=\"{token3}\"/>\n</data>\n",'utf-8')
return bytes(
f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"{token1}\" token2=\"{token2}\" token3=\"{token3}\"/>\n</data>\n",
'utf-8')
def ntprojectverify(self):
"""
@ -57,9 +58,9 @@ class nothing(metaclass=LogBase):
if __name__ == "__main__":
nt = nothing(fh=None, projid="22111", serial=1729931115)
res=nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671")
org=b"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671\" token2=\"1ecd222465436eb8acc0cfc41e90d1e677165c184ea7d9631615014dac88c669\" token3=\"16386b4035411a770b12507b2e30297c0c5471230b213e6a1e1e701c6a425150\"/>\n</data>\n"
if res!=org:
res = nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671")
org = b"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671\" token2=\"1ecd222465436eb8acc0cfc41e90d1e677165c184ea7d9631615014dac88c669\" token3=\"16386b4035411a770b12507b2e30297c0c5471230b213e6a1e1e701c6a425150\"/>\n</data>\n"
if res != org:
print("Error !")
print(res)
print(nt.generatetoken())

View file

@ -25,6 +25,7 @@ from struct import pack
import logging
from edlclient.Library.utils import LogBase
from edlclient.Library.Modules.oneplus_param import paramtools
try:
from edlclient.Library.cryptutils import cryptutils
except Exception as e:
@ -128,7 +129,8 @@ deviceconfig = {
class oneplus(metaclass=LogBase):
def __init__(self, fh, projid:str="18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0, supported_functions=None,
def __init__(self, fh, projid: str = "18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0,
supported_functions=None,
args=None, loglevel=logging.INFO):
self.fh = fh
self.__logger = self.__logger
@ -203,7 +205,7 @@ class oneplus(metaclass=LogBase):
else:
assert "Device is not supported"
exit(0)
assert "Unknown projid:"+str(projid)
assert "Unknown projid:" + str(projid)
return None
def run(self):
@ -446,7 +448,7 @@ class oneplus2(metaclass=LogBase):
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
def crypt_token(self, data, pk, device_timestamp:int, decrypt=False):
def crypt_token(self, data, pk, device_timestamp: int, decrypt=False):
aes = cryptutils().aes()
aeskey = b"\x46\xA5\x97\x30\xBB\x0D\x41\xE8" + bytes(pk, 'utf-8') + \
pack("<Q", device_timestamp) # we get this using setprocstart

View file

@ -109,7 +109,7 @@ class sid(Enum):
'''
class paramtools():
class paramtools:
paramitems = {
sid.PARAM_SID_PRODUCT.value[0]: {
0x18: ["8c", "project_name"],
@ -315,9 +315,9 @@ class paramtools():
def __init__(self, mode, serial):
self.aes_iv = unhexlify("562E17996D093D28DDB3BA695A2E6F58")
self.aes_key = unhexlify("3030304F6E65506C7573383138303030")
if mode==1:
derivedkey=bytes.fromhex("a9264fbf8a"+("%08x"%serial)+"6b4487ea")[:0x1A]
derivedkey=hashlib.sha256(derivedkey).digest()[:16]
if mode == 1:
derivedkey = bytes.fromhex("a9264fbf8a" + ("%08x" % serial) + "6b4487ea")[:0x1A]
derivedkey = hashlib.sha256(derivedkey).digest()[:16]
self.aes_key = derivedkey
def getparam(self, offset, sidindex):
@ -375,7 +375,7 @@ class paramtools():
def parse_encrypted(self, rdata, sid):
data = rdata[(sid * 0x400):(sid * 0x400) + 0x1000]
itemdata, hv, cv, updatecounter = self.decryptsid(data)
if itemdata != None:
if itemdata is not None:
itemdata = bytearray(itemdata)
print(
f"Offset {hex(sid * 0x400)}: hv {hex(hv)}, cv {hex(cv)}, increase_enc_update_counter {hex(updatecounter)}.")
@ -420,7 +420,7 @@ class paramtools():
itemlength = 0x400
itemdata = rdata[pos + 0x18:pos + 0x18 + itemlength]
i = 0
while (i < len(itemdata) - 0x22):
while i < len(itemdata) - 0x22:
sidindex = (pos // 0x400) & 0x1FF
offset = i + 0x18
# if sidindex==0x334 and offset==0x80:
@ -439,7 +439,7 @@ class paramtools():
length = self.parse_data(i, itemdata, offset, param, sidindex)
i += length
if length > 4:
if (length % 4):
if length % 4:
i += 4 - (length % 4)
def parse_data(self, i, itemdata, offset, param, sidindex, encrypted=False):
@ -467,10 +467,10 @@ class paramtools():
name = name + " "
if "PWD Hash" in name:
items = content.split(" ")
pwdhash = items[0][1:9]+"00000000000000000000000000000000000000000000000000000000"
pwdhash = items[0][1:9] + "00000000000000000000000000000000000000000000000000000000"
valid = "True" if items[1] != "-1" else "False"
flag = items[2]
date = items[3] + " "+ items[4][:-1]
date = items[3] + " " + items[4][:-1]
content = f"{date} ({valid},{flag}): {pwdhash}"
ff = f"SID_Index {hex(sidindex)}, Offset {offsetstr}: {name}: {content}"
if encrypted:
@ -936,7 +936,7 @@ def main():
filename = args["<filename>"]
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
param.parse_decrypted_fields(data)
@ -948,7 +948,7 @@ def main():
filename = args["<filename>"]
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
with open(filename + ".patched", 'wb') as wf:
@ -965,7 +965,7 @@ def main():
value = int(args["<value>"], 16)
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
with open(filename + ".patched", 'wb') as wf:
@ -974,7 +974,7 @@ def main():
imei = args["<imei>"]
mode = 0
serial = None
param = paramtools(mode,serial)
param = paramtools(mode, serial)
print("oneplus Factory qr code generator (c) B. Kerler 2019\nGPLv3 License\n----------------------")
print("Code : *#*#5646#*#* , *#808#, *#36446337# = com.android.engineeringmode.manualtest.DecryptActivity")
results = param.gencode([imei, "YOU_CAN_PASS_NOW"])

View file

@ -43,7 +43,7 @@ class xiaomi(metaclass=LogBase):
rsp = self.fh.xmlsend(self.xiaomi_authdata)
if rsp.resp:
if "value" in rsp.resp:
if rsp.resp["value"]=="ACK":
if rsp.resp["value"] == "ACK":
if 'authenticated' in rsp.log[0].lower() and 'true' in rsp.log[0].lower():
return True
return False

View file

@ -425,7 +425,7 @@ class cryptutils:
return q
def pss_verify(self, e, N, msghash, signature, emBits=1024, salt=None):
if salt == None:
if salt is None:
slen = self.digestLen
else:
slen = len(salt)
@ -482,7 +482,7 @@ class cryptutils:
else:
return False
class hash():
class hash:
def __init__(self, hashtype="SHA256"):
if hashtype == "SHA1":
self.hash = self.sha1

View file

@ -7,27 +7,22 @@
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import io
import json
import os.path
import platform
import time
import json
from struct import unpack
from binascii import hexlify
from queue import Queue
from threading import Thread
from edlclient.Library.Modules.nothing import nothing
from edlclient.Library.utils import *
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE, MAX_PRIORITY, PART_ATT_PRIORITY_BIT
from edlclient.Library.gpt import PART_ATT_PRIORITY_VAL, PART_ATT_ACTIVE_VAL, PART_ATT_MAX_RETRY_COUNT_VAL, PART_ATT_SUCCESSFUL_VAL, PART_ATT_UNBOOTABLE_VAL
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE
from edlclient.Library.sparse import QCSparse
from edlclient.Library.utils import *
from edlclient.Library.utils import progress
from queue import Queue
from threading import Thread
rq = Queue()
def writedata(filename, rq):
pos = 0
with open(filename, "wb") as wf:
@ -146,11 +141,10 @@ def writefile(wf, q, stop):
break
class asyncwriter():
class asyncwriter:
def __init__(self, wf):
self.writequeue = Queue()
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,))
self.worker.setDaemon(True)
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,), daemon=True)
self.stopthreads = False
self.worker.start()
@ -216,9 +210,9 @@ class firehose(metaclass=LogBase):
def detect_partition(self, arguments, partitionname, send_full=False):
if arguments is None:
arguments = {
"--gpt-num-part-entries" : 0,
"--gpt-part-entry-size" : 0,
"--gpt-part-entry-start-lba" : 0
"--gpt-num-part-entries": 0,
"--gpt-part-entry-size": 0,
"--gpt-part-entry-start-lba": 0
}
fpartitions = {}
for lun in self.luns:
@ -231,7 +225,8 @@ class firehose(metaclass=LogBase):
break
else:
if partitionname in guid_gpt.partentries:
return [True, lun, data, guid_gpt] if send_full else [True, lun, guid_gpt.partentries[partitionname]]
return [True, lun, data, guid_gpt] if send_full else [True, lun,
guid_gpt.partentries[partitionname]]
for part in guid_gpt.partentries:
fpartitions[lunname].append(part)
return [False, fpartitions]
@ -682,7 +677,7 @@ class firehose(metaclass=LogBase):
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
if bytestoread!=0:
if bytestoread != 0:
self.error(f"Error:")
for line in info:
self.error(line)
@ -1044,8 +1039,8 @@ class firehose(metaclass=LogBase):
self.parse_storage()
for function in self.supported_functions:
if function == "checkntfeature":
if type(self.devicemodel)==list:
self.devicemodel=self.devicemodel[0]
if type(self.devicemodel) == list:
self.devicemodel = self.devicemodel[0]
self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial,
supported_functions=self.supported_functions,
loglevel=self.loglevel)
@ -1148,7 +1143,7 @@ class firehose(metaclass=LogBase):
try:
serial = line.split("0x")[1][:-1]
if ")" in serial:
serial=serial[:serial.rfind(")")]
serial = serial[:serial.rfind(")")]
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
@ -1182,8 +1177,8 @@ class firehose(metaclass=LogBase):
"serial": self.serial
}
if os.path.exists("edl_config.json"):
data = json.loads(open("edl_config.json","rb").read().decode('utf-8'))
if "serial" in data and data["serial"]!=state["serial"]:
data = json.loads(open("edl_config.json", "rb").read().decode('utf-8'))
if "serial" in data and data["serial"] != state["serial"]:
open("edl_config.json", "w").write(json.dumps(state))
else:
self.supported_functions = data["supported_functions"]
@ -1318,26 +1313,29 @@ class firehose(metaclass=LogBase):
if is_boot:
#new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL)
#new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL)
new_flags = 0x6f << (AB_FLAG_OFFSET*8)
new_flags = 0x6f << (AB_FLAG_OFFSET * 8)
else:
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8)
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8)
else:
if is_boot:
#new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL)
#new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT)
new_flags = 0x3a << (AB_FLAG_OFFSET*8)
new_flags = 0x3a << (AB_FLAG_OFFSET * 8)
else:
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8))
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8))
return new_flags
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, slot_b_status, is_boot):
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status,
slot_b_status, is_boot):
part_entry_size = guid_gpt_a.header.part_entry_size
rf_a = BytesIO(gpt_data_a)
rf_b = BytesIO(gpt_data_b)
entryoffset_a = partition_a.entryoffset - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
entryoffset_a = partition_a.entryoffset - (
(guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - (
(guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
rf_a.seek(entryoffset_a)
rf_b.seek(entryoffset_b)
@ -1360,8 +1358,8 @@ class firehose(metaclass=LogBase):
unpack_fmt = "<I" if size_each_patch == 4 else "<Q"
write_size = len(patch_data)
for i in range(0, write_size, size_each_patch):
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset+size_each_patch])[0])
self.cmd_patch( lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, False)
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset + size_each_patch])[0])
self.cmd_patch(lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, False)
offset += size_each_patch
return True
@ -1384,11 +1382,11 @@ class firehose(metaclass=LogBase):
if gpt_data_a and gpt_data_b:
entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
gpt_data_a[entryoffset_a : entryoffset_a + len(pdata_a)] = pdata_a
gpt_data_a[entryoffset_a: entryoffset_a + len(pdata_a)] = pdata_a
new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
gpt_data_b[entryoffset_b : entryoffset_b + len(pdata_b)] = pdata_b
gpt_data_b[entryoffset_b: entryoffset_b + len(pdata_b)] = pdata_b
new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1397,8 +1395,8 @@ class firehose(metaclass=LogBase):
if lun_a != lun_b:
start_sector_hdr_a = guid_gpt_a.header.current_lba
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size]
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a: headeroffset_a + guid_gpt_a.header.header_size]
cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a)
start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1407,7 +1405,7 @@ class firehose(metaclass=LogBase):
start_sector_hdr_b = guid_gpt_b.header.current_lba
headeroffset_b = guid_gpt_b.sectorsize
new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size]
new_hdr_b = new_gpt_data_b[headeroffset_b: headeroffset_b + guid_gpt_b.header.header_size]
cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b)
return True
return False
@ -1416,16 +1414,17 @@ class firehose(metaclass=LogBase):
headeroffset = guid_gpt.sectorsize
prim_corrupted, backup_corrupted = False, False
prim_hdr = gpt_data[headeroffset : headeroffset + guid_gpt.header.header_size]
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset : headeroffset + guid_gpt.header.header_size]
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
prim_hdr = gpt_data[headeroffset: headeroffset + guid_gpt.header.header_size]
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset: headeroffset + guid_gpt.header.header_size]
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
prim_corrupted = prim_hdr_crc != test_hdr_crc or prim_part_table_crc != test_part_table_crc
backup_hdr = backup_gpt_data[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
backup_hdr = backup_gpt_data[headeroffset: headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[
headeroffset: headeroffset + backup_guid_gpt.header.header_size]
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
backup_corrupted = backup_hdr_crc != test_hdr_crc or backup_part_table_crc != test_part_table_crc
prim_backup_consistent = prim_part_table_crc == backup_part_table_crc
@ -1433,10 +1432,10 @@ class firehose(metaclass=LogBase):
if backup_corrupted:
self.error("both are gpt headers are corrupted, cannot recover")
return False, None, None
gpt_data[2*guid_gpt.sectorsize:] = backup_gpt_data[2*backup_guid_gpt.sectorsize:]
gpt_data[2 * guid_gpt.sectorsize:] = backup_gpt_data[2 * backup_guid_gpt.sectorsize:]
gpt_data = guid_gpt.fix_gpt_crc(gpt_data)
elif backup_corrupted or not prim_backup_consistent:
backup_gpt_data[2*backup_guid_gpt.sectorsize:] = gpt_data[2*guid_gpt.sectorsize:]
backup_gpt_data[2 * backup_guid_gpt.sectorsize:] = gpt_data[2 * guid_gpt.sectorsize:]
backup_gpt_data = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)
return True, gpt_data, backup_gpt_data
@ -1456,7 +1455,7 @@ class firehose(metaclass=LogBase):
fpartitions[lunname] = []
check_gpt_hdr = False
gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0))
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0, 0, guid_gpt_a.header.backup_lba)
if guid_gpt_a is None:
break
else:
@ -1464,7 +1463,8 @@ class firehose(metaclass=LogBase):
slot = partitionname_a.lower()[-2:]
partition_a = backup_guid_gpt_a.partentries[partitionname_a]
if slot == "_a":
active_a = ((partition_a.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active_a = ((partition_a.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if (active_a and slot_a_status) or (not active_a and slot_b_status):
return True
@ -1483,14 +1483,22 @@ class firehose(metaclass=LogBase):
self.error(f"Cannot find partition {partitionname_b}")
return False
_, lun_b, gpt_data_b, guid_gpt_b = resp
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0 , 0, guid_gpt_b.header.backup_lba)
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0, 0,
guid_gpt_b.header.backup_lba)
if not check_gpt_hdr and partitionname_a[:3] != "xbl": # xbl partition don't need check consistency
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a, backup_guid_gpt_a, gpt_data_a, backup_gpt_data_a)
if not check_gpt_hdr and partitionname_a[
:3] != "xbl": # xbl partition don't need check consistency
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a,
backup_guid_gpt_a,
gpt_data_a,
backup_gpt_data_a)
if not sts:
return False
if lun_a != lun_b:
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b, backup_guid_gpt_b, gpt_data_b, backup_gpt_data_b)
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b,
backup_guid_gpt_b,
gpt_data_b,
backup_gpt_data_b)
if not sts:
return False
check_gpt_hdr = True
@ -1513,8 +1521,6 @@ class firehose(metaclass=LogBase):
return False
return True
def cmd_test(self, cmd):
token = "1234"
pk = "1234"
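
The active-slot test that recurs in the firehose hunks above and the firehose_client hunks below shifts the A/B attribute byte out of the upper part of the GPT partition flags; the same check as a small standalone helper (a sketch only, not part of this commit, using the constants imported from edlclient.Library.gpt in the hunk above):

    # Sketch of the recurring active-slot check.
    from edlclient.Library.gpt import AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE

    def slot_is_active(flags: int) -> bool:
        attr = (flags >> (AB_FLAG_OFFSET * 8)) & 0xFF   # A/B attribute byte
        return (attr & AB_PARTITION_ATTR_SLOT_ACTIVE) == AB_PARTITION_ATTR_SLOT_ACTIVE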

View file

@ -351,7 +351,7 @@ class firehose_client(metaclass=LogBase):
start_sector=0, num_partition_sectors=1, display=False)
if self.get_storage_info():
totalsectors = (self.cfg.block_size *
self.cfg.total_blocks ) // self.cfg.SECTOR_SIZE_IN_BYTES
self.cfg.total_blocks) // self.cfg.SECTOR_SIZE_IN_BYTES
if len(luns) > 1:
sfilename = filename + f".lun{str(lun)}"
@ -649,7 +649,8 @@ class firehose_client(metaclass=LogBase):
prim_guid_gpt = res[3]
_, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba)
partition = backup_guid_gpt.partentries["boot_a"]
active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if active:
self.printer("Current active slot: a")
return True
@ -659,7 +660,8 @@ class firehose_client(metaclass=LogBase):
prim_guid_gpt = res[3]
_, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba)
partition = backup_guid_gpt.partentries["boot_b"]
active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if active:
self.printer("Current active slot: b")
return True
@ -755,14 +757,15 @@ class firehose_client(metaclass=LogBase):
filenames = []
if self.firehose.modules is not None:
self.firehose.modules.writeprepare()
for fname in filter(os.path.isfile, [ os.path.join(directory, i) for i in os.listdir(directory) ]):
for fname in filter(os.path.isfile, [os.path.join(directory, i) for i in os.listdir(directory)]):
filenames.append(fname)
for lun in luns:
data, guid_gpt = self.firehose.get_gpt(lun, int(options["--gpt-num-part-entries"]),
int(options["--gpt-part-entry-size"]),
int(options["--gpt-part-entry-start-lba"]))
if guid_gpt is None:
self.error("Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`")
self.error(
"Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`")
break
for filename in filenames:
partname = os.path.basename(filename)

View file

@ -5,18 +5,19 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os
import sys
import argparse
import colorama
import copy
import logging
import logging.config
from enum import Enum
from binascii import hexlify
from struct import calcsize, unpack, pack
from io import BytesIO
import os
import sys
from binascii import crc32
from binascii import hexlify
from enum import Enum
from struct import calcsize, unpack, pack
import colorama
class ColorFormatter(logging.Formatter):
LOG_COLORS = {

View file

@ -7,9 +7,8 @@
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import logging
from binascii import hexlify
from struct import unpack
import time
from struct import unpack
MAX_PACKET_LEN = 4096

View file

@ -5,13 +5,11 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import time
import inspect
import logging
import os
import sys
import logging
import inspect
from struct import unpack, pack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
@ -22,6 +20,7 @@ except:
from Library.utils import read_object, print_progress, rmrf, LogBase
from Config.qualcomm_config import sochw, msmids, root_cert_hash
class loader_utils(metaclass=LogBase):
def __init__(self, loglevel=logging.INFO):
self.__logger = self.__logger
@ -38,7 +37,7 @@ class loader_utils(metaclass=LogBase):
self.loaderdb = {}
def init_loader_db(self):
for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir,"..","Loaders")):
for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir, "..", "Loaders")):
for filename in filenames:
fn = os.path.join(dirpath, filename)
found = False
@ -52,13 +51,13 @@ class loader_utils(metaclass=LogBase):
hwid = filename.split("_")[0].lower()
msmid = hwid[:8]
try:
int(msmid,16)
int(msmid, 16)
except:
continue
devid = hwid[8:]
if devid == '':
continue
if len(filename.split("_"))<2:
if len(filename.split("_")) < 2:
continue
pkhash = filename.split("_")[1].lower()
for msmid in self.convertmsmid(msmid):
@ -88,4 +87,3 @@ class loader_utils(metaclass=LogBase):
rmsmid = '0' + rmsmid
msmiddb.append(rmsmid)
return msmiddb

View file

@ -6,11 +6,11 @@
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os
import pt64
import pt
import argparse
import pt
import pt64
def pt64_walk(data, ttbr, tnsz, levels=3):
print("Dumping page tables (levels=%d)" % levels)

View file

@ -87,6 +87,7 @@ def get_fld(mfld, level):
return table_entry4k(mfld, level)
return None
class descriptor(object):
def get_name(self):
pass

View file

@ -5,21 +5,22 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import time
import inspect
import logging
import os
import sys
import logging
import inspect
import time
from struct import pack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
from edlclient.Library.utils import read_object, print_progress, rmrf, LogBase
from edlclient.Config.qualcomm_config import sochw, msmids, root_cert_hash
from edlclient.Library.utils import print_progress, rmrf, LogBase
from edlclient.Config.qualcomm_config import msmids, root_cert_hash
from edlclient.Library.loader_db import loader_utils
from edlclient.Library.sahara_defs import ErrorDesc, cmd_t, exec_cmd_t, sahara_mode_t, status_t, \
CommandHandler, SAHARA_VERSION
CommandHandler
class sahara(metaclass=LogBase):
def __init__(self, cdc, loglevel):

View file

@ -12,9 +12,11 @@ from io import BytesIO
SAHARA_VERSION = 2
SAHARA_MIN_VERSION = 1
class DataError(Exception):
pass
class cmd_t:
SAHARA_HELLO_REQ = 0x1
SAHARA_HELLO_RSP = 0x2
@ -36,6 +38,7 @@ class cmd_t:
SAHARA_64BIT_MEMORY_READ_DATA = 0x12
SAHARA_RESET_STATE_MACHINE_ID = 0x13
class cmd_t_version:
SAHARA_HELLO_REQ = 0x1
SAHARA_HELLO_RSP = 1
@ -57,6 +60,7 @@ class cmd_t_version:
SAHARA_64BIT_MEMORY_READ_DATA = 2
SAHARA_RESET_STATE_MACHINE_ID = 2
class exec_cmd_t:
SAHARA_EXEC_CMD_NOP = 0x00
SAHARA_EXEC_CMD_SERIAL_NUM_READ = 0x01
@ -69,6 +73,7 @@ class exec_cmd_t:
SAHARA_EXEC_CMD_GET_COMMAND_ID_LIST = 0x08
SAHARA_EXEC_CMD_GET_TRAINING_DATA = 0x09
class sahara_mode_t:
SAHARA_MODE_IMAGE_TX_PENDING = 0x0
SAHARA_MODE_IMAGE_TX_COMPLETE = 0x1
@ -169,7 +174,7 @@ ErrorDesc = {
class CommandHandler:
def pkt_hello_req(self, data):
if len(data)<0xC * 0x4:
if len(data) < 0xC * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -190,7 +195,7 @@ class CommandHandler:
return req
def pkt_cmd_hdr(self, data):
if len(data)<2*4:
if len(data) < 2 * 4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -201,7 +206,7 @@ class CommandHandler:
return req
def pkt_read_data(self, data):
if len(data)<0x5 * 0x4:
if len(data) < 0x5 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -215,7 +220,7 @@ class CommandHandler:
return req
def pkt_read_data_64(self, data):
if len(data)<0x8 + 0x3 * 0x8:
if len(data) < 0x8 + 0x3 * 0x8:
raise DataError
st = structhelper_io(BytesIO(data))
@ -229,7 +234,7 @@ class CommandHandler:
return req
def pkt_memory_debug(self, data):
if len(data)<0x8 + 0x2 * 0x4:
if len(data) < 0x8 + 0x2 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -242,7 +247,7 @@ class CommandHandler:
return req
def pkt_memory_debug_64(self, data):
if len(data)<0x8 + 0x2 * 0x8:
if len(data) < 0x8 + 0x2 * 0x8:
raise DataError
st = structhelper_io(BytesIO(data))
@ -255,7 +260,7 @@ class CommandHandler:
return req
def pkt_execute_rsp_cmd(self, data):
if len(data)<0x4 * 0x4:
if len(data) < 0x4 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -268,7 +273,7 @@ class CommandHandler:
return req
def pkt_image_end(self, data):
if len(data)<0x4 * 0x4:
if len(data) < 0x4 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -281,7 +286,7 @@ class CommandHandler:
return req
def pkt_done(self, data):
if len(data)<0x3 * 4:
if len(data) < 0x3 * 4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -293,7 +298,7 @@ class CommandHandler:
return req
def pkt_info(self, data):
if len(data)<0x3 * 4 + 0x20:
if len(data) < 0x3 * 4 + 0x20:
raise DataError
st = structhelper_io(BytesIO(data))
@ -306,7 +311,7 @@ class CommandHandler:
return req
def parttbl(self, data):
if len(data)<(0x3 * 4) + 20 + 20:
if len(data) < (0x3 * 4) + 20 + 20:
raise DataError
st = structhelper_io(BytesIO(data))
@ -320,7 +325,7 @@ class CommandHandler:
return req
def parttbl_64bit(self, data):
if len(data)<(0x3 * 8) + 20 + 20:
if len(data) < (0x3 * 8) + 20 + 20:
raise DataError
st = structhelper_io(BytesIO(data))

View file

@ -5,13 +5,13 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import inspect
import logging
import sys
import os
import sys
import inspect
from queue import Queue
from struct import unpack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)

View file

@ -168,7 +168,7 @@ class Streaming(metaclass=LogBase):
def send_section_header(self, name):
# 0x1b open muliimage, 0xe for user-defined partition
resp = self.send(b"\x1b\x0e" + bytes("0:"+name, 'utf-8') + b"\x00")
resp = self.send(b"\x1b\x0e" + bytes("0:" + name, 'utf-8') + b"\x00")
if resp[0] == 0x1c:
return True
self.error("Error on sending section header")
@ -186,7 +186,7 @@ class Streaming(metaclass=LogBase):
return True
return False
def write_flash(self, lba:int=0, partname="", filename="", info=True):
def write_flash(self, lba: int = 0, partname="", filename="", info=True):
wbsize = 1024
filesize = os.stat(filename).st_size
total = filesize
@ -197,8 +197,8 @@ class Streaming(metaclass=LogBase):
adr = lba
while filesize > 0:
subdata = rf.read(wbsize)
if len(subdata)<1024:
subdata += (1024-len(subdata))*b'\xFF'
if len(subdata) < 1024:
subdata += (1024 - len(subdata)) * b'\xFF'
scmd = b"\x07" + pack("<I", adr) + subdata
resp = self.send(scmd)
if len(resp) == 0 or resp[0] != 0x8:
@ -664,7 +664,7 @@ class Streaming(metaclass=LogBase):
for i in range(partcount):
if magic1 == 0xAA7D1B9a and magic2 == 0x1F7D48BC:
name, length, spare, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
data[i * 0x1C:(i * 0x1C) + 0x1C])
else:
name, offset, length, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
@ -672,10 +672,10 @@ class Streaming(metaclass=LogBase):
if name[1] != 0x3A:
break
partitions[name[2:].rstrip(b"\x00").decode('utf-8')] = dict(offset=offset,
length=(length+spare) & 0xFFFF,
attr1=attr1, attr2=attr2,
attr3=attr3,
which_flash=which_flash)
length=(length + spare) & 0xFFFF,
attr1=attr1, attr2=attr2,
attr3=attr3,
which_flash=which_flash)
if magic1 == 0xAA7D1B9a and magic2 == 0x1F7D48BC:
offset += length + spare
return partitions
@ -861,9 +861,9 @@ class Streaming(metaclass=LogBase):
val = resp[1].flashId.decode('utf-8') if resp[1].flashId[0] != 0x65 else ""
self.info("Flash memory: %s %s, %s (vendor: 0x%02X image_id: 0x%02X)" % (self.settings.flash_mfr, val,
self.settings.flash_descr,
self.settings.flash_pid,
self.settings.flash_fid))
self.settings.flash_descr,
self.settings.flash_pid,
self.settings.flash_fid))
# self.info("Maximum packet size: %i byte",*((unsigned int*)&rbuf[0x24]))
self.info(
"Page size: %d bytes (%d sectors)" % (self.settings.PAGESIZE, self.settings.sectors_per_page))
@ -923,7 +923,7 @@ class Streaming(metaclass=LogBase):
totallength = length * self.settings.num_pages_per_blk * self.settings.PAGESIZE
progbar.show_progress(prefix="Read", pos=pos, total=totallength, display=info)
for curblock in range(block,block+length):
for curblock in range(block, block + length):
for curpage in range(self.settings.num_pages_per_blk):
data, spare = self.flash_read(curblock, curpage, self.settings.sectors_per_page, cwsize)
pos = (curblock * self.settings.num_pages_per_blk + curpage) * self.settings.PAGESIZE
@ -1079,7 +1079,7 @@ def test_nand_config():
errorids = []
for test in testconfig:
nandid, buswidth, density, pagesize, blocksize, oobsize, bchecc, cfg0, \
cfg1, eccbufcfg, bccbchcfg, badblockbyte = test
cfg1, eccbufcfg, bccbchcfg, badblockbyte = test
res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg = qs.nanddevice.nand_setup(nandid)
if cfg0 != res_cfg0 or cfg1 != res_cfg1 or eccbufcfg != res_ecc_buf_cfg or res_ecc_bch_cfg != bccbchcfg:
errorids.append([nandid, res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg])

View file

@ -54,7 +54,7 @@ class streaming_client(metaclass=LogBase):
self.printer("-------------------------------------------------------------")
for name in partitions:
partition = partitions[name]
if not isinstance(partition,dict):
if not isinstance(partition, dict):
continue
for i in range(0x10 - len(name)):
name += " "
@ -136,7 +136,7 @@ class streaming_client(metaclass=LogBase):
elif cmd == "rf":
sector = 0
sectors = self.streaming.settings.MAXBLOCK * self.streaming.settings.num_pages_per_blk * \
self.streaming.settings.sectors_per_page
self.streaming.settings.sectors_per_page
filename = options["<filename>"]
self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...")
if self.streaming.read_sectors(sector, sectors, filename, True):
@ -323,7 +323,7 @@ class streaming_client(metaclass=LogBase):
self.error(f"Error: Couldn't find partition file: {partitionfilename}")
return False
else:
ptable = open(partitionfilename,"rb").read()
ptable = open(partitionfilename, "rb").read()
else:
self.error("Partition file is needed for writing (--partitionfilename)")
sys.exit(1)

View file

@ -6,63 +6,64 @@
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
class open_mode_type:
OPEN_MODE_NONE = 0x00 # Not opened yet
OPEN_BOOTLOADER = 0x01 # Bootloader Image
OPEN_BOOTABLE = 0x02 # Bootable Image
OPEN_CEFS = 0x03 # CEFS Image
OPEN_MODE_FACTORY = 0x04 # Factory Image
OPEN_MODE_NONE = 0x00 # Not opened yet
OPEN_BOOTLOADER = 0x01 # Bootloader Image
OPEN_BOOTABLE = 0x02 # Bootable Image
OPEN_CEFS = 0x03 # CEFS Image
OPEN_MODE_FACTORY = 0x04 # Factory Image
class open_multi_mode_type:
OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet
OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader
OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data
OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader
OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader
OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable
OPEN_MULTI_MODE_APPS = 0x06 # APPS executable
OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader
OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh
OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image
OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader
OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image
OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile
OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image
OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition
OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable
OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image
OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition
OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0
OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1
OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2
OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3
OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4
OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet
OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader
OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data
OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader
OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader
OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable
OPEN_MULTI_MODE_APPS = 0x06 # APPS executable
OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader
OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh
OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image
OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader
OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image
OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile
OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image
OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition
OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable
OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image
OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition
OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0
OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1
OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2
OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3
OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4
class response_code_type:
ACK = 0x00 # Successful
RESERVED_1 = 0x01 # Reserved
NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid.
NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid.
NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd.
NAK_INVALID_CMD = 0x05 # Failure: invalid command
RESERVED_6 = 0x06 # Reserved
NAK_FAILED = 0x07 # Failure: operation did not succeed.
NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong.
NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec
NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match
RESERVED_0xB = 0x0B # Reserved
NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code
NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone
NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported
NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence
NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed
NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits
NAK_NO_SPACE = 0x12 # Failure: Out of space
NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode
NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported
NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported
ACK = 0x00 # Successful
RESERVED_1 = 0x01 # Reserved
NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid.
NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid.
NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd.
NAK_INVALID_CMD = 0x05 # Failure: invalid command
RESERVED_6 = 0x06 # Reserved
NAK_FAILED = 0x07 # Failure: operation did not succeed.
NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong.
NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec
NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match
RESERVED_0xB = 0x0B # Reserved
NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code
NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone
NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported
NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence
NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed
NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits
NAK_NO_SPACE = 0x12 # Failure: Out of space
NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode
NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported
NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported

View file

@ -5,20 +5,21 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import sys
import codecs
import copy
import datetime as dt
import logging
import logging.config
import codecs
import struct
import os
import shutil
import stat
import colorama
import copy
import datetime as dt
import struct
import sys
import time
from io import BytesIO
from struct import unpack, pack
from struct import unpack
import colorama
try:
from capstone import *
@ -124,7 +125,7 @@ class progress:
def calcProcessTime(self, starttime, cur_iter, max_iter):
telapsed = time.time() - starttime
if telapsed > 0 and cur_iter > 0:
testimated = (telapsed / cur_iter) * (max_iter)
testimated = (telapsed / cur_iter) * max_iter
finishtime = starttime + testimated
finishtime = dt.datetime.fromtimestamp(finishtime).strftime("%H:%M:%S") # in time
lefttime = testimated - telapsed # in seconds
@ -575,7 +576,7 @@ class patchtools:
badchars = self.has_bad_uart_chars(data)
if not badchars:
badchars = self.has_bad_uart_chars(data2)
if not (badchars):
if not badchars:
return div
div += 4
@ -685,7 +686,7 @@ class patchtools:
continue
rt += 1
prep = data[rt:].find(t[i])
if (prep != 0):
if prep != 0:
error = 1
break
rt += len(t[i])
@ -699,7 +700,7 @@ class patchtools:
return None
def read_object(data: object, definition: object) -> object:
def read_object(data: object, definition: object) -> dict:
"""
Unpacks a structure using the given data and definition.
"""

View file

@ -17,7 +17,7 @@ class xmlparser:
continue
line = b"<?xml" + line
if b"\xf0\xe9\x88\x14" in line:
line=line.replace(b"\xf0\xe9\x88\x14",b"")
line = line.replace(b"\xf0\xe9\x88\x14", b"")
parser = ET.XMLParser(encoding="utf-8")
try:
tree = ET.fromstring(line, parser=parser)
@ -37,7 +37,7 @@ class xmlparser:
continue
line = b"<?xml" + line
if b"\xf0\xe9\x88\x14" in line:
line=line.replace(b"\xf0\xe9\x88\x14",b"")
line = line.replace(b"\xf0\xe9\x88\x14", b"")
parser = ET.XMLParser(encoding="utf-8")
try:
tree = ET.fromstring(line, parser=parser)

View file

@ -7,43 +7,45 @@
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
# Beagle to EDL Loader
import os,sys
import sys
from struct import unpack
def main():
if len(sys.argv)<2:
print("Usage: ./beagle_to_loader.py [beagle_log.bin] [loader.elf]")
sys.exit(0)
with open(sys.argv[1],"rb") as rf:
data=rf.read()
outdata=bytearray()
i=0
seq=b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx=data.find(seq)
if idx==-1:
if i==0:
seq=b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i+=1
continue
else:
break
else:
cmd=unpack("<I", data[idx:idx+4])[0]
if cmd==0x03:
cmd,tlen,slen,offset,length=unpack("<IIIII",data[idx:idx+0x14])
elif cmd==0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x14:]
print("Offset : %08X Length: %08X" %(offset,length))
while len(outdata)<offset+length:
outdata.append(0xFF)
outdata[offset:offset+length]=data[:length]
i+=1
wf.write(outdata)
print("Done.")
if __name__=="__main__":
main()
def main():
if len(sys.argv) < 2:
print("Usage: ./beagle_to_loader.py [beagle_log.bin] [loader.elf]")
sys.exit(0)
with open(sys.argv[1], "rb") as rf:
data = rf.read()
outdata = bytearray()
i = 0
seq = b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx = data.find(seq)
if idx == -1:
if i == 0:
seq = b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i += 1
continue
else:
break
else:
cmd = unpack("<I", data[idx:idx + 4])[0]
if cmd == 0x03:
cmd, tlen, slen, offset, length = unpack("<IIIII", data[idx:idx + 0x14])
elif cmd == 0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x14:]
print("Offset : %08X Length: %08X" % (offset, length))
while len(outdata) < offset + length:
outdata.append(0xFF)
outdata[offset:offset + length] = data[:length]
i += 1
wf.write(outdata)
print("Done.")
if __name__ == "__main__":
main()

View file

@ -38,12 +38,13 @@ import logging.config
import logging.handlers
import colorama
itoa64 = bytearray(b"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
def _crypt_to64(s, v, n):
out=bytearray()
out = bytearray()
while --n >= 0:
out.append(itoa64[v&0x3f])
out.append(itoa64[v & 0x3f])
v >>= 6
@ -191,7 +192,8 @@ class connection:
mode = "AT"
break
elif device.pid == 0x1403:
print(f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
print(
f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
mode = "Web"
self.ZTE_Web()
break
@ -358,7 +360,7 @@ class adbtools(metaclass=LogBase):
if cn.connected:
while True:
resp2 = cn.serial.read(8)
if len(resp2)>0:
if len(resp2) > 0:
break
cn.serial.write(mode)
response = cn.serial.read(8)
@ -378,36 +380,33 @@ class adbtools(metaclass=LogBase):
res = False
if "vendor" in info:
if info["vendor"] == "Sierra Wireless" or info["vendor"] == "Netgear":
res=self.SierraWireless(cn, info, enable)
res = self.SierraWireless(cn, info, enable)
elif info["vendor"] == "Quectel":
print("Sending at switch command")
res=self.Quectel(cn, enable)
res = self.Quectel(cn, enable)
elif info["vendor"] == "ZTE":
print("Sending switch command via diag")
res=self.ZTE(cn, enable)
res = self.ZTE(cn, enable)
elif info["vendor"] == "Simcom":
res=self.Simcom(cn)
res = self.Simcom(cn, enable)
elif info["vendor"] == "Fibocom":
res=self.Fibocom(cn, enable)
res = self.Fibocom(cn, enable)
elif info["vendor"] == "Alcatel":
res=self.Alcatel(enable)
res = self.Alcatel(enable)
elif info["vendor"] == "Samsung":
res=self.Samsung(cn, enable)
if enable:
mode="enabled"
else:
mode="disabled"
res = self.Samsung(cn, enable)
mode = "enabled" if enable else "disabled"
if res:
print("ADB successfully "+mode)
print("ADB successfully " + mode)
else:
print("ADB couldn't be "+mode)
print("ADB couldn't be " + mode)
cn.close()
else:
print("No device detected")
def SierraWireless(self, cn, info, enable):
print("Sending at switch command")
kg = SierraKeygen(cn=cn,devicegeneration=None)
kg = SierraKeygen(cn=cn, devicegeneration=None)
kg.detectdevicegeneration()
if kg.openlock():
if enable:
@ -428,25 +427,25 @@ class adbtools(metaclass=LogBase):
print("Successfully enabled PID 68E2")
return True
index=-1
type=-1
bitmask=-1
resp=cn.send("AT!USBCOMP?")
if resp!=-1:
index = -1
type = -1
bitmask = -1
resp = cn.send("AT!USBCOMP?")
if resp != -1:
print(resp)
for val in resp:
if "Config Index" in val:
index=val[val.find("Config Index: ")+14:]
index = val[val.find("Config Index: ") + 14:]
elif "Config Type" in val:
type=val[val.find("Config Type: ")+14:].replace(" (Generic)","")
type = val[val.find("Config Type: ") + 14:].replace(" (Generic)", "")
elif "Interface bitmask" in val:
bitmask=val[val.find("Interface bitmask: ")+19:]
bitmask = val[val.find("Interface bitmask: ") + 19:]
if " " in bitmask:
bitmask="0x"+bitmask.split(" ")[0]
if index!=-1 and type!=-1 and bitmask!=1:
index=int(index)
type=int(type)
bitmask=int(bitmask,16)
bitmask = "0x" + bitmask.split(" ")[0]
if index != -1 and type != -1 and bitmask != 1:
index = int(index)
type = int(type)
bitmask = int(bitmask, 16)
# AT!USBCOMP=<Config Index>,<Config Type>,<Interface bitmask>
# <Config Index> - configuration index to which the composition applies, should be 1
# <Config Type> - 1:Generic, 2:USBIF-MBIM, 3:RNDIS
@ -465,13 +464,13 @@ class adbtools(metaclass=LogBase):
# ECM - 0x00080000,
# UBIST - 0x00200000
#if enable:
cmd=f"AT!USBCOMP={index},{type},%08X" % 0x0080010E
cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0080010E
#else:
# cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0000010D
resp=cn.send(cmd)
if resp!=-1:
resp=cn.send("AT!RESET")
if resp!=-1:
resp = cn.send(cmd)
if resp != -1:
resp = cn.send("AT!RESET")
if resp != -1:
return True
return False
return True
@ -600,7 +599,7 @@ def main():
else:
enable = False
ad.run(port=args.port, enable=enable)
#ad.meta(port=args.port)
# ad.meta(port=args.port)
if __name__ == "__main__":

View file

@ -5,25 +5,28 @@
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import hashlib
import inspect
import os
import sys
from os import walk
import hashlib
from struct import unpack, pack
from shutil import copyfile
import os, sys, inspect
from io import BytesIO
from Library.utils import elf
from Library.loader_db import loader_utils
from os import walk
from shutil import copyfile
from struct import unpack
from Config.qualcomm_config import vendor
from Library.loader_db import loader_utils
from Library.utils import elf
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
lu=loader_utils()
lu = loader_utils()
class MBN:
def __init__(self, memory):
self.imageid, self.flashpartitionversion, self.imagesrc, self.loadaddr, self.imagesz, self.codesz, \
self.sigptr, self.sigsz, self.certptr, self.certsz = unpack("<IIIIIIIIII", memory[0xC:0xC + 40])
self.sigptr, self.sigsz, self.certptr, self.certsz = unpack("<IIIIIIIIII", memory[0xC:0xC + 40])
class Signed:
@ -90,9 +93,9 @@ def extract_hdr(memsection, version, sign_info, mem_section, code_size, signatur
anti_rollback_version=unpack("<I", mm[md_offset:md_offset + 4])[0]
'''
if version==6:
if version == 6:
signatureoffset = memsection.file_start_addr + 0x30 + md_size + code_size + signature_size
elif version==7:
elif version == 7:
signatureoffset = memsection.file_start_addr + 0x28 + hdr1 + hdr2 + hdr3 + md_size + code_size + hdr4
try:
if mem_section[signatureoffset] != 0x30:
@ -106,7 +109,8 @@ def extract_hdr(memsection, version, sign_info, mem_section, code_size, signatur
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
idx = signatureoffset
signature = {}
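
As a worked example of the length walk above (extract_old_hdr below repeats the same pattern): the code assumes each certificate is a DER SEQUENCE in long form, i.e. it starts with 30 82 <hi> <lo>, so the big-endian 16-bit length at offset +2 plus the 4 header bytes gives the distance to the next certificate. A minimal self-check with an invented length value:

    from struct import unpack

    cert = bytes.fromhex("3082054a") + b"\x00" * 0x054A  # SEQUENCE header, content length 0x054A
    step = unpack(">H", cert[2:4])[0] + 4                # same ">H" read as len1/len2 above
    assert step == 0x054E                                # the next certificate starts here
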
@ -171,7 +175,8 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
sign_info.pk_hash = hashlib.sha256(rootsignature3).hexdigest()
idx = signatureoffset
@ -220,7 +225,7 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur
def init_loader_db():
loaderdb = {}
loaders=os.path.join(current_dir,"..","..","Loaders")
loaders = os.path.join(current_dir, "..", "..", "Loaders")
if not os.path.exists(loaders):
loaders = os.path.join(current_dir, "Loaders")
if not os.path.exists(loaders):
@ -326,7 +331,7 @@ def main(argv):
filelist = []
rt = open(os.path.join(outputdir, argv[1] + ".log"), "w")
for filename in file_list:
filesize=os.stat(filename).st_size
filesize = os.stat(filename).st_size
elfpos = 0
with open(filename, 'rb') as rhandle:
data = rhandle.read()
@ -339,30 +344,30 @@ def main(argv):
signinfo.filename = filename
signinfo.filesize = os.stat(filename).st_size
while elfpos<filesize:
if elfpos==-1:
while elfpos < filesize:
if elfpos == -1:
break
mem_section = data[elfpos:]
elfheader = elf(mem_section, signinfo.filename)
if len(elfheader.pentry)<4:
elfpos = data.find(b"\x7FELF", elfpos+1)
if len(elfheader.pentry) < 4:
elfpos = data.find(b"\x7FELF", elfpos + 1)
continue
idx = 0
for entry in elfheader.pentry:
if entry.p_type==0 and entry.p_flags&0xF000000==0x2000000:
if entry.p_type == 0 and entry.p_flags & 0xF000000 == 0x2000000:
break
idx+=1
idx += 1
if 'memorylayout' in dir(elfheader):
memsection = elfheader.memorylayout[idx]
try:
sect=BytesIO(mem_section[memsection.file_start_addr+0x4:])
version = int.from_bytes(sect.read(4),'little')
hdr1 = int.from_bytes(sect.read(4),'little')
hdr2 = int.from_bytes(sect.read(4),'little')
hdr3 = int.from_bytes(sect.read(4),'little')
code_size = int.from_bytes(sect.read(4),'little')
hdr4 = int.from_bytes(sect.read(4),'little')
signature_size = int.from_bytes(sect.read(4),'little')
sect = BytesIO(mem_section[memsection.file_start_addr + 0x4:])
version = int.from_bytes(sect.read(4), 'little')
hdr1 = int.from_bytes(sect.read(4), 'little')
hdr2 = int.from_bytes(sect.read(4), 'little')
hdr3 = int.from_bytes(sect.read(4), 'little')
code_size = int.from_bytes(sect.read(4), 'little')
hdr4 = int.from_bytes(sect.read(4), 'little')
signature_size = int.from_bytes(sect.read(4), 'little')
# cert_chain_size=unpack("<I", mem_section[memsection.file_start_addr + 0x24:memsection.file_start_addr + 0x24 + 0x4])[0]
except Exception as err:
print(err)
@ -381,7 +386,8 @@ def main(argv):
filelist.append(signinfo)
break
elif version >= 6: # SDM
signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size, hdr1,
signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size,
hdr1,
hdr2, hdr3, hdr4)
if signinfo is None:
continue
@ -390,7 +396,7 @@ def main(argv):
else:
print("Unknown version for " + filename)
continue
if elfpos == -1 and int.from_bytes(data[:4],'little') == 0x844BDCD1:
if elfpos == -1 and int.from_bytes(data[:4], 'little') == 0x844BDCD1:
mbn = MBN(mem_section)
if mbn.sigsz == 0:
print("%s has no signature." % filename)
@ -414,13 +420,13 @@ def main(argv):
loaderlists = {}
for item in sorted_x:
if item.oem_id != '':
oemid=int(item.oem_id,16)
oemid = int(item.oem_id, 16)
if oemid in vendor:
oeminfo = vendor[oemid]
else:
oeminfo=item.oem_id
if len(item.sw_id)<16:
item.sw_id="0"*(16-len(item.sw_id))+item.sw_id
oeminfo = item.oem_id
if len(item.sw_id) < 16:
item.sw_id = "0" * (16 - len(item.sw_id)) + item.sw_id
info = f"OEM:{oeminfo}\tMODEL:{item.model_id}\tHWID:{item.hw_id}\tSWID:{item.sw_id}\tSWSIZE:{item.sw_size}\tPK_HASH:{item.pk_hash}\t{item.filename}\t{str(item.filesize)}"
if item.oem_version != '':
info += "\tOEMVER:" + item.oem_version + "\tQCVER:" + item.qc_version + "\tVAR:" + item.image_variant
@ -444,11 +450,11 @@ def main(argv):
if b"peek\x00" in data:
auth += "_peek"
fna = os.path.join(outputdir, (
hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())
hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())
if not os.path.exists(fna):
copyfile(item.filename,
os.path.join(outputdir, hwid + "_" + (
loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()))
loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()))
elif item.filesize > os.stat(fna).st_size:
copyfile(item.filename, os.path.join(outputdir,
(hwid + "_" + loader_info.pk_hash[
@ -470,7 +476,7 @@ def main(argv):
except:
continue
else:
print("Unknown :"+item.filename)
print("Unknown :" + item.filename)
copyfile(item.filename, os.path.join(outputdir, "Unknown", os.path.basename(item.filename).lower()))
for item in filelist:
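
To make the 40-byte header that the MBN class at the top of this file unpacks easier to follow, here is a hypothetical round trip; every value below is invented, and only the field order and the 0xC offset come from the code above (main() additionally requires the file to begin with the little-endian magic 0x844BDCD1 before constructing an MBN):

    from struct import pack

    hdr = b"\x00" * 0xC + pack("<IIIIIIIIII",
                               0x05,        # imageid
                               0x03,        # flashpartitionversion
                               0x28,        # imagesrc
                               0x2A000000,  # loadaddr
                               0x1000,      # imagesz
                               0x0E00,      # codesz
                               0x2A000E00,  # sigptr
                               0x100,       # sigsz
                               0x2A000F00,  # certptr
                               0x100)       # certsz
    mbn = MBN(hdr)
    assert mbn.sigsz == 0x100  # main() reports "has no signature" when sigsz == 0
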


@ -90,7 +90,7 @@ subnvitem_type = [
]
class fs_factimage_read_info():
class fs_factimage_read_info:
def_fs_factimage_read_info = [
("stream_state", "B"), # 0 indicates no more data to be sent, otherwise set to 1
("info_cluster_sent", "B"), # 0 indicates if info_cluster was not sent, else 1
@ -117,7 +117,7 @@ class fs_factimage_read_info():
return data
class FactoryHeader():
class FactoryHeader:
def_factory_header = [
("magic1", "I"),
("magic2", "I"),
@ -160,7 +160,7 @@ class FactoryHeader():
return data
class nvitem():
class nvitem:
item = 0x0
data = b""
status = 0x0
@ -409,7 +409,7 @@ class qcdiag(metaclass=LogBase):
self.cdc.close(True)
def send(self, cmd):
if self.hdlc != None:
if self.hdlc is not None:
return self.hdlc.send_cmd_np(cmd)
def cmd_info(self):
@ -809,7 +809,7 @@ class qcdiag(metaclass=LogBase):
return False
write_handle.close()
if efserr == False:
if not efserr:
print("Successfully read EFS.")
return True
else:
@ -1408,7 +1408,7 @@ def main():
parser_nvwritesub.add_argument("-debugmode", help="[Option] Enable verbose logging", action="store_true")
parser_writeimei = subparser.add_parser("writeimei", help="Write imei")
parser_writeimei.add_argument("imei", metavar=("<imei1,imei2,...>"), help="[Option] IMEI to write", default="")
parser_writeimei.add_argument("imei", metavar="<imei1,imei2,...>", help="[Option] IMEI to write", default="")
parser_writeimei.add_argument("-vid", metavar="<vid>", help="[Option] Specify vid", default="")
parser_writeimei.add_argument("-pid", metavar="<pid>", help="[Option] Specify pid", default="")
parser_writeimei.add_argument("-interface", metavar="<pid>", help="[Option] Specify interface number, default=0)",


@ -17,6 +17,7 @@ import logging.config
import logging.handlers
import colorama
class ColorFormatter(logging.Formatter):
LOG_COLORS = {
logging.ERROR: colorama.Fore.RED,
@ -153,8 +154,8 @@ infotable = {
"SDX65": ["MR6400", "MR6500", "MR6110", "MR6150", "MR6450", "MR6550"]
}
# 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850,
# AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14
# 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850,
# AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14
keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C, 0xCE, 0x84, 0x90, 0xBC, 0x7F, 0xED,
# 1 MC8775_H2.0.8.19 AC340U, OPENMEP default
0x61, 0x94, 0xCE, 0xA7, 0xB0, 0xEA, 0x4F, 0x0A, 0x73, 0xC5, 0xC3, 0xA6, 0x5E, 0xEC, 0x1C, 0xE2,
@ -210,7 +211,8 @@ keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C
0x46, 0x30, 0x33, 0x43, 0x44, 0x36, 0x42, 0x34, 0x41, 0x32, 0x31, 0x32, 0x30, 0x35, 0x39, 0x37
])
class SierraGenerator():
class SierraGenerator:
tbl = bytearray()
rtbl = bytearray()
devicegeneration = None
@ -265,7 +267,7 @@ class SierraGenerator():
{"challenge": "BE96CBBEE0829BCA", "devicegeneration": "MDM9200", "response": "EEDBF8BFF8DAE346"},
{"challenge": "20E253156762DACE", "devicegeneration": "SDX55", "response": "03940D7067145323"},
{"challenge": "2387885E7D290FEE", "devicegeneration": "MDM9x15A", "response": "DC3E51897BAA9C1E"},
{"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response":"1253C1B1E447B697"}
{"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response": "1253C1B1E447B697"}
]
for test in test_table:
challenge = test["challenge"]
@ -425,7 +427,7 @@ class connection:
def readreply(self):
info = []
if self.serial is not None:
while (True):
while True:
tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
if "OK" in info:
return info
@ -464,7 +466,7 @@ class SierraKeygen(metaclass=LogBase):
def __init__(self, cn, devicegeneration=None):
self.cn = cn
self.keygen = SierraGenerator()
if devicegeneration == None:
if devicegeneration is None:
self.detectdevicegeneration()
else:
self.devicegeneration = devicegeneration
@ -528,7 +530,7 @@ class SierraKeygen(metaclass=LogBase):
devicegeneration = "SDX55"
else: # MR6400 NTGX65_10.04.13.03
devicegeneration = "SDX65"
# MR6550 NTGX65_12.01.31.00
# MR6550 NTGX65_12.01.31.00
devicegeneration = "SDX65"
else:
devicegeneration = ""


@ -7,51 +7,53 @@
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
# TXT to EDL Loader (c) B.Kerler 2023
import os,sys
import os, sys
from struct import unpack
def main():
if len(sys.argv)<2:
print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]")
sys.exit(0)
with open(sys.argv[1],"rb") as rf:
data=bytearray()
for line in rf.readlines():
if line[0]==0x20:
tt=line.split(b" ")[:-1]
tt=tt[1:17]
xx=b"".join(tt)
data.extend(bytes.fromhex(xx.decode('utf-8')))
if len(sys.argv) < 2:
print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]")
sys.exit(0)
with open(sys.argv[1], "rb") as rf:
data = bytearray()
for line in rf.readlines():
if line[0] == 0x20:
tt = line.split(b" ")[:-1]
tt = tt[1:17]
xx = b"".join(tt)
data.extend(bytes.fromhex(xx.decode('utf-8')))
outdata=bytearray()
i=0
seq=b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx=data.find(seq)
if idx==-1:
if i==0:
seq=b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i+=1
continue
else:
break
else:
cmd=unpack("<I", data[idx:idx+4])[0]
if cmd==0x03:
cmd,tlen,slen,offset,length=unpack("<IIIII",data[idx:idx+0x14])
elif cmd==0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x20:]
print("Offset : %08X Length: %08X" %(offset,length))
while len(outdata)<offset+length:
outdata.append(0xFF)
outdata[offset:offset+length]=data[:length]
i+=1
data = data[length:]
wf.write(outdata)
outdata = bytearray()
i = 0
seq = b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx = data.find(seq)
if idx == -1:
if i == 0:
seq = b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i += 1
continue
else:
break
else:
cmd = unpack("<I", data[idx:idx + 4])[0]
if cmd == 0x03:
cmd, tlen, slen, offset, length = unpack("<IIIII", data[idx:idx + 0x14])
elif cmd == 0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x20:]
print("Offset : %08X Length: %08X" % (offset, length))
while len(outdata) < offset + length:
outdata.append(0xFF)
outdata[offset:offset + length] = data[:length]
i += 1
data = data[length:]
wf.write(outdata)
print("Done.")
if __name__=="__main__":
main()
print("Done.")
if __name__ == "__main__":
main()
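
The two headers the script scans for differ only in field width: cmd 0x03 carries 32-bit offset/length fields in a 0x14-byte header, while cmd 0x12 carries 64-bit ones in a 0x20-byte header. A quick self-contained check of those layouts, with made-up offset and length values:

    from struct import pack, unpack

    hdr32 = pack("<IIIII", 0x03, 0x14, 0x0D, 0x1000, 0x200)
    assert unpack("<IIIII", hdr32)[3:] == (0x1000, 0x200)  # offset, length

    hdr64 = pack("<IIQQQ", 0x12, 0x20, 0x0D, 0x80000000, 0x4000)
    cmd, tlen, slen, offset, length = unpack("<IIQQQ", hdr64)
    assert (offset, length) == (0x80000000, 0x4000)
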