Merge pull request #558 from ColdWindScholar/master

More Beautiful and bug fixes
This commit is contained in:
Bjoern Kerler 2024-06-13 23:59:29 +02:00 committed by GitHub
commit a6bb478da0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 653 additions and 612 deletions

View file

@ -1,19 +1,20 @@
import socket
from binascii import hexlify
class tcpclient():
class tcpclient:
def __init__(self, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ("localhost", port)
print("connecting to %s port %s" % server_address)
self.sock.connect(server_address)
def sendcommands(self,commands):
def sendcommands(self, commands):
try:
for command in commands:
self.sock.sendall(bytes(command, 'utf-8'))
data=""
while not "<ACK>" in data and not "<NAK>" in data:
data = ""
while "<ACK>" not in data and "<NAK>" not in data:
tmp = self.sock.recv(4096)
if tmp == b"":
continue
@ -25,5 +26,3 @@ class tcpclient():
finally:
print("closing socket")
self.sock.close()

View file

@ -1,25 +1,27 @@
#!/usr/bin/env python3
from edl.Library.tcpclient import tcpclient
class client():
class client:
def __init__(self):
self.commands=[]
self.commands = []
def send(self):
self.tcp = tcpclient(1340)
self.tcp.sendcommands(self.commands)
def read(self,src):
def read(self, src):
self.commands.append(f"peekqword:{hex(src)}")
def write(self,dest,value):
def write(self, dest, value):
self.commands.append(f"pokeqword:{hex(dest)},{hex(value)}")
def memcpy(self,dest,src,size):
def memcpy(self, dest, src, size):
self.commands.append(f"memcpy:{hex(dest)},{hex(src)},{hex(size)}")
def main():
exp=client()
exp = client()
exp.commands = [
"send:nop",
"r:boot,boot.img",
@ -29,5 +31,6 @@ def main():
]
exp.send()
if __name__=="__main__":
if __name__ == "__main__":
main()

34
edl
View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -127,34 +127,36 @@ Options:
--resetmode=mode Resetmode for reset (poweroff, reset, edl, etc.)
"""
import logging
import os
import re
import subprocess
import sys
import time
import logging
import subprocess
import re
from docopt import docopt
from edlclient.Config.usb_ids import default_ids
from edlclient.Library.utils import LogBase
from edlclient.Library.Connection.usblib import usb_class
from edlclient.Library.Connection.seriallib import serial_class
from edlclient.Library.sahara import sahara
from edlclient.Library.streaming_client import streaming_client
from edlclient.Library.Connection.usblib import usb_class
from edlclient.Library.firehose_client import firehose_client
from edlclient.Library.streaming import Streaming
from edlclient.Library.sahara import sahara
from edlclient.Library.sahara_defs import cmd_t, sahara_mode_t
from edlclient.Library.streaming import Streaming
from edlclient.Library.streaming_client import streaming_client
from edlclient.Library.utils import LogBase
from edlclient.Library.utils import is_windows
from binascii import hexlify
args = docopt(__doc__, version='3')
print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2023.")
print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2024.")
def parse_cmd(rargs):
cmds = ["server", "printgpt", "gpt", "r", "rl", "rf", "rs", "w", "wl", "wf", "ws", "e", "es", "ep", "footer",
"peek", "peekhex", "peekdword", "peekqword", "memtbl", "poke", "pokehex", "pokedword", "pokeqword",
"memcpy", "secureboot", "pbl", "qfp", "getstorageinfo", "setbootablestoragedrive", "getactiveslot", "setactiveslot",
"memcpy", "secureboot", "pbl", "qfp", "getstorageinfo", "setbootablestoragedrive", "getactiveslot",
"setactiveslot",
"send", "xml", "rawxml", "reset", "nop", "modules", "memorydump", "provision", "qfil"]
for cmd in cmds:
if rargs[cmd]:
@ -239,7 +241,6 @@ class main(metaclass=LogBase):
if re.findall(r'QCUSB', str(proper_driver)):
self.warning(f'Please first install libusb_win32 driver from Zadig')
mode = ""
loop = 0
vid = int(args["--vid"], 16)
pid = int(args["--pid"], 16)
@ -285,14 +286,13 @@ class main(metaclass=LogBase):
self.sahara.programmer = loader
self.info("Waiting for the device")
resp = None
self.cdc.timeout = 1500
conninfo = self.doconnect(loop)
mode = conninfo["mode"]
if not "data" in conninfo:
if conninfo.get("data"):
version = 2
else:
version = conninfo["data"].version
version = conninfo.get("data").version
if mode == "sahara":
cmd = conninfo["cmd"]
if cmd == cmd_t.SAHARA_HELLO_REQ:
@ -303,7 +303,7 @@ class main(metaclass=LogBase):
time.sleep(0.5)
print("Device is in memory dump mode, dumping memory")
if args["--partitions"]:
self.sahara.debug_mode(args["--partitions"].split(","),version=version)
self.sahara.debug_mode(args["--partitions"].split(","), version=version)
else:
self.sahara.debug_mode(version=version)
self.exit()

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically

View file

@ -1,20 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import serial
import serial.tools.list_ports
import inspect
import traceback
from binascii import hexlify
try:
from edlclient.Library.utils import *
except:
from Library.utils import *
class DeviceClass(metaclass=LogBase):
def __init__(self, loglevel=logging.INFO, portconfig=None, devclass=-1):
@ -119,7 +119,7 @@ class DeviceClass(metaclass=LogBase):
stack_trace = traceback.format_stack(frame)
td = []
for trace in stack_trace:
if not "verify_data" in trace and not "Port" in trace:
if "verify_data" not in trace and "Port" not in trace:
td.append(trace)
self.debug(td[:-1])

View file

@ -1,29 +1,29 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os.path
import time
import sys
if not sys.platform.startswith('win32'):
import termios
def _reset_input_buffer():
return
def _reset_input_buffer_org(self):
if not sys.platform.startswith('win32'):
return termios.tcflush(self.fd, termios.TCIFLUSH)
import serial
import serial.tools.list_ports
import inspect
import traceback
from binascii import hexlify
try:
from edlclient.Library.utils import *
from edlclient.Library.Connection.devicehandler import DeviceClass
@ -38,13 +38,13 @@ class serial_class(DeviceClass):
super().__init__(loglevel, portconfig, devclass)
self.is_serial = True
def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""):
def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""):
if self.connected:
self.close()
self.connected = False
if portname == "":
devices=self.detectdevices()
if len(devices)>0:
devices = self.detectdevices()
if len(devices) > 0:
portname = devices[0]
if portname != "":
self.device = serial.Serial(baudrate=115200, bytesize=serial.EIGHTBITS,
@ -88,13 +88,12 @@ class serial_class(DeviceClass):
self.debug("Break set")
def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False):
if RTS==1:
if RTS == 1:
self.device.setRTS(RTS)
if DTR==1:
if DTR == 1:
self.device.setDTR(DTR)
self.debug("Linecoding set")
def write(self, command, pktsize=None):
if pktsize is None:
pktsize = 512
@ -169,18 +168,18 @@ class serial_class(DeviceClass):
epr = self.device.read
extend = res.extend
if self.xmlread:
info=self.device.read(6)
bytestoread=resplen-len(info)
info = self.device.read(6)
bytestoread = resplen - len(info)
extend(info)
if b"<?xml " in info:
while not b"response " in res or res[-7:]!=b"</data>":
while b"response " not in res or res[-7:] != b"</data>":
extend(epr(1))
return res
bytestoread = resplen
while len(res) < bytestoread:
try:
val=epr(bytestoread)
if len(val)==0:
val = epr(bytestoread)
if len(val) == 0:
break
extend(val)
except Exception as e:
@ -218,5 +217,3 @@ class serial_class(DeviceClass):
self.device.flush()
res = self.usbread(resplen)
return res

View file

@ -1,30 +1,29 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import io
import logging
import usb.core # pyusb
import usb.util
import time
import inspect
import array
import usb.backend.libusb0
from enum import Enum
import inspect
import logging
from binascii import hexlify
from ctypes import c_void_p, c_int
from enum import Enum
import usb.backend.libusb0
import usb.core # pyusb
import usb.util
try:
from edlclient.Library.utils import *
except:
from Library.utils import *
if not is_windows():
import usb.backend.libusb1
from struct import pack, calcsize
import traceback
from struct import pack
try:
from edlclient.Library.Connection.devicehandler import DeviceClass
except:
@ -211,7 +210,7 @@ class usb_class(DeviceClass):
def flush(self):
return
def connect(self, EP_IN=-1, EP_OUT=-1, portname:str=""):
def connect(self, EP_IN=-1, EP_OUT=-1, portname: str = ""):
if self.connected:
self.close()
self.connected = False
@ -338,7 +337,7 @@ class usb_class(DeviceClass):
def write(self, command, pktsize=None):
if pktsize is None:
#pktsize = self.EP_OUT.wMaxPacketSize
# pktsize = self.EP_OUT.wMaxPacketSize
pktsize = MAX_USB_BULK_BUFFER_SIZE
if isinstance(command, str):
command = bytes(command, 'utf-8')
@ -389,7 +388,7 @@ class usb_class(DeviceClass):
extend = res.extend
while len(res) < resplen:
try:
resplen=epr(buffer,timeout)
resplen = epr(buffer, timeout)
extend(buffer[:resplen])
if resplen == self.EP_IN.wMaxPacketSize:
break

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -16,7 +16,7 @@ class generic(metaclass=LogBase):
self.serial = serial
self.args = args
self.__logger.setLevel(loglevel)
self.error=self.__logger.error
self.error = self.__logger.error
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
@ -27,7 +27,7 @@ class generic(metaclass=LogBase):
if res[0]:
lun = res[1]
rpartition = res[2]
if rpartition.sectors <= (0x8000//self.fh.cfg.SECTOR_SIZE_IN_BYTES):
if rpartition.sectors <= (0x8000 // self.fh.cfg.SECTOR_SIZE_IN_BYTES):
offsettopatch = 0x7FFF
sector, offset = self.fh.calc_offset(rpartition.sector, offsettopatch)
else:

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -36,8 +36,9 @@ except ImportError as e:
nothing = None
pass
class modules(metaclass=LogBase):
def __init__(self, fh, serial:int, supported_functions, loglevel, devicemodel:str, args):
def __init__(self, fh, serial: int, supported_functions, loglevel, devicemodel: str, args):
self.fh = fh
self.args = args
self.serial = serial
@ -132,7 +133,7 @@ class modules(metaclass=LogBase):
if paramdata.data == b"":
self.error("Error on reading param partition.")
return False
wdata = self.ops.enable_ops(paramdata.data, enable,self.devicemodel,self.serial)
wdata = self.ops.enable_ops(paramdata.data, enable, self.devicemodel, self.serial)
if wdata is not None:
self.ops.run()
if self.fh.cmd_program_buffer(lun, rpartition.sector, wdata, False):

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -17,7 +17,7 @@ class nothing(metaclass=LogBase):
loglevel=logging.INFO):
self.fh = fh
self.projid = projid
#self.projid == "22111":
# self.projid == "22111":
self.hashverify = "16386b4035411a770b12507b2e30297c0c5471230b213e6a1e1e701c6a425150"
self.serial = serial
self.supported_functions = supported_functions
@ -31,10 +31,11 @@ class nothing(metaclass=LogBase):
if token1 is None:
token1 = random.randbytes(32).hex()
authresp = token1 + self.projid + ("%x" % self.serial) + self.hashverify
token2 = hashlib.sha256(bytes(authresp,'utf-8')).hexdigest()[:64]
token2 = hashlib.sha256(bytes(authresp, 'utf-8')).hexdigest()[:64]
token3 = self.hashverify
return bytes(f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"{token1}\" token2=\"{token2}\" token3=\"{token3}\"/>\n</data>\n",'utf-8')
return bytes(
f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"{token1}\" token2=\"{token2}\" token3=\"{token3}\"/>\n</data>\n",
'utf-8')
def ntprojectverify(self):
"""
@ -57,9 +58,9 @@ class nothing(metaclass=LogBase):
if __name__ == "__main__":
nt = nothing(fh=None, projid="22111", serial=1729931115)
res=nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671")
org=b"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671\" token2=\"1ecd222465436eb8acc0cfc41e90d1e677165c184ea7d9631615014dac88c669\" token3=\"16386b4035411a770b12507b2e30297c0c5471230b213e6a1e1e701c6a425150\"/>\n</data>\n"
if res!=org:
res = nt.generatetoken(token1="512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671")
org = b"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>\n <ntprojectverify token1=\"512034500a07154561661e0f371f4a712a0b76074605724c640e301d632b3671\" token2=\"1ecd222465436eb8acc0cfc41e90d1e677165c184ea7d9631615014dac88c669\" token3=\"16386b4035411a770b12507b2e30297c0c5471230b213e6a1e1e701c6a425150\"/>\n</data>\n"
if res != org:
print("Error !")
print(res)
print(nt.generatetoken())

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -25,6 +25,7 @@ from struct import pack
import logging
from edlclient.Library.utils import LogBase
from edlclient.Library.Modules.oneplus_param import paramtools
try:
from edlclient.Library.cryptutils import cryptutils
except Exception as e:
@ -128,7 +129,8 @@ deviceconfig = {
class oneplus(metaclass=LogBase):
def __init__(self, fh, projid:str="18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0, supported_functions=None,
def __init__(self, fh, projid: str = "18825", serial=123456, ATOBuild=0, Flash_Mode=0, cf=0,
supported_functions=None,
args=None, loglevel=logging.INFO):
self.fh = fh
self.__logger = self.__logger
@ -203,7 +205,7 @@ class oneplus(metaclass=LogBase):
else:
assert "Device is not supported"
exit(0)
assert "Unknown projid:"+str(projid)
assert "Unknown projid:" + str(projid)
return None
def run(self):
@ -446,7 +448,7 @@ class oneplus2(metaclass=LogBase):
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
def crypt_token(self, data, pk, device_timestamp:int, decrypt=False):
def crypt_token(self, data, pk, device_timestamp: int, decrypt=False):
aes = cryptutils().aes()
aeskey = b"\x46\xA5\x97\x30\xBB\x0D\x41\xE8" + bytes(pk, 'utf-8') + \
pack("<Q", device_timestamp) # we get this using setprocstart
@ -573,11 +575,11 @@ def main():
op2 = oneplus(None, projid="20889", serial=serial, ATOBuild=0, Flash_Mode=0, cf=0)
op2.ops.device_timestamp = int(device_timestamp)
# 20889 OP N10 5G Europe
print(f"./edl.py rawxml \"<?xml version=\\\"1.0\\\" ?><data><setprocstart /></data>\"")
print('./edl.py rawxml "<?xml version=\\"1.0\\" ?><data><setprocstart /></data>"')
# Response should be : <?xml version="1.0" ?><data><response value=1 device_timestamp="%llu" /></data>
pk, token = op2.generatetoken(False)
print(
f"./edl.py rawxml \"<?xml version=\\\"1.0\\\" ?><data><setswprojmodel " +
'./edl.py rawxml "<?xml version=\\"1.0\\" ?><data><setswprojmodel ' +
f"token=\\\"{token}\\\" pk=\\\"{pk}\\\" /></data>\" --debugmode")
elif args["setprojmodel_verify"]:
projid = args["--projid"][0]
@ -616,8 +618,7 @@ def main():
def test_setswprojmodel_verify():
deviceresp = b"RX:<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\nRX:<data>\nRX:<response value=\"ACK\" " + \
b"device_timestamp=\"2507003650\" /></data>\n<?xmlversion=\"1.0\" ? ><data><setprocstart /></data>"
deviceresp = b'RX:<?xml version="1.0" encoding="UTF-8" ?>\nRX:<data>\nRX:<response value="ACK" device_timestamp="2507003650" /></data>\n<?xmlversion="1.0" ? ><data><setprocstart /></data>'
projid = "20889"
op = oneplus(None, projid=projid, serial=123456)
data = deviceresp.decode('utf-8')

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -109,7 +109,7 @@ class sid(Enum):
'''
class paramtools():
class paramtools:
paramitems = {
sid.PARAM_SID_PRODUCT.value[0]: {
0x18: ["8c", "project_name"],
@ -315,9 +315,9 @@ class paramtools():
def __init__(self, mode, serial):
self.aes_iv = unhexlify("562E17996D093D28DDB3BA695A2E6F58")
self.aes_key = unhexlify("3030304F6E65506C7573383138303030")
if mode==1:
derivedkey=bytes.fromhex("a9264fbf8a"+("%08x"%serial)+"6b4487ea")[:0x1A]
derivedkey=hashlib.sha256(derivedkey).digest()[:16]
if mode == 1:
derivedkey = bytes.fromhex("a9264fbf8a" + ("%08x" % serial) + "6b4487ea")[:0x1A]
derivedkey = hashlib.sha256(derivedkey).digest()[:16]
self.aes_key = derivedkey
def getparam(self, offset, sidindex):
@ -375,7 +375,7 @@ class paramtools():
def parse_encrypted(self, rdata, sid):
data = rdata[(sid * 0x400):(sid * 0x400) + 0x1000]
itemdata, hv, cv, updatecounter = self.decryptsid(data)
if itemdata != None:
if itemdata is not None:
itemdata = bytearray(itemdata)
print(
f"Offset {hex(sid * 0x400)}: hv {hex(hv)}, cv {hex(cv)}, increase_enc_update_counter {hex(updatecounter)}.")
@ -420,7 +420,7 @@ class paramtools():
itemlength = 0x400
itemdata = rdata[pos + 0x18:pos + 0x18 + itemlength]
i = 0
while (i < len(itemdata) - 0x22):
while i < len(itemdata) - 0x22:
sidindex = (pos // 0x400) & 0x1FF
offset = i + 0x18
# if sidindex==0x334 and offset==0x80:
@ -439,7 +439,7 @@ class paramtools():
length = self.parse_data(i, itemdata, offset, param, sidindex)
i += length
if length > 4:
if (length % 4):
if length % 4:
i += 4 - (length % 4)
def parse_data(self, i, itemdata, offset, param, sidindex, encrypted=False):
@ -467,10 +467,10 @@ class paramtools():
name = name + " "
if "PWD Hash" in name:
items = content.split(" ")
pwdhash = items[0][1:9]+"00000000000000000000000000000000000000000000000000000000"
pwdhash = items[0][1:9] + "00000000000000000000000000000000000000000000000000000000"
valid = "True" if items[1] != "-1" else "False"
flag = items[2]
date = items[3] + " "+ items[4][:-1]
date = items[3] + " " + items[4][:-1]
content = f"{date} ({valid},{flag}): {pwdhash}"
ff = f"SID_Index {hex(sidindex)}, Offset {offsetstr}: {name}: {content}"
if encrypted:
@ -936,7 +936,7 @@ def main():
filename = args["<filename>"]
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
param.parse_decrypted_fields(data)
@ -948,7 +948,7 @@ def main():
filename = args["<filename>"]
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
with open(filename + ".patched", 'wb') as wf:
@ -965,7 +965,7 @@ def main():
value = int(args["<value>"], 16)
mode = args["--mode"]
serial = args["--serial"]
param = paramtools(mode,serial)
param = paramtools(mode, serial)
with open(filename, 'rb') as rf:
data = rf.read()
with open(filename + ".patched", 'wb') as wf:
@ -974,7 +974,7 @@ def main():
imei = args["<imei>"]
mode = 0
serial = None
param = paramtools(mode,serial)
param = paramtools(mode, serial)
print("oneplus Factory qr code generator (c) B. Kerler 2019\nGPLv3 License\n----------------------")
print("Code : *#*#5646#*#* , *#808#, *#36446337# = com.android.engineeringmode.manualtest.DecryptActivity")
results = param.gencode([imei, "YOU_CAN_PASS_NOW"])

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -43,7 +43,7 @@ class xiaomi(metaclass=LogBase):
rsp = self.fh.xmlsend(self.xiaomi_authdata)
if rsp.resp:
if "value" in rsp.resp:
if rsp.resp["value"]=="ACK":
if rsp.resp["value"] == "ACK":
if 'authenticated' in rsp.log[0].lower() and 'true' in rsp.log[0].lower():
return True
return False

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -425,7 +425,7 @@ class cryptutils:
return q
def pss_verify(self, e, N, msghash, signature, emBits=1024, salt=None):
if salt == None:
if salt is None:
slen = self.digestLen
else:
slen = len(salt)
@ -469,20 +469,14 @@ class cryptutils:
if salt is not None:
inBlock = b"\x00" * 8 + msghash + salt
mhash = self.hash(inBlock)
if mhash == mhash:
return True
else:
return False
return mhash == mhash
else:
salt = TS[-self.digestLen:]
inBlock = b"\x00" * 8 + msghash + salt
mhash = self.hash(inBlock)
if mhash == mhash:
return True
else:
return False
return mhash == mhash
class hash():
class hash:
def __init__(self, hashtype="SHA256"):
if hashtype == "SHA1":
self.hash = self.sha1

View file

@ -1,33 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import io
import json
import os.path
import platform
import time
import json
from struct import unpack
from binascii import hexlify
from queue import Queue
from threading import Thread
from edlclient.Library.Modules.nothing import nothing
from edlclient.Library.utils import *
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE, MAX_PRIORITY, PART_ATT_PRIORITY_BIT
from edlclient.Library.gpt import PART_ATT_PRIORITY_VAL, PART_ATT_ACTIVE_VAL, PART_ATT_MAX_RETRY_COUNT_VAL, PART_ATT_SUCCESSFUL_VAL, PART_ATT_UNBOOTABLE_VAL
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE
from edlclient.Library.sparse import QCSparse
from edlclient.Library.utils import *
from edlclient.Library.utils import progress
from queue import Queue
from threading import Thread
rq = Queue()
def writedata(filename, rq):
pos = 0
with open(filename, "wb") as wf:
@ -146,11 +141,10 @@ def writefile(wf, q, stop):
break
class asyncwriter():
class asyncwriter:
def __init__(self, wf):
self.writequeue = Queue()
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,))
self.worker.setDaemon(True)
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,), daemon=True)
self.stopthreads = False
self.worker.start()
@ -216,9 +210,9 @@ class firehose(metaclass=LogBase):
def detect_partition(self, arguments, partitionname, send_full=False):
if arguments is None:
arguments = {
"--gpt-num-part-entries" : 0,
"--gpt-part-entry-size" : 0,
"--gpt-part-entry-start-lba" : 0
"--gpt-num-part-entries": 0,
"--gpt-part-entry-size": 0,
"--gpt-part-entry-start-lba": 0
}
fpartitions = {}
for lun in self.luns:
@ -231,7 +225,8 @@ class firehose(metaclass=LogBase):
break
else:
if partitionname in guid_gpt.partentries:
return [True, lun, data, guid_gpt] if send_full else [True, lun, guid_gpt.partentries[partitionname]]
return [True, lun, data, guid_gpt] if send_full else [True, lun,
guid_gpt.partentries[partitionname]]
for part in guid_gpt.partentries:
fpartitions[lunname].append(part)
return [False, fpartitions]
@ -333,7 +328,7 @@ class firehose(metaclass=LogBase):
def cmd_reset(self, mode="reset"):
if mode is None:
mode = "reset"
data = "<?xml version=\"1.0\" ?><data><power value=\"" + mode + "\"/></data>"
data = f'<?xml version="1.0" ?><data><power value="{mode}"/></data>'
val = self.xmlsend(data)
try:
v = None
@ -366,7 +361,7 @@ class firehose(metaclass=LogBase):
return val.error
def cmd_nop(self):
data = "<?xml version=\"1.0\" ?><data><nop /></data>"
data = '<?xml version="1.0" ?><data><nop /></data>'
resp = self.xmlsend(data, True)
self.debug(resp.data.hex())
info = b""
@ -682,7 +677,7 @@ class firehose(metaclass=LogBase):
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
if bytestoread!=0:
if bytestoread != 0:
self.error(f"Error:")
for line in info:
self.error(line)
@ -1044,8 +1039,8 @@ class firehose(metaclass=LogBase):
self.parse_storage()
for function in self.supported_functions:
if function == "checkntfeature":
if type(self.devicemodel)==list:
self.devicemodel=self.devicemodel[0]
if type(self.devicemodel) == list:
self.devicemodel = self.devicemodel[0]
self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial,
supported_functions=self.supported_functions,
loglevel=self.loglevel)
@ -1148,7 +1143,7 @@ class firehose(metaclass=LogBase):
try:
serial = line.split("0x")[1][:-1]
if ")" in serial:
serial=serial[:serial.rfind(")")]
serial = serial[:serial.rfind(")")]
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
@ -1182,8 +1177,8 @@ class firehose(metaclass=LogBase):
"serial": self.serial
}
if os.path.exists("edl_config.json"):
data = json.loads(open("edl_config.json","rb").read().decode('utf-8'))
if "serial" in data and data["serial"]!=state["serial"]:
data = json.loads(open("edl_config.json", "rb").read().decode('utf-8'))
if "serial" in data and data["serial"] != state["serial"]:
open("edl_config.json", "w").write(json.dumps(state))
else:
self.supported_functions = data["supported_functions"]
@ -1246,7 +1241,7 @@ class firehose(metaclass=LogBase):
if len(imei) != 16:
self.info("IMEI must be 16 digits")
return False
data = "<?xml version=\"1.0\" ?><data><writeIMEI len=\"16\"/></data>"
data = '<?xml version="1.0" ?><data><writeIMEI len="16"/></data>'
val = self.xmlsend(data)
if val.resp:
self.info("writeIMEI succeeded.")
@ -1256,7 +1251,7 @@ class firehose(metaclass=LogBase):
return False
def cmd_getstorageinfo(self):
data = "<?xml version=\"1.0\" ?><data><getstorageinfo physical_partition_number=\"0\"/></data>"
data = '<?xml version="1.0" ?><data><getstorageinfo physical_partition_number="0"/></data>'
val = self.xmlsend(data)
if val.data == '' and val.log == '' and val.resp:
return None
@ -1271,7 +1266,7 @@ class firehose(metaclass=LogBase):
if len(v) > 1:
res[v[0]] = v[1]
else:
if "\"storage_info\"" in value:
if '"storage_info"' in value:
try:
info = value.replace("INFO:", "")
si = json.loads(info)["storage_info"]
@ -1318,26 +1313,29 @@ class firehose(metaclass=LogBase):
if is_boot:
#new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL)
#new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL)
new_flags = 0x6f << (AB_FLAG_OFFSET*8)
new_flags = 0x6f << (AB_FLAG_OFFSET * 8)
else:
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8)
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8)
else:
if is_boot:
#new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL)
#new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT)
new_flags = 0x3a << (AB_FLAG_OFFSET*8)
new_flags = 0x3a << (AB_FLAG_OFFSET * 8)
else:
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8))
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8))
return new_flags
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, slot_b_status, is_boot):
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status,
slot_b_status, is_boot):
part_entry_size = guid_gpt_a.header.part_entry_size
rf_a = BytesIO(gpt_data_a)
rf_b = BytesIO(gpt_data_b)
entryoffset_a = partition_a.entryoffset - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
entryoffset_a = partition_a.entryoffset - (
(guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - (
(guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
rf_a.seek(entryoffset_a)
rf_b.seek(entryoffset_b)
@ -1360,8 +1358,8 @@ class firehose(metaclass=LogBase):
unpack_fmt = "<I" if size_each_patch == 4 else "<Q"
write_size = len(patch_data)
for i in range(0, write_size, size_each_patch):
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset+size_each_patch])[0])
self.cmd_patch( lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, False)
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset + size_each_patch])[0])
self.cmd_patch(lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, False)
offset += size_each_patch
return True
@ -1384,11 +1382,11 @@ class firehose(metaclass=LogBase):
if gpt_data_a and gpt_data_b:
entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
gpt_data_a[entryoffset_a : entryoffset_a + len(pdata_a)] = pdata_a
gpt_data_a[entryoffset_a: entryoffset_a + len(pdata_a)] = pdata_a
new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
gpt_data_b[entryoffset_b : entryoffset_b + len(pdata_b)] = pdata_b
gpt_data_b[entryoffset_b: entryoffset_b + len(pdata_b)] = pdata_b
new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1397,8 +1395,8 @@ class firehose(metaclass=LogBase):
if lun_a != lun_b:
start_sector_hdr_a = guid_gpt_a.header.current_lba
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size]
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a: headeroffset_a + guid_gpt_a.header.header_size]
cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a)
start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1407,7 +1405,7 @@ class firehose(metaclass=LogBase):
start_sector_hdr_b = guid_gpt_b.header.current_lba
headeroffset_b = guid_gpt_b.sectorsize
new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size]
new_hdr_b = new_gpt_data_b[headeroffset_b: headeroffset_b + guid_gpt_b.header.header_size]
cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b)
return True
return False
@ -1416,16 +1414,17 @@ class firehose(metaclass=LogBase):
headeroffset = guid_gpt.sectorsize
prim_corrupted, backup_corrupted = False, False
prim_hdr = gpt_data[headeroffset : headeroffset + guid_gpt.header.header_size]
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset : headeroffset + guid_gpt.header.header_size]
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
prim_hdr = gpt_data[headeroffset: headeroffset + guid_gpt.header.header_size]
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset: headeroffset + guid_gpt.header.header_size]
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
prim_corrupted = prim_hdr_crc != test_hdr_crc or prim_part_table_crc != test_part_table_crc
backup_hdr = backup_gpt_data[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
backup_hdr = backup_gpt_data[headeroffset: headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[
headeroffset: headeroffset + backup_guid_gpt.header.header_size]
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
backup_corrupted = backup_hdr_crc != test_hdr_crc or backup_part_table_crc != test_part_table_crc
prim_backup_consistent = prim_part_table_crc == backup_part_table_crc
@ -1433,10 +1432,10 @@ class firehose(metaclass=LogBase):
if backup_corrupted:
self.error("both are gpt headers are corrupted, cannot recover")
return False, None, None
gpt_data[2*guid_gpt.sectorsize:] = backup_gpt_data[2*backup_guid_gpt.sectorsize:]
gpt_data[2 * guid_gpt.sectorsize:] = backup_gpt_data[2 * backup_guid_gpt.sectorsize:]
gpt_data = guid_gpt.fix_gpt_crc(gpt_data)
elif backup_corrupted or not prim_backup_consistent:
backup_gpt_data[2*backup_guid_gpt.sectorsize:] = gpt_data[2*guid_gpt.sectorsize:]
backup_gpt_data[2 * backup_guid_gpt.sectorsize:] = gpt_data[2 * guid_gpt.sectorsize:]
backup_gpt_data = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)
return True, gpt_data, backup_gpt_data
@ -1456,7 +1455,7 @@ class firehose(metaclass=LogBase):
fpartitions[lunname] = []
check_gpt_hdr = False
gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0))
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0, 0, guid_gpt_a.header.backup_lba)
if guid_gpt_a is None:
break
else:
@ -1464,7 +1463,8 @@ class firehose(metaclass=LogBase):
slot = partitionname_a.lower()[-2:]
partition_a = backup_guid_gpt_a.partentries[partitionname_a]
if slot == "_a":
active_a = ((partition_a.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active_a = ((partition_a.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if (active_a and slot_a_status) or (not active_a and slot_b_status):
return True
@ -1483,14 +1483,22 @@ class firehose(metaclass=LogBase):
self.error(f"Cannot find partition {partitionname_b}")
return False
_, lun_b, gpt_data_b, guid_gpt_b = resp
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0 , 0, guid_gpt_b.header.backup_lba)
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0, 0,
guid_gpt_b.header.backup_lba)
if not check_gpt_hdr and partitionname_a[:3] != "xbl": # xbl partition don't need check consistency
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a, backup_guid_gpt_a, gpt_data_a, backup_gpt_data_a)
if not check_gpt_hdr and partitionname_a[
:3] != "xbl": # xbl partition don't need check consistency
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a,
backup_guid_gpt_a,
gpt_data_a,
backup_gpt_data_a)
if not sts:
return False
if lun_a != lun_b:
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b, backup_guid_gpt_b, gpt_data_b, backup_gpt_data_b)
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b,
backup_guid_gpt_b,
gpt_data_b,
backup_gpt_data_b)
if not sts:
return False
check_gpt_hdr = True
@ -1513,12 +1521,10 @@ class firehose(metaclass=LogBase):
return False
return True
def cmd_test(self, cmd):
token = "1234"
pk = "1234"
data = "<?xml version=\"1.0\" ?>\n<data>\n<" + cmd + " token=\"" + token + "\" pk=\"" + pk + "\" />\n</data>"
data = f'<?xml version="1.0" ?>\n<data>\n<{cmd} token="{token}" pk="{pk}" />\n</data>'
val = self.xmlsend(data)
if val.resp:
if b"raw hex token" in val[2]:
@ -1528,7 +1534,7 @@ class firehose(metaclass=LogBase):
return False
def cmd_getstorageinfo_string(self):
data = "<?xml version=\"1.0\" ?><data><getstorageinfo /></data>"
data = '<?xml version="1.0" ?><data><getstorageinfo /></data>'
val = self.xmlsend(data)
if val.resp:
self.info(f"GetStorageInfo:\n--------------------\n")

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -351,7 +351,7 @@ class firehose_client(metaclass=LogBase):
start_sector=0, num_partition_sectors=1, display=False)
if self.get_storage_info():
totalsectors = (self.cfg.block_size *
self.cfg.total_blocks ) // self.cfg.SECTOR_SIZE_IN_BYTES
self.cfg.total_blocks) // self.cfg.SECTOR_SIZE_IN_BYTES
if len(luns) > 1:
sfilename = filename + f".lun{str(lun)}"
@ -649,7 +649,8 @@ class firehose_client(metaclass=LogBase):
prim_guid_gpt = res[3]
_, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba)
partition = backup_guid_gpt.partentries["boot_a"]
active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if active:
self.printer("Current active slot: a")
return True
@ -659,7 +660,8 @@ class firehose_client(metaclass=LogBase):
prim_guid_gpt = res[3]
_, backup_guid_gpt = self.firehose.get_gpt(lun, 0, 0, 0, prim_guid_gpt.header.backup_lba)
partition = backup_guid_gpt.partentries["boot_b"]
active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if active:
self.printer("Current active slot: b")
return True
@ -755,14 +757,15 @@ class firehose_client(metaclass=LogBase):
filenames = []
if self.firehose.modules is not None:
self.firehose.modules.writeprepare()
for fname in filter(os.path.isfile, [ os.path.join(directory, i) for i in os.listdir(directory) ]):
for fname in filter(os.path.isfile, [os.path.join(directory, i) for i in os.listdir(directory)]):
filenames.append(fname)
for lun in luns:
data, guid_gpt = self.firehose.get_gpt(lun, int(options["--gpt-num-part-entries"]),
int(options["--gpt-part-entry-size"]),
int(options["--gpt-part-entry-start-lba"]))
if guid_gpt is None:
self.error("Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`")
self.error(
"Error: Can not fetch GPT table from device, you may need to use `edl w gpt` to write a partition table first.`")
break
for filename in filenames:
partname = os.path.basename(filename)

View file

@ -1,22 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os
import sys
import argparse
import colorama
import copy
import logging
import logging.config
from enum import Enum
from binascii import hexlify
from struct import calcsize, unpack, pack
from io import BytesIO
import os
import sys
from binascii import crc32
from binascii import hexlify
from enum import Enum
from struct import calcsize, unpack, pack
import colorama
class ColorFormatter(logging.Formatter):
LOG_COLORS = {
@ -193,7 +194,6 @@ AB_SLOT_INACTIVE_VAL = 0x0
AB_SLOT_ACTIVE = 1
AB_SLOT_INACTIVE = 0
PART_ATT_PRIORITY_BIT = 48
PART_ATT_ACTIVE_BIT = 50
PART_ATT_MAX_RETRY_CNT_BIT = 51
@ -204,7 +204,7 @@ PART_ATT_UNBOOTABLE_BIT = 55
PART_ATT_PRIORITY_VAL = 0x3 << PART_ATT_PRIORITY_BIT
PART_ATT_ACTIVE_VAL = 0x1 << PART_ATT_ACTIVE_BIT
PART_ATT_MAX_RETRY_COUNT_VAL = 0x7 << PART_ATT_MAX_RETRY_CNT_BIT
PART_ATT_SUCCESSFUL_VAL = 0x1 << PART_ATT_SUCCESS_BIT
PART_ATT_SUCCESSFUL_VAL = 0x1 << PART_ATT_SUCCESS_BIT
PART_ATT_UNBOOTABLE_VAL = 0x1 << PART_ATT_UNBOOTABLE_BIT
@ -349,7 +349,6 @@ class gpt(metaclass=LogBase):
def parseheader(self, gptdata, sectorsize=512):
return self.gpt_header(gptdata[sectorsize:sectorsize + 0x5C])
def parse(self, gptdata, sectorsize=512):
self.header = self.gpt_header(gptdata[sectorsize:sectorsize + 0x5C])
self.sectorsize = sectorsize
@ -361,7 +360,7 @@ class gpt(metaclass=LogBase):
if self.part_entry_start_lba != 0:
start = self.part_entry_start_lba
else:
start = 2 * sectorsize # mbr + header + part_table
start = 2 * sectorsize # mbr + header + part_table
entrysize = self.header.part_entry_size
self.partentries = {}
@ -403,7 +402,7 @@ class gpt(metaclass=LogBase):
pa.name = partentry.name.replace(b"\x00\x00", b"").decode('utf-16')
if pa.type == "EFI_UNUSED":
continue
self.partentries[pa.name]=pa
self.partentries[pa.name] = pa
self.totalsectors = self.header.first_usable_lba + self.header.last_usable_lba
return True
@ -414,7 +413,8 @@ class gpt(metaclass=LogBase):
mstr = "\nGPT Table:\n-------------\n"
for partitionname in self.partentries:
partition = self.partentries[partitionname]
active = ((partition.flags >> (AB_FLAG_OFFSET*8))&0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:016x}, UUID {}, Type {}, Active {}\n".format(
partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize,
partition.flags, partition.unique, partition.type, active))
@ -565,12 +565,12 @@ if __name__ == "__main__":
with open(args.image, "rb") as rf:
size = min(32 * 4096, filesize)
data = bytearray(rf.read(size))
pdata, poffset = gp.patch(data,partitition, active=active)
pdata, poffset = gp.patch(data, partitition, active=active)
data[poffset:poffset + len(pdata)] = pdata
wdata = gp.fix_gpt_crc(data)
if data is not None:
wfilename = args.image + ".patched"
with open(wfilename,"wb") as wf:
with open(wfilename, "wb") as wf:
wf.write(wdata)
print(f"Successfully wrote patched gpt to {wfilename}")
else:

View file

@ -1,15 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import logging
from binascii import hexlify
from struct import unpack
import time
from struct import unpack
MAX_PACKET_LEN = 4096
@ -154,8 +153,8 @@ class hdlc:
data = unescape(replybuf)
# print(hexlify(data))
if len(data) > 3:
if data[0]==0x7E:
data=data[1:]
if data[0] == 0x7E:
data = data[1:]
crc16val = crc16(0xFFFF, data[:-3])
reccrc = int(data[-3]) + (int(data[-2]) << 8)
if crc16val != reccrc:

View file

@ -1,17 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import time
import inspect
import logging
import os
import sys
import logging
import inspect
from struct import unpack, pack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
@ -22,6 +20,7 @@ except:
from Library.utils import read_object, print_progress, rmrf, LogBase
from Config.qualcomm_config import sochw, msmids, root_cert_hash
class loader_utils(metaclass=LogBase):
def __init__(self, loglevel=logging.INFO):
self.__logger = self.__logger
@ -38,7 +37,7 @@ class loader_utils(metaclass=LogBase):
self.loaderdb = {}
def init_loader_db(self):
for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir,"..","Loaders")):
for (dirpath, dirnames, filenames) in os.walk(os.path.join(parent_dir, "..", "Loaders")):
for filename in filenames:
fn = os.path.join(dirpath, filename)
found = False
@ -52,13 +51,13 @@ class loader_utils(metaclass=LogBase):
hwid = filename.split("_")[0].lower()
msmid = hwid[:8]
try:
int(msmid,16)
int(msmid, 16)
except:
continue
devid = hwid[8:]
if devid == '':
continue
if len(filename.split("_"))<2:
if len(filename.split("_")) < 2:
continue
pkhash = filename.split("_")[1].lower()
for msmid in self.convertmsmid(msmid):
@ -88,4 +87,3 @@ class loader_utils(metaclass=LogBase):
rmsmid = '0' + rmsmid
msmiddb.append(rmsmid)
return msmiddb

View file

@ -1,16 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import os
import pt64
import pt
import argparse
import pt
import pt64
def pt64_walk(data, ttbr, tnsz, levels=3):
print("Dumping page tables (levels=%d)" % levels)
@ -50,7 +50,7 @@ def pt32_walk(data, ttbr, skip):
i += 1
if i <= skip:
continue
if type(fl) == pt.pt_desc:
if isinstance(fl, pt.pt_desc):
print("")
print("Second level (va = %08x)" % va)
print("---------------------------------------------")

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically

View file

@ -87,6 +87,7 @@ def get_fld(mfld, level):
return table_entry4k(mfld, level)
return None
class descriptor(object):
def get_name(self):
pass

View file

@ -1,25 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import time
import inspect
import logging
import os
import sys
import logging
import inspect
import time
from struct import pack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
from edlclient.Library.utils import read_object, print_progress, rmrf, LogBase
from edlclient.Config.qualcomm_config import sochw, msmids, root_cert_hash
from edlclient.Library.utils import print_progress, rmrf, LogBase
from edlclient.Config.qualcomm_config import msmids, root_cert_hash
from edlclient.Library.loader_db import loader_utils
from edlclient.Library.sahara_defs import ErrorDesc, cmd_t, exec_cmd_t, sahara_mode_t, status_t, \
CommandHandler, SAHARA_VERSION
CommandHandler
class sahara(metaclass=LogBase):
def __init__(self, cdc, loglevel):
@ -48,7 +49,7 @@ class sahara(metaclass=LogBase):
self.bit64 = False
self.pktsize = None
self.ch = CommandHandler()
self.loader_handler=loader_utils(loglevel=loglevel)
self.loader_handler = loader_utils(loglevel=loglevel)
self.loaderdb = self.loader_handler.init_loader_db()
self.__logger.setLevel(loglevel)
@ -69,12 +70,12 @@ class sahara(metaclass=LogBase):
if data == b'':
return {}
if b"<?xml" in data:
return {"firehose":"yes"}
return {"firehose": "yes"}
pkt = self.ch.pkt_cmd_hdr(data)
if pkt.cmd == cmd_t.SAHARA_HELLO_REQ:
return {"cmd":pkt.cmd,"data":self.ch.pkt_hello_req(data)}
return {"cmd": pkt.cmd, "data": self.ch.pkt_hello_req(data)}
elif pkt.cmd == cmd_t.SAHARA_DONE_RSP:
return {"cmd": pkt.cmd, "data":self.ch.pkt_done(data)}
return {"cmd": pkt.cmd, "data": self.ch.pkt_done(data)}
elif pkt.cmd == cmd_t.SAHARA_END_TRANSFER:
return {"cmd": pkt.cmd, "data": self.ch.pkt_image_end(data)}
elif pkt.cmd == cmd_t.SAHARA_64BIT_MEMORY_READ_DATA:
@ -92,7 +93,7 @@ class sahara(metaclass=LogBase):
elif pkt.cmd == cmd_t.SAHARA_EXECUTE_RSP:
return {"cmd": pkt.cmd, "data": self.ch.pkt_execute_rsp_cmd(data)}
elif pkt.cmd == cmd_t.SAHARA_CMD_READY or pkt.cmd == cmd_t.SAHARA_RESET_RSP:
return {"cmd": pkt.cmd, "data": None }
return {"cmd": pkt.cmd, "data": None}
return {}
except Exception as e: # pylint: disable=broad-except
self.error(str(e))
@ -112,7 +113,7 @@ class sahara(metaclass=LogBase):
def connect(self):
try:
v = self.cdc.read(length=0xC * 0x4,timeout=1)
v = self.cdc.read(length=0xC * 0x4, timeout=1)
if len(v) > 1:
if v[0] == 0x01:
pkt = self.ch.pkt_cmd_hdr(v)
@ -121,23 +122,23 @@ class sahara(metaclass=LogBase):
self.pktsize = rsp.cmd_packet_length
self.version = rsp.version
self.info(f"Protocol version: {rsp.version}, Version supported: {rsp.version_supported}")
return {"mode":"sahara", "cmd":cmd_t.SAHARA_HELLO_REQ, "data":rsp}
return {"mode": "sahara", "cmd": cmd_t.SAHARA_HELLO_REQ, "data": rsp}
elif pkt.cmd == cmd_t.SAHARA_END_TRANSFER:
rsp = self.ch.pkt_image_end(v)
return {"mode":"sahara", "cmd":cmd_t.SAHARA_END_TRANSFER, "data":rsp}
return {"mode": "sahara", "cmd": cmd_t.SAHARA_END_TRANSFER, "data": rsp}
elif b"<?xml" in v:
return {"mode":"firehose"}
return {"mode": "firehose"}
elif v[0] == 0x7E:
return {"mode":"nandprg"}
return {"mode": "nandprg"}
else:
data = b"<?xml version=\"1.0\" ?><data><nop /></data>"
self.cdc.write(data)
res = self.cdc.read(timeout=1)
if b"<?xml" in res:
return {"mode": "firehose"}
elif len(res)> 0:
elif len(res) > 0:
if res[0] == 0x7E:
return {"mode":"nandprg"}
return {"mode": "nandprg"}
elif res[0] == cmd_t.SAHARA_END_TRANSFER:
rsp = self.ch.pkt_image_end(res)
return {"mode": "sahara", "cmd": cmd_t.SAHARA_END_TRANSFER, "data": rsp}
@ -146,7 +147,7 @@ class sahara(metaclass=LogBase):
self.cdc.write(data)
res = self.cdc.read()
if len(res) > 0 and res[1] == 0x12:
return {"mode":"nandprg"}
return {"mode": "nandprg"}
elif len(res) == 0:
print("Device is in Sahara error state, please reboot the device.")
return {"mode": "error"}
@ -175,20 +176,20 @@ class sahara(metaclass=LogBase):
def cmdexec_get_serial_num(self):
res = self.cmd_exec(exec_cmd_t.SAHARA_EXEC_CMD_SERIAL_NUM_READ)
return int.from_bytes(res,'little')
return int.from_bytes(res, 'little')
def cmdexec_get_msm_hwid(self):
res = self.cmd_exec(exec_cmd_t.SAHARA_EXEC_CMD_MSM_HW_ID_READ)
if res is not None:
return int.from_bytes(res[:8],'little')
return int.from_bytes(res[:8], 'little')
return None
def cmdexec_get_pkhash(self):
try:
res = self.cmd_exec(exec_cmd_t.SAHARA_EXEC_CMD_OEM_PK_HASH_READ)
idx=res[4:].find(res[:4])
if idx!=-1:
res=res[:4+idx]
idx = res[4:].find(res[:4])
if idx != -1:
res = res[:4 + idx]
return res.hex()
except Exception as e: # pylint: disable=broad-except
self.error(str(e))
@ -196,7 +197,7 @@ class sahara(metaclass=LogBase):
def cmdexec_get_sbl_version(self):
res = self.cmd_exec(exec_cmd_t.SAHARA_EXEC_CMD_GET_SOFTWARE_VERSION_SBL)
return int.from_bytes(res,'little')
return int.from_bytes(res, 'little')
def cmdexec_switch_to_dmss_dload(self):
res = self.cmd_exec(exec_cmd_t.SAHARA_EXEC_CMD_SWITCH_TO_DMSS_DLOAD)
@ -309,7 +310,7 @@ class sahara(metaclass=LogBase):
else:
self.info(f"\nVersion {hex(version)}\n------------------------\n" +
f"Serial: 0x{self.serials}\n")
if self.programmer=="":
if self.programmer == "":
self.error("No autodetection of loader possible with sahara version 3 and above :( Aborting.")
return False
self.cmd_modeswitch(sahara_mode_t.SAHARA_MODE_COMMAND)
@ -347,7 +348,6 @@ class sahara(metaclass=LogBase):
self.cdc.write(pack("<II", cmd_t.SAHARA_RESET_STATE_MACHINE_ID, 0x8))
return True
def cmd_reset(self):
self.cdc.write(pack("<II", cmd_t.SAHARA_RESET_REQ, 0x8))
try:
@ -360,7 +360,7 @@ class sahara(metaclass=LogBase):
return True
elif res["cmd"] == cmd_t.SAHARA_END_TRANSFER:
if "data" in res:
pkt=res["data"]
pkt = res["data"]
self.error(self.get_error_desc(pkt.image_tx_status))
return False
@ -536,7 +536,7 @@ class sahara(metaclass=LogBase):
else:
self.error("Timeout while uploading loader. Wrong loader ?")
return ""
elif cmd in [cmd_t.SAHARA_64BIT_MEMORY_READ_DATA,cmd_t.SAHARA_READ_DATA]:
elif cmd in [cmd_t.SAHARA_64BIT_MEMORY_READ_DATA, cmd_t.SAHARA_READ_DATA]:
if cmd == cmd_t.SAHARA_64BIT_MEMORY_READ_DATA:
self.bit64 = True
if loop == 0:
@ -562,7 +562,7 @@ class sahara(metaclass=LogBase):
else:
self.error(f"Unknown sahara id: {self.id}")
return "error"
loop+=1
loop += 1
data_offset = pkt.data_offset
data_len = pkt.data_len
if data_offset + data_len > len(programmer):

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -12,9 +12,11 @@ from io import BytesIO
SAHARA_VERSION = 2
SAHARA_MIN_VERSION = 1
class DataError(Exception):
pass
class cmd_t:
SAHARA_HELLO_REQ = 0x1
SAHARA_HELLO_RSP = 0x2
@ -36,6 +38,7 @@ class cmd_t:
SAHARA_64BIT_MEMORY_READ_DATA = 0x12
SAHARA_RESET_STATE_MACHINE_ID = 0x13
class cmd_t_version:
SAHARA_HELLO_REQ = 0x1
SAHARA_HELLO_RSP = 1
@ -57,6 +60,7 @@ class cmd_t_version:
SAHARA_64BIT_MEMORY_READ_DATA = 2
SAHARA_RESET_STATE_MACHINE_ID = 2
class exec_cmd_t:
SAHARA_EXEC_CMD_NOP = 0x00
SAHARA_EXEC_CMD_SERIAL_NUM_READ = 0x01
@ -69,6 +73,7 @@ class exec_cmd_t:
SAHARA_EXEC_CMD_GET_COMMAND_ID_LIST = 0x08
SAHARA_EXEC_CMD_GET_TRAINING_DATA = 0x09
class sahara_mode_t:
SAHARA_MODE_IMAGE_TX_PENDING = 0x0
SAHARA_MODE_IMAGE_TX_COMPLETE = 0x1
@ -169,7 +174,7 @@ ErrorDesc = {
class CommandHandler:
def pkt_hello_req(self, data):
if len(data)<0xC * 0x4:
if len(data) < 0xC * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -190,7 +195,7 @@ class CommandHandler:
return req
def pkt_cmd_hdr(self, data):
if len(data)<2*4:
if len(data) < 2 * 4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -201,7 +206,7 @@ class CommandHandler:
return req
def pkt_read_data(self, data):
if len(data)<0x5 * 0x4:
if len(data) < 0x5 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -215,7 +220,7 @@ class CommandHandler:
return req
def pkt_read_data_64(self, data):
if len(data)<0x8 + 0x3 * 0x8:
if len(data) < 0x8 + 0x3 * 0x8:
raise DataError
st = structhelper_io(BytesIO(data))
@ -229,7 +234,7 @@ class CommandHandler:
return req
def pkt_memory_debug(self, data):
if len(data)<0x8 + 0x2 * 0x4:
if len(data) < 0x8 + 0x2 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -242,7 +247,7 @@ class CommandHandler:
return req
def pkt_memory_debug_64(self, data):
if len(data)<0x8 + 0x2 * 0x8:
if len(data) < 0x8 + 0x2 * 0x8:
raise DataError
st = structhelper_io(BytesIO(data))
@ -255,7 +260,7 @@ class CommandHandler:
return req
def pkt_execute_rsp_cmd(self, data):
if len(data)<0x4 * 0x4:
if len(data) < 0x4 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -268,7 +273,7 @@ class CommandHandler:
return req
def pkt_image_end(self, data):
if len(data)<0x4 * 0x4:
if len(data) < 0x4 * 0x4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -281,7 +286,7 @@ class CommandHandler:
return req
def pkt_done(self, data):
if len(data)<0x3 * 4:
if len(data) < 0x3 * 4:
raise DataError
st = structhelper_io(BytesIO(data))
@ -293,7 +298,7 @@ class CommandHandler:
return req
def pkt_info(self, data):
if len(data)<0x3 * 4 + 0x20:
if len(data) < 0x3 * 4 + 0x20:
raise DataError
st = structhelper_io(BytesIO(data))
@ -306,7 +311,7 @@ class CommandHandler:
return req
def parttbl(self, data):
if len(data)<(0x3 * 4) + 20 + 20:
if len(data) < (0x3 * 4) + 20 + 20:
raise DataError
st = structhelper_io(BytesIO(data))
@ -320,7 +325,7 @@ class CommandHandler:
return req
def parttbl_64bit(self, data):
if len(data)<(0x3 * 8) + 20 + 20:
if len(data) < (0x3 * 8) + 20 + 20:
raise DataError
st = structhelper_io(BytesIO(data))

View file

@ -1,24 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import inspect
import logging
import sys
import os
import sys
import inspect
from queue import Queue
from struct import unpack
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
from edlclient.Library.utils import LogBase, print_progress
MAX_STORE_SIZE = 1024 * 1024 * 1024 * 2 # 2 GBs
MAX_STORE_SIZE = 1024 * 1024 * 1024 * 2 # 2 GBs
class QCSparse(metaclass=LogBase):
@ -183,13 +182,13 @@ class QCSparse(metaclass=LogBase):
if length is None:
return self.unsparse()
if (self.tmp_offset + length) <= len(self.tmpdata):
tdata = self.tmpdata[self.tmp_offset : self.tmp_offset + length]
tdata = self.tmpdata[self.tmp_offset: self.tmp_offset + length]
self.tmp_offset += length
return tdata
while (self.tmp_offset + length) > len(self.tmpdata):
self.tmpdata.extend(self.unsparse())
if (self.tmp_offset + length) <= len(self.tmpdata):
tdata = self.tmpdata[self.tmp_offset : self.tmp_offset + length]
tdata = self.tmpdata[self.tmp_offset: self.tmp_offset + length]
self.tmp_offset += length
return tdata

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -168,7 +168,7 @@ class Streaming(metaclass=LogBase):
def send_section_header(self, name):
# 0x1b open muliimage, 0xe for user-defined partition
resp = self.send(b"\x1b\x0e" + bytes("0:"+name, 'utf-8') + b"\x00")
resp = self.send(b"\x1b\x0e" + bytes("0:" + name, 'utf-8') + b"\x00")
if resp[0] == 0x1c:
return True
self.error("Error on sending section header")
@ -186,7 +186,7 @@ class Streaming(metaclass=LogBase):
return True
return False
def write_flash(self, lba:int=0, partname="", filename="", info=True):
def write_flash(self, lba: int = 0, partname="", filename="", info=True):
wbsize = 1024
filesize = os.stat(filename).st_size
total = filesize
@ -197,8 +197,8 @@ class Streaming(metaclass=LogBase):
adr = lba
while filesize > 0:
subdata = rf.read(wbsize)
if len(subdata)<1024:
subdata += (1024-len(subdata))*b'\xFF'
if len(subdata) < 1024:
subdata += (1024 - len(subdata)) * b'\xFF'
scmd = b"\x07" + pack("<I", adr) + subdata
resp = self.send(scmd)
if len(resp) == 0 or resp[0] != 0x8:
@ -664,7 +664,7 @@ class Streaming(metaclass=LogBase):
for i in range(partcount):
if magic1 == 0xAA7D1B9a and magic2 == 0x1F7D48BC:
name, length, spare, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
data[i * 0x1C:(i * 0x1C) + 0x1C])
else:
name, offset, length, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
@ -672,10 +672,10 @@ class Streaming(metaclass=LogBase):
if name[1] != 0x3A:
break
partitions[name[2:].rstrip(b"\x00").decode('utf-8')] = dict(offset=offset,
length=(length+spare) & 0xFFFF,
attr1=attr1, attr2=attr2,
attr3=attr3,
which_flash=which_flash)
length=(length + spare) & 0xFFFF,
attr1=attr1, attr2=attr2,
attr3=attr3,
which_flash=which_flash)
if magic1 == 0xAA7D1B9a and magic2 == 0x1F7D48BC:
offset += length + spare
return partitions
@ -861,9 +861,9 @@ class Streaming(metaclass=LogBase):
val = resp[1].flashId.decode('utf-8') if resp[1].flashId[0] != 0x65 else ""
self.info("Flash memory: %s %s, %s (vendor: 0x%02X image_id: 0x%02X)" % (self.settings.flash_mfr, val,
self.settings.flash_descr,
self.settings.flash_pid,
self.settings.flash_fid))
self.settings.flash_descr,
self.settings.flash_pid,
self.settings.flash_fid))
# self.info("Maximum packet size: %i byte",*((unsigned int*)&rbuf[0x24]))
self.info(
"Page size: %d bytes (%d sectors)" % (self.settings.PAGESIZE, self.settings.sectors_per_page))
@ -923,7 +923,7 @@ class Streaming(metaclass=LogBase):
totallength = length * self.settings.num_pages_per_blk * self.settings.PAGESIZE
progbar.show_progress(prefix="Read", pos=pos, total=totallength, display=info)
for curblock in range(block,block+length):
for curblock in range(block, block + length):
for curpage in range(self.settings.num_pages_per_blk):
data, spare = self.flash_read(curblock, curpage, self.settings.sectors_per_page, cwsize)
pos = (curblock * self.settings.num_pages_per_blk + curpage) * self.settings.PAGESIZE
@ -1079,7 +1079,7 @@ def test_nand_config():
errorids = []
for test in testconfig:
nandid, buswidth, density, pagesize, blocksize, oobsize, bchecc, cfg0, \
cfg1, eccbufcfg, bccbchcfg, badblockbyte = test
cfg1, eccbufcfg, bccbchcfg, badblockbyte = test
res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg = qs.nanddevice.nand_setup(nandid)
if cfg0 != res_cfg0 or cfg1 != res_cfg1 or eccbufcfg != res_ecc_buf_cfg or res_ecc_bch_cfg != bccbchcfg:
errorids.append([nandid, res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg])

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -54,7 +54,7 @@ class streaming_client(metaclass=LogBase):
self.printer("-------------------------------------------------------------")
for name in partitions:
partition = partitions[name]
if not isinstance(partition,dict):
if not isinstance(partition, dict):
continue
for i in range(0x10 - len(name)):
name += " "
@ -136,7 +136,7 @@ class streaming_client(metaclass=LogBase):
elif cmd == "rf":
sector = 0
sectors = self.streaming.settings.MAXBLOCK * self.streaming.settings.num_pages_per_blk * \
self.streaming.settings.sectors_per_page
self.streaming.settings.sectors_per_page
filename = options["<filename>"]
self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...")
if self.streaming.read_sectors(sector, sectors, filename, True):
@ -323,7 +323,7 @@ class streaming_client(metaclass=LogBase):
self.error(f"Error: Couldn't find partition file: {partitionfilename}")
return False
else:
ptable = open(partitionfilename,"rb").read()
ptable = open(partitionfilename, "rb").read()
else:
self.error("Partition file is needed for writing (--partitionfilename)")
sys.exit(1)

View file

@ -1,68 +1,69 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
class open_mode_type:
OPEN_MODE_NONE = 0x00 # Not opened yet
OPEN_BOOTLOADER = 0x01 # Bootloader Image
OPEN_BOOTABLE = 0x02 # Bootable Image
OPEN_CEFS = 0x03 # CEFS Image
OPEN_MODE_FACTORY = 0x04 # Factory Image
OPEN_MODE_NONE = 0x00 # Not opened yet
OPEN_BOOTLOADER = 0x01 # Bootloader Image
OPEN_BOOTABLE = 0x02 # Bootable Image
OPEN_CEFS = 0x03 # CEFS Image
OPEN_MODE_FACTORY = 0x04 # Factory Image
class open_multi_mode_type:
OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet
OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader
OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data
OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader
OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader
OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable
OPEN_MULTI_MODE_APPS = 0x06 # APPS executable
OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader
OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh
OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image
OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader
OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image
OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile
OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image
OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition
OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable
OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image
OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition
OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0
OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1
OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2
OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3
OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4
OPEN_MULTI_MODE_NONE = 0x00 # Not opened yet
OPEN_MULTI_MODE_PBL = 0x01 # Primary Boot Loader
OPEN_MULTI_MODE_QCSBLHDCFG = 0x02 # QC 2ndary Boot Loader Header and Config Data
OPEN_MULTI_MODE_QCSBL = 0x03 # QC 2ndary Boot Loader
OPEN_MULTI_MODE_OEMSBL = 0x04 # OEM 2ndary Boot Loader
OPEN_MULTI_MODE_AMSS = 0x05 # AMSS modem executable
OPEN_MULTI_MODE_APPS = 0x06 # APPS executable
OPEN_MULTI_MODE_OBL = 0x07 # OTP Boot Loader
OPEN_MULTI_MODE_FOTAUI = 0x08 # FOTA UI binarh
OPEN_MULTI_MODE_CEFS = 0x09 # Modem CEFS image
OPEN_MULTI_MODE_APPSBL = 0x0A # APPS Boot Loader
OPEN_MULTI_MODE_APPS_CEFS = 0x0B # APPS CEFS image
OPEN_MULTI_MODE_FLASH_BIN = 0x0C # Flash.bin image for Windows mobile
OPEN_MULTI_MODE_DSP1 = 0x0D # DSP1 runtime image
OPEN_MULTI_MODE_CUSTOM = 0x0E # Image for user defined partition
OPEN_MULTI_MODE_DBL = 0x0F # DBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_OSBL = 0x10 # OSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_FSBL = 0x11 # FSBL Image for SB Architecture 2.0
OPEN_MULTI_MODE_DSP2 = 0x12 # DSP2 executable
OPEN_MULTI_MODE_RAW = 0x13 # APPS EFS2 RAW image
OPEN_MULTI_MODE_EMMC_USER = 0x21 # EMMC USER partition
OPEN_MULTI_MODE_EMMC_BOOT0 = 0x22 # EMMC BOOT partition 0
OPEN_MULTI_MODE_EMMC_BOOT1 = 0x23 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_RPMB = 0x24 # EMMC BOOT partition 1
OPEN_MULTI_MODE_EMMC_GPP1 = 0x25 # EMMC GPP partition 1
OPEN_MULTI_MODE_EMMC_GPP2 = 0x26 # EMMC GPP partition 2
OPEN_MULTI_MODE_EMMC_GPP3 = 0x27 # EMMC GPP partition 3
OPEN_MULTI_MODE_EMMC_GPP4 = 0x28 # EMMC GPP partition 4
class response_code_type:
ACK = 0x00 # Successful
RESERVED_1 = 0x01 # Reserved
NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid.
NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid.
NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd.
NAK_INVALID_CMD = 0x05 # Failure: invalid command
RESERVED_6 = 0x06 # Reserved
NAK_FAILED = 0x07 # Failure: operation did not succeed.
NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong.
NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec
NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match
RESERVED_0xB = 0x0B # Reserved
NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code
NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone
NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported
NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence
NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed
NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits
NAK_NO_SPACE = 0x12 # Failure: Out of space
NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode
NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported
NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported
ACK = 0x00 # Successful
RESERVED_1 = 0x01 # Reserved
NAK_INVALID_DEST = 0x02 # Failure: destination address is invalid.
NAK_INVALID_LEN = 0x03 # Failure: operation length is invalid.
NAK_EARLY_END = 0x04 # Failure: packet was too short for this cmd.
NAK_INVALID_CMD = 0x05 # Failure: invalid command
RESERVED_6 = 0x06 # Reserved
NAK_FAILED = 0x07 # Failure: operation did not succeed.
NAK_WRONG_IID = 0x08 # Failure: intelligent ID code was wrong.
NAK_BAD_VPP = 0x09 # Failure: programming voltage out of spec
NAK_VERIFY_FAILED = 0x0A # Failure: readback verify did not match
RESERVED_0xB = 0x0B # Reserved
NAK_INVALID_SEC_CODE = 0x0C # Failure: Incorrect security code
NAK_CANT_POWER_DOWN = 0x0D # Failure: Cannot power down phone
NAK_NAND_NOT_SUPP = 0x0E # Failure: Download to NAND not supported
NAK_CMD_OUT_SEQ = 0x0F # Failure: Command out of sequence
NAK_CLOSE_FAILED = 0x10 # Failure: Close command failed
NAK_BAD_FEATURE_BITS = 0x11 # Failure: Incompatible Feature Bits
NAK_NO_SPACE = 0x12 # Failure: Out of space
NAK_INVALID_SEC_MODE = 0x13 # Failure: Multi-Image invalid security mode
NAK_MIBOOT_NOT_SUPP = 0x14 # Failure: Multi-Image boot not supported
NAK_PWROFF_NOT_SUPP = 0x15 # Failure: Power off not supported

View file

@ -1,24 +1,25 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import sys
import codecs
import copy
import datetime as dt
import logging
import logging.config
import codecs
import struct
import os
import shutil
import stat
import colorama
import copy
import datetime as dt
import struct
import sys
import time
from io import BytesIO
from struct import unpack, pack
from struct import unpack
import colorama
try:
from capstone import *
@ -29,11 +30,13 @@ try:
except ImportError:
print("Keystone library is missing (optional).")
def is_windows():
if sys.platform == 'win32' or sys.platform == 'win64' or sys.platform == 'winnt':
return True
return False
class structhelper_io:
pos = 0
@ -83,6 +86,7 @@ class structhelper_io:
def seek(self, pos):
self.data.seek(pos)
def find_binary(data, strf, pos=0):
t = strf.split(b".")
pre = 0
@ -113,6 +117,7 @@ def find_binary(data, strf, pos=0):
pre += 1
return None
class progress:
def __init__(self, pagesize):
self.progtime = 0
@ -124,7 +129,7 @@ class progress:
def calcProcessTime(self, starttime, cur_iter, max_iter):
telapsed = time.time() - starttime
if telapsed > 0 and cur_iter > 0:
testimated = (telapsed / cur_iter) * (max_iter)
testimated = (telapsed / cur_iter) * max_iter
finishtime = starttime + testimated
finishtime = dt.datetime.fromtimestamp(finishtime).strftime("%H:%M:%S") # in time
lefttime = testimated - telapsed # in seconds
@ -151,7 +156,7 @@ class progress:
total // self.pagesize,
0), bar_length=10)
if prog > self.prog or prog==100.0:
if prog > self.prog or prog == 100.0:
if display:
t0 = time.time()
tdiff = t0 - self.progtime
@ -188,6 +193,7 @@ class progress:
self.progpos = pos
self.progtime = t0
class structhelper:
pos = 0
@ -248,6 +254,7 @@ class structhelper:
def seek(self, pos):
self.pos = pos
def do_tcp_server(client, arguments, handler):
def tcpprint(arg):
if isinstance(arg, bytes) or isinstance(arg, bytearray):
@ -575,7 +582,7 @@ class patchtools:
badchars = self.has_bad_uart_chars(data)
if not badchars:
badchars = self.has_bad_uart_chars(data2)
if not (badchars):
if not badchars:
return div
div += 4
@ -685,7 +692,7 @@ class patchtools:
continue
rt += 1
prep = data[rt:].find(t[i])
if (prep != 0):
if prep != 0:
error = 1
break
rt += len(t[i])
@ -699,7 +706,7 @@ class patchtools:
return None
def read_object(data: object, definition: object) -> object:
def read_object(data: object, definition: object) -> dict:
"""
Unpacks a structure using the given data and definition.
"""

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -17,7 +17,7 @@ class xmlparser:
continue
line = b"<?xml" + line
if b"\xf0\xe9\x88\x14" in line:
line=line.replace(b"\xf0\xe9\x88\x14",b"")
line = line.replace(b"\xf0\xe9\x88\x14", b"")
parser = ET.XMLParser(encoding="utf-8")
try:
tree = ET.fromstring(line, parser=parser)
@ -37,7 +37,7 @@ class xmlparser:
continue
line = b"<?xml" + line
if b"\xf0\xe9\x88\x14" in line:
line=line.replace(b"\xf0\xe9\x88\x14",b"")
line = line.replace(b"\xf0\xe9\x88\x14", b"")
parser = ET.XMLParser(encoding="utf-8")
try:
tree = ET.fromstring(line, parser=parser)

View file

@ -1,49 +1,51 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
# Beagle to EDL Loader
import os,sys
import sys
from struct import unpack
def main():
if len(sys.argv)<2:
print("Usage: ./beagle_to_loader.py [beagle_log.bin] [loader.elf]")
sys.exit(0)
with open(sys.argv[1],"rb") as rf:
data=rf.read()
outdata=bytearray()
i=0
seq=b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx=data.find(seq)
if idx==-1:
if i==0:
seq=b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i+=1
continue
else:
break
else:
cmd=unpack("<I", data[idx:idx+4])[0]
if cmd==0x03:
cmd,tlen,slen,offset,length=unpack("<IIIII",data[idx:idx+0x14])
elif cmd==0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x14:]
print("Offset : %08X Length: %08X" %(offset,length))
while len(outdata)<offset+length:
outdata.append(0xFF)
outdata[offset:offset+length]=data[:length]
i+=1
wf.write(outdata)
print("Done.")
if __name__=="__main__":
main()
def main():
if len(sys.argv) < 2:
print("Usage: ./beagle_to_loader.py [beagle_log.bin] [loader.elf]")
sys.exit(0)
with open(sys.argv[1], "rb") as rf:
data = rf.read()
outdata = bytearray()
i = 0
seq = b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx = data.find(seq)
if idx == -1:
if i == 0:
seq = b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i += 1
continue
else:
break
else:
cmd = unpack("<I", data[idx:idx + 4])[0]
if cmd == 0x03:
cmd, tlen, slen, offset, length = unpack("<IIIII", data[idx:idx + 0x14])
elif cmd == 0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x14:]
print("Offset : %08X Length: %08X" % (offset, length))
while len(outdata) < offset + length:
outdata.append(0xFF)
outdata[offset:offset + length] = data[:length]
i += 1
wf.write(outdata)
print("Done.")
if __name__ == "__main__":
main()

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -15,6 +15,7 @@ import usb.core
from enum import Enum
import os, sys, inspect
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, current_dir)
@ -23,6 +24,7 @@ try:
except ImportError as e:
print(current_dir)
from qc_diag import qcdiag
pass
try:
@ -38,12 +40,14 @@ class vendor(Enum):
netgear = 0x0846
telit = 0x413c
class deviceclass:
vid=0
pid=0
def __init__(self,vid,pid):
self.vid=vid
self.pid=pid
vid = 0
pid = 0
def __init__(self, vid, pid):
self.vid = vid
self.pid = pid
class connection(metaclass=LogBase):
@ -63,7 +67,7 @@ class connection(metaclass=LogBase):
self.serial = serial.Serial(port=port, baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=1)
self.connected = self.serial.is_open
def waitforusb(self,vid,pid):
def waitforusb(self, vid, pid):
timeout = 0
while timeout < 10:
for device in self.detectusbdevices():
@ -74,12 +78,13 @@ class connection(metaclass=LogBase):
timeout += 1
return False
def websend(self,url):
def websend(self, url):
headers = {'Referer': 'http://192.168.0.1/index.html', 'Accept-Charset': 'UTF-8'}
r = requests.get(url, headers=headers)
if b"FACTORY:ok" in r.content or b"success" in r.content:
print(f"Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
return self.waitforusb(vendor.zte.value,0x0016)
print(
f"Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
return self.waitforusb(vendor.zte.value, 0x0016)
return False
def getserialports(self):
@ -87,46 +92,46 @@ class connection(metaclass=LogBase):
def detectusbdevices(self):
dev = usb.core.find(find_all=True)
ids=[deviceclass(cfg.idVendor,cfg.idProduct) for cfg in dev]
ids = [deviceclass(cfg.idVendor, cfg.idProduct) for cfg in dev]
return ids
def detect(self, port):
vendortable={
0x1199:["Sierra Wireless",3],
0x2c7c:["Quectel",3],
0x19d2:["ZTE",2],
0x0846:["Netgear", 2],
0x413c:["Telit",0]
vendortable = {
0x1199: ["Sierra Wireless", 3],
0x2c7c: ["Quectel", 3],
0x19d2: ["ZTE", 2],
0x0846: ["Netgear", 2],
0x413c: ["Telit", 0]
}
mode="Unknown"
mode = "Unknown"
for device in self.detectusbdevices():
if device.vid==vendor.zte.value:
if device.pid==0x0016:
if device.vid == vendor.zte.value:
if device.pid == 0x0016:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Diag mode")
mode="AT"
mode = "AT"
break
elif device.pid==0x1403:
elif device.pid == 0x1403:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
mode="Web"
mode = "Web"
# url = 'http://192.168.0.1/goform/goform_set_cmd_process?goformId=USB_MODE_SWITCH&usb_mode=1' #adb
url = 'http://192.168.0.1/goform/goform_process?goformId=MODE_SWITCH&switchCmd=FACTORY'
if self.websend(url):
mode="AT"
mode = "AT"
break
elif device.vid==vendor.telit.value:
if device.pid==0x81d7:
elif device.vid == vendor.telit.value:
if device.pid == 0x81d7:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Diag mode")
print("Sending download mode command")
interface = 5
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x413c, 0x81d7, interface]])
if diag.connect():
data=diag.hdlc.receive_reply()
data = diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
if res[0]==0x4B:
if res[0] == 0x4B:
print("Sending download mode succeeded")
diag.disconnect()
break
if mode=="AT" or mode=="Unknown":
if mode == "AT" or mode == "Unknown":
for port in self.getserialports():
if port.vid in vendortable:
portid = port.location[-1:]
@ -137,7 +142,7 @@ class connection(metaclass=LogBase):
def readreply(self):
info = []
timeout=0
timeout = 0
if self.serial is not None:
while True:
tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
@ -146,11 +151,11 @@ class connection(metaclass=LogBase):
return info
elif "ERROR" in tmp:
return -1
if tmp!="":
if tmp != "":
info.append(tmp)
else:
timeout+=1
if timeout==20:
timeout += 1
if timeout == 20:
break
return info
@ -180,7 +185,7 @@ class connection(metaclass=LogBase):
self.connected = False
def ati(self):
data={}
data = {}
info = self.send("ATI")
if info != -1:
for line in info:
@ -191,20 +196,21 @@ class connection(metaclass=LogBase):
if "Quectel" in line:
data["vendor"] = "Quectel"
if "Manufacturer" in line:
data["manufacturer"]=line.split(":")[1].strip()
data["manufacturer"] = line.split(":")[1].strip()
if "Sierra Wireless" in data["manufacturer"]:
data["vendor"]="Sierra Wireless"
data["vendor"] = "Sierra Wireless"
elif "ZTE CORPORATION" in data["manufacturer"]:
data["vendor"]="ZTE"
data["vendor"] = "ZTE"
elif "Netgear" in data["manufacturer"]:
data["vendor"]="Netgear"
data["vendor"] = "Netgear"
elif "Telit" in data["manufacturer"]:
data["vendor"]="Telit"
data["vendor"] = "Telit"
return data
class dwnloadtools(metaclass=LogBase):
def sendcmd(self, tn,cmd):
tn.write(bytes(cmd,'utf-8')+b"\n")
def sendcmd(self, tn, cmd):
tn.write(bytes(cmd, 'utf-8') + b"\n")
time.sleep(0.05)
return tn.read_eager().strip().decode('utf-8')
@ -212,45 +218,45 @@ class dwnloadtools(metaclass=LogBase):
port = args.port
cn = connection(port)
if cn.connected:
info=cn.ati()
info = cn.ati()
if "vendor" in info:
if info["vendor"]=="Sierra Wireless" or info["vendor"]=="Netgear":
if info["vendor"] == "Sierra Wireless" or info["vendor"] == "Netgear":
print("Sending download mode command")
print(cn.send("AT!BOOTHOLD\r"))
print(cn.send('AT!QPSTDLOAD\r'))
print("Done switching to download mode")
elif info["vendor"]=="Quectel":
elif info["vendor"] == "Quectel":
print("Sending download mode command")
interface=0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x2c7c,0x0125,interface]])
interface = 0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x2c7c, 0x0125, interface]])
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
diag.disconnect()
print("Done switching to download mode")
elif info["vendor"]=="Telit":
elif info["vendor"] == "Telit":
print("Sending download mode command")
interface=0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x2c7c,0x0125,interface]])
interface = 0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x2c7c, 0x0125, interface]])
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
diag.disconnect()
print("Done switching to download mode")
elif info["vendor"]=="ZTE":
elif info["vendor"] == "ZTE":
print("Sending download mode command")
interface=0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x19d2,0x0016, interface]])
interface = 0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x19d2, 0x0016, interface]])
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
if res[0]==0x4B:
if res[0] == 0x4B:
print("Done switching to ENANDPRG mode")
else:
res = diag.send(b"\x3a")
if res[0]==0x3A:
if res[0] == 0x3A:
while True:
state=cn.waitforusb(vendor.zte.value,0x0076)
state = cn.waitforusb(vendor.zte.value, 0x0076)
if not state:
diag.disconnect()
if diag.connect():
@ -264,10 +270,11 @@ class dwnloadtools(metaclass=LogBase):
diag.disconnect()
cn.close()
def main():
version = "1.1"
info = 'Modem Gimme-EDL ' + version + ' (c) B. Kerler 2020-2021'
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description=info)
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=info)
parser.add_argument(
'-port', '-p',
help='use com port for auto unlock',
@ -277,8 +284,12 @@ def main():
help='use logfile for debug log',
default="")
args = parser.parse_args()
dw=dwnloadtools()
if not args.port:
parser.print_help()
return
dw = dwnloadtools()
dw.run(args)
if __name__=="__main__":
if __name__ == "__main__":
main()

View file

@ -1,17 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import argparse
import hashlib
import time
from Exscript.protocols.telnetlib import Telnet
import requests
import serial
import serial.tools.list_ports
import argparse
import requests
import hashlib
from Exscript.protocols.telnetlib import Telnet
try:
from edlclient.Tools.qc_diag import qcdiag
@ -38,12 +39,13 @@ import logging.config
import logging.handlers
import colorama
itoa64 = bytearray(b"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
def _crypt_to64(s, v, n):
out=bytearray()
out = bytearray()
while --n >= 0:
out.append(itoa64[v&0x3f])
out.append(itoa64[v & 0x3f])
v >>= 6
@ -191,7 +193,8 @@ class connection:
mode = "AT"
break
elif device.pid == 0x1403:
print(f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
print(
f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
mode = "Web"
self.ZTE_Web()
break
@ -358,7 +361,7 @@ class adbtools(metaclass=LogBase):
if cn.connected:
while True:
resp2 = cn.serial.read(8)
if len(resp2)>0:
if len(resp2) > 0:
break
cn.serial.write(mode)
response = cn.serial.read(8)
@ -378,36 +381,33 @@ class adbtools(metaclass=LogBase):
res = False
if "vendor" in info:
if info["vendor"] == "Sierra Wireless" or info["vendor"] == "Netgear":
res=self.SierraWireless(cn, info, enable)
res = self.SierraWireless(cn, info, enable)
elif info["vendor"] == "Quectel":
print("Sending at switch command")
res=self.Quectel(cn, enable)
res = self.Quectel(cn, enable)
elif info["vendor"] == "ZTE":
print("Sending switch command via diag")
res=self.ZTE(cn, enable)
res = self.ZTE(cn, enable)
elif info["vendor"] == "Simcom":
res=self.Simcom(cn)
res = self.Simcom(cn, enable)
elif info["vendor"] == "Fibocom":
res=self.Fibocom(cn, enable)
res = self.Fibocom(cn, enable)
elif info["vendor"] == "Alcatel":
res=self.Alcatel(enable)
res = self.Alcatel(enable)
elif info["vendor"] == "Samsung":
res=self.Samsung(cn, enable)
if enable:
mode="enabled"
else:
mode="disabled"
res = self.Samsung(cn, enable)
mode = "enabled" if enable else "disabled"
if res:
print("ADB successfully "+mode)
print("ADB successfully " + mode)
else:
print("ADB couldn't be "+mode)
print("ADB couldn't be " + mode)
cn.close()
else:
print("No device detected")
def SierraWireless(self, cn, info, enable):
print("Sending at switch command")
kg = SierraKeygen(cn=cn,devicegeneration=None)
kg = SierraKeygen(cn=cn, devicegeneration=None)
kg.detectdevicegeneration()
if kg.openlock():
if enable:
@ -428,25 +428,25 @@ class adbtools(metaclass=LogBase):
print("Successfully enabled PID 68E2")
return True
index=-1
type=-1
bitmask=-1
resp=cn.send("AT!USBCOMP?")
if resp!=-1:
index = -1
type = -1
bitmask = -1
resp = cn.send("AT!USBCOMP?")
if resp != -1:
print(resp)
for val in resp:
if "Config Index" in val:
index=val[val.find("Config Index: ")+14:]
index = val[val.find("Config Index: ") + 14:]
elif "Config Type" in val:
type=val[val.find("Config Type: ")+14:].replace(" (Generic)","")
type = val[val.find("Config Type: ") + 14:].replace(" (Generic)", "")
elif "Interface bitmask" in val:
bitmask=val[val.find("Interface bitmask: ")+19:]
bitmask = val[val.find("Interface bitmask: ") + 19:]
if " " in bitmask:
bitmask="0x"+bitmask.split(" ")[0]
if index!=-1 and type!=-1 and bitmask!=1:
index=int(index)
type=int(type)
bitmask=int(bitmask,16)
bitmask = "0x" + bitmask.split(" ")[0]
if index != -1 and type != -1 and bitmask != 1:
index = int(index)
type = int(type)
bitmask = int(bitmask, 16)
# AT!USBCOMP=<Config Index>,<Config Type>,<Interface bitmask>
# <Config Index> - configuration index to which the composition applies, should be 1
# <Config Type> - 1:Generic, 2:USBIF-MBIM, 3:RNDIS
@ -465,13 +465,13 @@ class adbtools(metaclass=LogBase):
# ECM - 0x00080000,
# UBIST - 0x00200000
#if enable:
cmd=f"AT!USBCOMP={index},{type},%08X" % 0x0080010E
cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0080010E
#else:
# cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0000010D
resp=cn.send(cmd)
if resp!=-1:
resp=cn.send("AT!RESET")
if resp!=-1:
resp = cn.send(cmd)
if resp != -1:
resp = cn.send("AT!RESET")
if resp != -1:
return True
return False
return True
@ -600,7 +600,7 @@ def main():
else:
enable = False
ad.run(port=args.port, enable=enable)
#ad.meta(port=args.port)
# ad.meta(port=args.port)
if __name__ == "__main__":

View file

@ -1,29 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import hashlib
import inspect
import os
import sys
from os import walk
import hashlib
from struct import unpack, pack
from shutil import copyfile
import os, sys, inspect
from io import BytesIO
from Library.utils import elf
from Library.loader_db import loader_utils
from Config.qualcomm_config import vendor
from os import walk
from shutil import copyfile
from struct import unpack
try:
from Config.qualcomm_config import vendor
from Library.loader_db import loader_utils
from Library.utils import elf
except ModuleNotFoundError:
from edlclient.Config.qualcomm_config import vendor
from edlclient.Library.loader_db import loader_utils
from edlclient.Library.utils import elf
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
lu=loader_utils()
lu = loader_utils()
class MBN:
def __init__(self, memory):
self.imageid, self.flashpartitionversion, self.imagesrc, self.loadaddr, self.imagesz, self.codesz, \
self.sigptr, self.sigsz, self.certptr, self.certsz = unpack("<IIIIIIIIII", memory[0xC:0xC + 40])
self.sigptr, self.sigsz, self.certptr, self.certsz = unpack("<IIIIIIIIII", memory[0xC:0xC + 40])
class Signed:
@ -90,9 +97,9 @@ def extract_hdr(memsection, version, sign_info, mem_section, code_size, signatur
anti_rollback_version=unpack("<I", mm[md_offset:md_offset + 4])[0]
'''
if version==6:
if version == 6:
signatureoffset = memsection.file_start_addr + 0x30 + md_size + code_size + signature_size
elif version==7:
elif version == 7:
signatureoffset = memsection.file_start_addr + 0x28 + hdr1 + hdr2 + hdr3 + md_size + code_size + hdr4
try:
if mem_section[signatureoffset] != 0x30:
@ -106,7 +113,8 @@ def extract_hdr(memsection, version, sign_info, mem_section, code_size, signatur
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
idx = signatureoffset
signature = {}
@ -171,7 +179,8 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
rootsignature3 = mem_section[(casignature2offset + len2):(casignature2offset + len2) + 999999999].split(
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')[0]
sign_info.pk_hash = hashlib.sha256(rootsignature3).hexdigest()
idx = signatureoffset
@ -220,7 +229,7 @@ def extract_old_hdr(signatureoffset, sign_info, mem_section, code_size, signatur
def init_loader_db():
loaderdb = {}
loaders=os.path.join(current_dir,"..","..","Loaders")
loaders = os.path.join(current_dir, "..", "..", "Loaders")
if not os.path.exists(loaders):
loaders = os.path.join(current_dir, "Loaders")
if not os.path.exists(loaders):
@ -326,7 +335,7 @@ def main(argv):
filelist = []
rt = open(os.path.join(outputdir, argv[1] + ".log"), "w")
for filename in file_list:
filesize=os.stat(filename).st_size
filesize = os.stat(filename).st_size
elfpos = 0
with open(filename, 'rb') as rhandle:
data = rhandle.read()
@ -339,30 +348,30 @@ def main(argv):
signinfo.filename = filename
signinfo.filesize = os.stat(filename).st_size
while elfpos<filesize:
if elfpos==-1:
while elfpos < filesize:
if elfpos == -1:
break
mem_section = data[elfpos:]
elfheader = elf(mem_section, signinfo.filename)
if len(elfheader.pentry)<4:
elfpos = data.find(b"\x7FELF", elfpos+1)
if len(elfheader.pentry) < 4:
elfpos = data.find(b"\x7FELF", elfpos + 1)
continue
idx = 0
for entry in elfheader.pentry:
if entry.p_type==0 and entry.p_flags&0xF000000==0x2000000:
if entry.p_type == 0 and entry.p_flags & 0xF000000 == 0x2000000:
break
idx+=1
idx += 1
if 'memorylayout' in dir(elfheader):
memsection = elfheader.memorylayout[idx]
try:
sect=BytesIO(mem_section[memsection.file_start_addr+0x4:])
version = int.from_bytes(sect.read(4),'little')
hdr1 = int.from_bytes(sect.read(4),'little')
hdr2 = int.from_bytes(sect.read(4),'little')
hdr3 = int.from_bytes(sect.read(4),'little')
code_size = int.from_bytes(sect.read(4),'little')
hdr4 = int.from_bytes(sect.read(4),'little')
signature_size = int.from_bytes(sect.read(4),'little')
sect = BytesIO(mem_section[memsection.file_start_addr + 0x4:])
version = int.from_bytes(sect.read(4), 'little')
hdr1 = int.from_bytes(sect.read(4), 'little')
hdr2 = int.from_bytes(sect.read(4), 'little')
hdr3 = int.from_bytes(sect.read(4), 'little')
code_size = int.from_bytes(sect.read(4), 'little')
hdr4 = int.from_bytes(sect.read(4), 'little')
signature_size = int.from_bytes(sect.read(4), 'little')
# cert_chain_size=unpack("<I", mem_section[memsection.file_start_addr + 0x24:memsection.file_start_addr + 0x24 + 0x4])[0]
except Exception as err:
print(err)
@ -381,7 +390,8 @@ def main(argv):
filelist.append(signinfo)
break
elif version >= 6: # SDM
signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size, hdr1,
signinfo = extract_hdr(memsection, version, signinfo, mem_section, code_size, signature_size,
hdr1,
hdr2, hdr3, hdr4)
if signinfo is None:
continue
@ -390,7 +400,7 @@ def main(argv):
else:
print("Unknown version for " + filename)
continue
if elfpos == -1 and int.from_bytes(data[:4],'little') == 0x844BDCD1:
if elfpos == -1 and int.from_bytes(data[:4], 'little') == 0x844BDCD1:
mbn = MBN(mem_section)
if mbn.sigsz == 0:
print("%s has no signature." % filename)
@ -414,13 +424,13 @@ def main(argv):
loaderlists = {}
for item in sorted_x:
if item.oem_id != '':
oemid=int(item.oem_id,16)
oemid = int(item.oem_id, 16)
if oemid in vendor:
oeminfo = vendor[oemid]
else:
oeminfo=item.oem_id
if len(item.sw_id)<16:
item.sw_id="0"*(16-len(item.sw_id))+item.sw_id
oeminfo = item.oem_id
if len(item.sw_id) < 16:
item.sw_id = "0" * (16 - len(item.sw_id)) + item.sw_id
info = f"OEM:{oeminfo}\tMODEL:{item.model_id}\tHWID:{item.hw_id}\tSWID:{item.sw_id}\tSWSIZE:{item.sw_size}\tPK_HASH:{item.pk_hash}\t{item.filename}\t{str(item.filesize)}"
if item.oem_version != '':
info += "\tOEMVER:" + item.oem_version + "\tQCVER:" + item.qc_version + "\tVAR:" + item.image_variant
@ -444,11 +454,11 @@ def main(argv):
if b"peek\x00" in data:
auth += "_peek"
fna = os.path.join(outputdir, (
hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())
hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower())
if not os.path.exists(fna):
copyfile(item.filename,
os.path.join(outputdir, hwid + "_" + (
loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()))
loader_info.pk_hash[0:16] + "_FHPRG" + auth + ".bin").lower()))
elif item.filesize > os.stat(fna).st_size:
copyfile(item.filename, os.path.join(outputdir,
(hwid + "_" + loader_info.pk_hash[
@ -470,7 +480,7 @@ def main(argv):
except:
continue
else:
print("Unknown :"+item.filename)
print("Unknown :" + item.filename)
copyfile(item.filename, os.path.join(outputdir, "Unknown", os.path.basename(item.filename).lower()))
for item in filelist:

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2023 under GPLv3 license
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
@ -90,7 +90,7 @@ subnvitem_type = [
]
class fs_factimage_read_info():
class fs_factimage_read_info:
def_fs_factimage_read_info = [
("stream_state", "B"), # 0 indicates no more data to be sent, otherwise set to 1
("info_cluster_sent", "B"), # 0 indicates if info_cluster was not sent, else 1
@ -117,7 +117,7 @@ class fs_factimage_read_info():
return data
class FactoryHeader():
class FactoryHeader:
def_factory_header = [
("magic1", "I"),
("magic2", "I"),
@ -160,7 +160,7 @@ class FactoryHeader():
return data
class nvitem():
class nvitem:
item = 0x0
data = b""
status = 0x0
@ -409,7 +409,7 @@ class qcdiag(metaclass=LogBase):
self.cdc.close(True)
def send(self, cmd):
if self.hdlc != None:
if self.hdlc is not None:
return self.hdlc.send_cmd_np(cmd)
def cmd_info(self):
@ -502,7 +502,7 @@ class qcdiag(metaclass=LogBase):
res, nvitem = self.read_nvitem(item)
if res:
info = self.DecodeNVItems(nvitem)
if res != False:
if res:
if nvitem.name != "":
ItemNumber = f"{hex(item)} ({nvitem.name}): "
else:
@ -520,7 +520,7 @@ class qcdiag(metaclass=LogBase):
def print_nvitemsub(self, item, index):
res, nvitem = self.read_nvitemsub(item, index)
info = self.DecodeNVItems(nvitem)
if res != False:
if res:
if nvitem.name != "":
ItemNumber = f"{hex(item), hex(index)} ({nvitem.name}): "
else:
@ -545,7 +545,7 @@ class qcdiag(metaclass=LogBase):
print_progress(prog, 100, prefix="Progress:", suffix=f"Complete, item {hex(item)}", bar_length=50)
old = prog
res, nvitem = self.read_nvitem(item)
if res != False:
if res:
if nvitem.status != 0x5:
nvitem.status = self.DecodeNVItems(nvitem)
nvitems.append(dict(id=nvitem.item, name=nvitem.name, data=hexlify(nvitem.data).decode("utf-8"),
@ -649,7 +649,7 @@ class qcdiag(metaclass=LogBase):
if len(res) > 0:
if res[0] == 0x27:
res, nvitem = self.read_nvitem(item)
if res == False:
if not res:
print(f"Error while writing nvitem {hex(item)} data, %s" % data)
else:
if nvitem.data != data:
@ -671,7 +671,7 @@ class qcdiag(metaclass=LogBase):
if len(res) > 0:
if res[0] == 0x4B:
res, nvitem = self.read_nvitemsub(item, index)
if res == False:
if not res:
print(f"Error while writing nvitem {hex(item)} index {hex(index)} data, %s" % data)
else:
if nvitem.data != data:
@ -809,7 +809,7 @@ class qcdiag(metaclass=LogBase):
return False
write_handle.close()
if efserr == False:
if not efserr:
print("Successfully read EFS.")
return True
else:
@ -1408,7 +1408,7 @@ def main():
parser_nvwritesub.add_argument("-debugmode", help="[Option] Enable verbose logging", action="store_true")
parser_writeimei = subparser.add_parser("writeimei", help="Write imei")
parser_writeimei.add_argument("imei", metavar=("<imei1,imei2,...>"), help="[Option] IMEI to write", default="")
parser_writeimei.add_argument("imei", metavar="<imei1,imei2,...>", help="[Option] IMEI to write", default="")
parser_writeimei.add_argument("-vid", metavar="<vid>", help="[Option] Specify vid", default="")
parser_writeimei.add_argument("-pid", metavar="<pid>", help="[Option] Specify pid", default="")
parser_writeimei.add_argument("-interface", metavar="<pid>", help="[Option] Specify interface number, default=0)",

View file

@ -17,6 +17,7 @@ import logging.config
import logging.handlers
import colorama
class ColorFormatter(logging.Formatter):
LOG_COLORS = {
logging.ERROR: colorama.Fore.RED,
@ -153,8 +154,8 @@ infotable = {
"SDX65": ["MR6400", "MR6500", "MR6110", "MR6150", "MR6450", "MR6550"]
}
# 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850,
# AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14
# 0 MC8775_H2.0.8.19 !OPENLOCK, !OPENCND .. MC8765V,MC8765,MC8755V,MC8775,MC8775V,MC8775,AC850,
# AC860,AC875,AC881,AC881U,AC875, AC340U 1.13.12.14
keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C, 0xCE, 0x84, 0x90, 0xBC, 0x7F, 0xED,
# 1 MC8775_H2.0.8.19 AC340U, OPENMEP default
0x61, 0x94, 0xCE, 0xA7, 0xB0, 0xEA, 0x4F, 0x0A, 0x73, 0xC5, 0xC3, 0xA6, 0x5E, 0xEC, 0x1C, 0xE2,
@ -210,7 +211,8 @@ keytable = bytearray([0xF0, 0x14, 0x55, 0x0D, 0x5E, 0xDA, 0x92, 0xB3, 0xA7, 0x6C
0x46, 0x30, 0x33, 0x43, 0x44, 0x36, 0x42, 0x34, 0x41, 0x32, 0x31, 0x32, 0x30, 0x35, 0x39, 0x37
])
class SierraGenerator():
class SierraGenerator:
tbl = bytearray()
rtbl = bytearray()
devicegeneration = None
@ -265,7 +267,7 @@ class SierraGenerator():
{"challenge": "BE96CBBEE0829BCA", "devicegeneration": "MDM9200", "response": "EEDBF8BFF8DAE346"},
{"challenge": "20E253156762DACE", "devicegeneration": "SDX55", "response": "03940D7067145323"},
{"challenge": "2387885E7D290FEE", "devicegeneration": "MDM9x15A", "response": "DC3E51897BAA9C1E"},
{"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response":"1253C1B1E447B697"}
{"challenge": "4B1FEF9FD43C6DAA", "devicegeneration": "SDX65", "response": "1253C1B1E447B697"}
]
for test in test_table:
challenge = test["challenge"]
@ -425,7 +427,7 @@ class connection:
def readreply(self):
info = []
if self.serial is not None:
while (True):
while True:
tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
if "OK" in info:
return info
@ -464,7 +466,7 @@ class SierraKeygen(metaclass=LogBase):
def __init__(self, cn, devicegeneration=None):
self.cn = cn
self.keygen = SierraGenerator()
if devicegeneration == None:
if devicegeneration is None:
self.detectdevicegeneration()
else:
self.devicegeneration = devicegeneration
@ -506,7 +508,7 @@ class SierraKeygen(metaclass=LogBase):
devicegeneration = "MDM9x30_V1"
else:
devicegeneration = "MDM9x30"
elif "9X40" in revision and not "9X40C" in revision:
elif "9X40" in revision and "9X40C" not in revision:
devicegeneration = "MDM9x40"
elif "9X50" in revision:
if "NTG9X50" in revision:
@ -528,7 +530,7 @@ class SierraKeygen(metaclass=LogBase):
devicegeneration = "SDX55"
else: # MR6400 NTGX65_10.04.13.03
devicegeneration = "SDX65"
# MR6550 NTGX65_12.01.31.00
# MR6550 NTGX65_12.01.31.00
devicegeneration = "SDX65"
else:
devicegeneration = ""

View file

@ -7,51 +7,53 @@
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
# TXT to EDL Loader (c) B.Kerler 2023
import os,sys
import sys
from struct import unpack
def main():
if len(sys.argv)<2:
print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]")
sys.exit(0)
with open(sys.argv[1],"rb") as rf:
data=bytearray()
for line in rf.readlines():
if line[0]==0x20:
tt=line.split(b" ")[:-1]
tt=tt[1:17]
xx=b"".join(tt)
data.extend(bytes.fromhex(xx.decode('utf-8')))
if len(sys.argv) < 2:
print("Usage: ./txt_to_loader.py [log.txt] [loader.elf]")
sys.exit(0)
with open(sys.argv[1], "rb") as rf:
data = bytearray()
for line in rf.readlines():
if line[0] == 0x20:
tt = line.split(b" ")[:-1]
tt = tt[1:17]
xx = b"".join(tt)
data.extend(bytes.fromhex(xx.decode('utf-8')))
outdata=bytearray()
i=0
seq=b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx=data.find(seq)
if idx==-1:
if i==0:
seq=b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i+=1
continue
else:
break
else:
cmd=unpack("<I", data[idx:idx+4])[0]
if cmd==0x03:
cmd,tlen,slen,offset,length=unpack("<IIIII",data[idx:idx+0x14])
elif cmd==0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x20:]
print("Offset : %08X Length: %08X" %(offset,length))
while len(outdata)<offset+length:
outdata.append(0xFF)
outdata[offset:offset+length]=data[:length]
i+=1
data = data[length:]
wf.write(outdata)
outdata = bytearray()
i = 0
seq = b"\x03\x00\x00\x00\x14\x00\x00\x00\x0D\x00\x00\x00"
with open(sys.argv[2], "wb") as wf:
while True:
idx = data.find(seq)
if idx == -1:
if i == 0:
seq = b"\x12\x00\x00\x00\x20\x00\x00\x00\x0D\x00\x00\x00\x00\x00\x00\x00"
i += 1
continue
else:
break
else:
cmd = unpack("<I", data[idx:idx + 4])[0]
if cmd == 0x03:
cmd, tlen, slen, offset, length = unpack("<IIIII", data[idx:idx + 0x14])
elif cmd == 0x12:
cmd, tlen, slen, offset, length = unpack("<IIQQQ", data[idx:idx + 0x20])
data = data[idx + 0x20:]
print("Offset : %08X Length: %08X" % (offset, length))
while len(outdata) < offset + length:
outdata.append(0xFF)
outdata[offset:offset + length] = data[:length]
i += 1
data = data[length:]
wf.write(outdata)
print("Done.")
if __name__=="__main__":
main()
print("Done.")
if __name__ == "__main__":
main()

View file

@ -1,5 +1,5 @@
# Challenge/Response Generator for Sierra Wireless Cards V1.2
(c) B. Kerler 2019-2023
(c) B. Kerler 2019-2024
GPLv3 License
## Why