MDM9x60 support. Multiple usb fixes. Improved streaming stuff

This commit is contained in:
Bjoern Kerler 2021-01-19 16:07:26 +01:00
parent 39984f79d7
commit 36cd459ed1
14 changed files with 528 additions and 233 deletions

View file

@ -190,9 +190,10 @@ class firehose(metaclass=LogBase):
self.cdc.write(data, self.cfg.MaxXMLSizeInBytes)
else:
self.cdc.write(bytes(data, 'utf-8'), self.cfg.MaxXMLSizeInBytes)
time.sleep(0.01)
rdata = bytearray()
counter = 0
timeout = 3
timeout = 30
resp = {"value": "NAK"}
status = False
if not skipresponse:
@ -201,7 +202,7 @@ class firehose(metaclass=LogBase):
tmp = self.cdc.read(self.cfg.MaxXMLSizeInBytes)
if tmp == b"":
counter += 1
time.sleep(0.3)
time.sleep(0.05)
if counter > timeout:
break
rdata += tmp
@ -380,7 +381,7 @@ class firehose(metaclass=LogBase):
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data)
time.sleep(0.01)
#time.sleep(0.01)
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
if rsp[0]:
@ -410,7 +411,7 @@ class firehose(metaclass=LogBase):
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
self.cdc.write(b'', self.cfg.MaxPayloadSizeToTargetInBytes)
time.sleep(0.2)
#time.sleep(0.2)
info = self.xml.getlog(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
rsp = self.xml.getresponse(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
if "value" in rsp:
@ -455,7 +456,7 @@ class firehose(metaclass=LogBase):
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data)
time.sleep(0.01)
#time.sleep(0.01)
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
if rsp[0]:
@ -484,7 +485,7 @@ class firehose(metaclass=LogBase):
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
self.cdc.write(b'', self.cfg.MaxPayloadSizeToTargetInBytes)
time.sleep(0.2)
#time.sleep(0.2)
info = self.xml.getlog(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
rsp = self.xml.getresponse(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
if "value" in rsp:
@ -525,7 +526,7 @@ class firehose(metaclass=LogBase):
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data)
time.sleep(0.01)
#time.sleep(0.01)
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
if rsp[0]:
@ -542,7 +543,7 @@ class firehose(metaclass=LogBase):
bytesToWrite -= wlen
pos += wlen
self.cdc.write(b'', self.cfg.MaxPayloadSizeToTargetInBytes)
time.sleep(0.2)
#time.sleep(0.2)
info = self.xml.getlog(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
rsp = self.xml.getresponse(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
if "value" in rsp:
@ -584,7 +585,7 @@ class firehose(metaclass=LogBase):
f" start_sector=\"{cursector}\"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
time.sleep(0.01)
#time.sleep(0.01)
if rsp[0]:
if "value" in rsp[1]:
if rsp[1]["value"] == "NAK":

View file

@ -8,7 +8,7 @@ from Config.qualcomm_config import infotbl, msmids, secureboottbl, sochw
from Library.xmlparser import xmlparser
from Library.utils import do_tcp_server
from Config.qualcomm_config import memory_type
from Library.utils import LogBase
from Library.utils import LogBase, getint
try:
import xml.etree.cElementTree as ET
from xml.etree import cElementTree as ElementTree
@ -38,8 +38,8 @@ class firehose_client(metaclass=LogBase):
self.cfg.ZLPAwareHost = 1
self.cfg.SkipStorageInit = arguments["--skipstorageinit"]
self.cfg.SkipWrite = arguments["--skipwrite"]
self.cfg.MaxPayloadSizeToTargetInBytes = int(arguments["--maxpayload"], 16)
self.cfg.SECTOR_SIZE_IN_BYTES = int(arguments["--sectorsize"], 16)
self.cfg.MaxPayloadSizeToTargetInBytes = getint(arguments["--maxpayload"])
self.cfg.SECTOR_SIZE_IN_BYTES = getint(arguments["--sectorsize"])
self.cfg.bit64 = sahara.bit64
devicemodel = ""
skipresponse = False
@ -427,8 +427,8 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Peek command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
length = int(options["<length>"], 16)
offset = getint(options["<offset>"])
length = getint(options["<length>"])
filename = options["<filename>"]
self.firehose.cmd_peek(offset, length, filename, True)
self.__logger.info(
@ -441,8 +441,8 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Peek command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
length = int(options["<length>"], 16)
offset = getint(options["<offset>"])
length = getint(options["<length>"])
resp = self.firehose.cmd_peek(offset, length, "", True)
self.printer("\n")
self.printer(hexlify(resp))
@ -454,7 +454,7 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Peek command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
resp = self.firehose.cmd_peek(offset, 8, "", True)
self.printer("\n")
self.printer(hex(unpack("<Q", resp[:8])[0]))
@ -466,7 +466,7 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Peek command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
resp = self.firehose.cmd_peek(offset, 4, "", True)
self.printer("\n")
self.printer(hex(unpack("<I", resp[:4])[0]))
@ -478,7 +478,7 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Poke command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
filename = options["<filename>"]
return self.firehose.cmd_poke(offset, "", filename, True)
elif cmd == "pokehex":
@ -488,7 +488,7 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Poke command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
data = unhexlify(options["<data>"])
return self.firehose.cmd_poke(offset, data, "", True)
elif cmd == "pokeqword":
@ -498,8 +498,8 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Poke command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
data = pack("<Q", int(options["<data>"], 16))
offset = getint(options["<offset>"])
data = pack("<Q", getint(options["<data>"]))
return self.firehose.cmd_poke(offset, data, "", True)
elif cmd == "pokedword":
if not self.check_param(["<offset>", "<data>"]):
@ -508,8 +508,8 @@ class firehose_client(metaclass=LogBase):
self.__logger.error("Poke command isn't supported by edl loader")
return False
else:
offset = int(options["<offset>"], 16)
data = pack("<I", int(options["<data>"], 16))
offset = getint(options["<offset>"])
data = pack("<I", getint(options["<data>"]))
return self.firehose.cmd_poke(offset, data, "", True)
elif cmd == "memcpy":
if not self.check_param(["<offset>", "<size>"]):
@ -517,8 +517,8 @@ class firehose_client(metaclass=LogBase):
if not self.check_cmd("poke"):
self.printer("Poke command isn't supported by edl loader")
else:
srcoffset = int(options["<offset>"], 16)
size = int(options["<size>"], 16)
srcoffset = getint(options["<offset>"])
size = getint(options["<size>"])
dstoffset = srcoffset + size
if self.firehose.cmd_memcpy(dstoffset, srcoffset, size):
self.printer(f"Memcpy from {hex(srcoffset)} to {hex(dstoffset)} succeeded")

View file

@ -197,6 +197,8 @@ class hdlc:
# FlushFileBuffers(ser)
def send_cmd_base(self, outdata, prefixflag, nocrc=False):
if isinstance(outdata,str):
outdata=bytes(outdata,'utf-8')
packet = self.convert_cmdbuf(bytearray(outdata))
if self.send_unframed_buf(packet, prefixflag):
if nocrc:

View file

@ -1,18 +1,19 @@
import ctypes
from enum import Enum
from struct import unpack, pack
from Config.qualcomm_config import secgen,secureboottbl
c_uint8 = ctypes.c_uint8
# nandbase MSM_NAND_BASE
# qfprom SECURITY_CONTROL_BASE_PHYS
config_tbl={
# bam nandbase bcraddr secureboot pbl qfprom memtbl
3: ["9x25",1,0xf9af0000,0xfc401a40,0xFC4B8000 + 0x6080,[0xFC010000, 0x18000],[0xFC4B8000, 0x6000], [0x200000, 0x24000]],
8: ["9x35",1,0xf9af0000,0xfc401a40,0xFC4B8000 + 0x6080,[0xFC010000, 0x18000],[0xFC4B8000, 0x6000], [0x200000, 0x24000]],
10: ["9x45",1,0x79B0000,0x183f000,0xFC4B8000 + 0x6080,[0xFC010000, 0x18000],[0x58000, 0x6000],[0x200000, 0x24000]],
16: ["9x55",0,0x79B0000,0x183F000,0x000a01d0,[0x100000, 0x18000],[0x000A0000, 0x6000],[0x200000, 0x24000]], #9x6x as well
12: ["9x07",0,0x79B0000,0x183F000,0x000a01d0,[0x100000, 0x18000],[0x000A0000, 0x6000],[0x200000, 0x24000]]
3: ["9x25",1,0xf9af0000,0xfc401a40,secureboottbl["MDM9x25"],secgen[2][0],secgen[2][1],secgen[2][2]],
8: ["9x35",1,0xf9af0000,0xfc401a40,secureboottbl["MDM9x35"],secgen[2][0],secgen[2][1],secgen[2][2]],
10: ["9x45",1,0x79B0000,0x183f000,secureboottbl["MDM9x45"],secgen[2][0],secgen[2][1],secgen[2][2]],
16: ["9x55",0,0x79B0000,0x183f000,secureboottbl["MDM9x45"],secgen[5][0],secgen[5][1],secgen[5][2]],
17: ["9x60",0,0x79B0000,0x183f000,secureboottbl["MDM9x60"],secgen[5][0],secgen[5][1],secgen[5][2]],
12: ["9x07",0,0x79B0000,0x183f000,secureboottbl["MDM9607"],secgen[5][0],secgen[5][1],secgen[5][2]]
}
supported_flash = {
@ -264,6 +265,11 @@ class SettingsOpt:
self.BAD_BLOCK_IN_SPARE_AREA = 0
self.ECC_MODE = 0
self.bad_loader = 1
self.secureboot=secureboottbl["MDM9607"]
self.pbl=secgen[5][0]
self.qfprom=secgen[5][1]
self.memtbl=secgen[5][2]
self.chipname = "Unknown"
if chipset in config_tbl:
self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, self.pbl, self.qfprom, self.memtbl=config_tbl[chipset]
self.bad_loader = 0
@ -274,6 +280,8 @@ class SettingsOpt:
self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, self.pbl, self.qfprom, self.memtbl = \
config_tbl[chipid]
self.bad_loader = 0
if chipset==0xFF:
self.bad_loader=0
class nand_toshiba_ids(ctypes.LittleEndianStructure):
_fields_ = [

View file

@ -367,10 +367,24 @@ class sahara(metaclass=LogBase):
return ["sahara", None]
elif b"<?xml" in v:
return ["firehose", None]
elif v[0]==0x7E:
return ["nandprg", None]
else:
data = b"<?xml version=\"1.0\" ?><data><nop /></data>"
self.cdc.write(data, 4096)
self.cdc.write(data, 0x80)
res = self.cdc.read()
if res==b"":
try:
data = b"\x7E\x06\x4E\x95\x7E" # Streaming nop
self.cdc.write(data, 0x80)
res = self.cdc.read()
if b"\x7E\x0D\x16\x00\x00\x00\x00" in res or b"Invalid Command" in res:
return ["nandprg", None]
else:
return ["", None]
except Exception as e:
self.__logger.error(str(e))
return ["", None]
if (b"<?xml" in res):
return ["firehose", None]
elif len(res)>0 and res[0] == self.cmd.SAHARA_END_TRANSFER:
@ -388,17 +402,7 @@ class sahara(metaclass=LogBase):
except Exception as e:
self.__logger.error(str(e))
try:
data = b"\x7E\x06\x4E\x95\x7E" # Streaming nop
self.cdc.write(data, 4096)
res = self.cdc.read()
if b"\x7E\x0D\x16\x00\x00\x00\x00" in res:
return ["nandprg", None]
else:
return ["", None]
except Exception as e:
self.__logger.error(str(e))
return ["", None]
self.cmd_modeswitch(self.sahara_mode.SAHARA_MODE_MEMORY_DEBUG)
cmd, pkt = self.get_rsp()
if cmd==-1 and pkt==-1:
@ -471,6 +475,10 @@ class sahara(metaclass=LogBase):
self.oem_str = "{:04x}".format(self.oem_id)
self.model_id = "{:04x}".format(self.model_id)
self.msm_str = "{:08x}".format(self.msm_id)
if self.msm_id in msmids:
cpustr=f"CPU detected: \"{msmids[self.msm_id]}\"\n"
else:
cpustr="Unknown CPU, please send log as issue to https://github.com/bkerler/edl\n"
"""
if self.version >= 2.4:
self.__logger.info(f"\n------------------------\n" +
@ -486,6 +494,7 @@ class sahara(metaclass=LogBase):
f"HWID: 0x{self.hwidstr} (MSM_ID:0x{self.msm_str}," +
f"OEM_ID:0x{self.oem_str}," +
f"MODEL_ID:0x{self.model_id})\n" +
cpustr +
f"PK_HASH: 0x{self.pkhash}\n" +
f"Serial: 0x{self.serials}\n")
if self.programmer == "":
@ -563,7 +572,10 @@ class sahara(metaclass=LogBase):
def cmd_reset(self):
self.cdc.write(pack("<II", self.cmd.SAHARA_RESET_REQ, 0x8))
cmd, pkt = self.get_rsp()
try:
cmd, pkt = self.get_rsp()
except:
return False
if cmd["cmd"] == self.cmd.SAHARA_RESET_RSP:
return True
elif "status" in pkt:

View file

@ -1,4 +1,5 @@
from struct import unpack, pack
from struct import pack
from binascii import unhexlify
from Library.utils import *
from Library.hdlc import *
from Library.nand_config import BadFlags, SettingsOpt, nand_ids, nand_manuf_ids, nandregs, NandDevice
@ -15,6 +16,9 @@ class Streaming(metaclass=LogBase):
self.nanddevice = None
self.nandbase = 0
self.__logger.setLevel(loglevel)
self.Qualcomm = 0
self.Patched = 1
if loglevel==logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
@ -139,10 +143,11 @@ class Streaming(metaclass=LogBase):
if len(resp) > 2 and resp[1] == 0x16:
time.sleep(0.5)
return True
logging.error("Error on closing stream")
self.__logger.error("Error on closing stream")
return False
def send_section_header(self, name):
# 0x1b open muliimage, 0xe for user-defined partition
resp = self.send(b"\x1b\x0e" + name + b"\x00")
if resp[1] == 0x1c:
return True
@ -176,6 +181,37 @@ class Streaming(metaclass=LogBase):
self.__logger.error("Error on closing data stream")
return False
def read_sectors(self,sector,sectors,filename, info=False):
old=0
sectorsize = self.settings.PAGESIZE // self.settings.sectors_per_page
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
with open(filename, "wb") as write_handle:
while sector < sectors:
offset = (sector // self.settings.sectors_per_page) * self.settings.PAGESIZE
page = int(offset / self.settings.PAGESIZE)
curblock = int(page / self.settings.num_pages_per_blk)
curpage = int(page - curblock * self.settings.num_pages_per_blk)
if sectors - sector < self.settings.sectors_per_page:
sectorstoread = sectors - sector
else:
sectorstoread = self.settings.sectors_per_page
data, extra = self.flash_read(curblock, curpage, sectorstoread,
self.settings.UD_SIZE_BYTES)
if sector % self.settings.sectors_per_page != 0:
data = data[sectorsize * sector:]
write_handle.write(data)
sector += sectorstoread
if info:
prog = int(float(sector) / float(sectors) * float(100))
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Sector %d)' % sector, bar_length=50)
old = prog
self.nand_post()
if info:
print_progress(100, 100, prefix='Progress:', suffix='Complete', bar_length=50)
return True
def send_ptable(self, parttable, mode):
cmdbuf = b"\x19" + pack("<B", mode) + parttable
resp = self.send(cmdbuf)
@ -187,13 +223,41 @@ class Streaming(metaclass=LogBase):
self.__logger.error("Partition tables do not match - you need to fully flash the modem")
return False
def memread(self, address, length):
logging.debug("memread %08X:%08X" % (address, length))
def qc_memread(self, address, length):
self.__logger.debug("memread %08X:%08X" % (address, length))
data=bytearray()
cmdbuf=b"\x03"
toread=length
for i in range(0, length, 512):
size=512
if toread<size:
size=toread
tmp = self.send(cmdbuf + pack("<I", address+i) + pack("<H", size), True)
if tmp[1] == 0x04:
data.extend(tmp[6:])
else:
return b""
toread-=size
return data
def patched_memread(self, address, length):
self.__logger.debug("memread %08X:%08X" % (address, length))
result = b""
cmdbuf = bytearray(
[0x11, 0x00, 0x24, 0x30, 0x9f, 0xe5, 0x24, 0x40, 0x9f, 0xe5, 0x12, 0x00, 0xa0, 0xe3, 0x04, 0x00,
0x81, 0xe4, 0x04, 0x00, 0x83, 0xe0, 0x04, 0x20, 0x93, 0xe4, 0x04, 0x20, 0x81, 0xe4, 0x00, 0x00,
0x53, 0xe1, 0xfb, 0xff, 0xff, 0x3a, 0x04, 0x40, 0x84, 0xe2, 0x1e, 0xff, 0x2f, 0xe1])
"""
LDR R3, loc_2C
LDR R4, loc_30
MOV R0, #0x12
STR R0, [R1],#4
ADD R0, R3, R4
LDR R2, [R3],#4
STR R2, [R1],#4
CMP R3, R0
BCC loc_14
ADD R4, R4, #4
BX LR
"""
readt=unhexlify("24309fe524409fe51200a0e3040081e4040083e0042093e4042081e4000053e1fbffff3a044084e21eff2fe1")
cmdbuf = b"\x11\x00"+readt
errcount = 0
blklen = 1000
for i in range(0, length, 1000):
@ -211,7 +275,7 @@ class Streaming(metaclass=LogBase):
else:
break
if tries == 0:
logging.error(
self.__logger.error(
f"Error reading memory at addr {hex(address)}, {str(blklen)} bytes required, {str(iolen)} bytes "
f"received.")
errcount += 1
@ -226,18 +290,29 @@ class Streaming(metaclass=LogBase):
def mempeek(self, address):
res = self.memread(address, 4)
if res != b"":
data = unpack("<I", res)[0]
logging.debug("memread %08X:%08X" % (address, data))
data = unpack("<I", res[:4])[0]
self.__logger.debug("memread %08X:%08X" % (address, data))
return data
return -1
def memwrite(self, address, data):
length = len(data)
cmdbuf = bytearray(
[0x11, 0x00, 0x38, 0x00, 0x80, 0xe2, 0x24, 0x30, 0x9f, 0xe5, 0x24, 0x40, 0x9f, 0xe5, 0x04, 0x40,
0x83, 0xe0, 0x04, 0x20, 0x90, 0xe4, 0x04, 0x20, 0x83, 0xe4, 0x04, 0x00, 0x53, 0xe1, 0xfb, 0xff,
0xff, 0x3a, 0x12, 0x00, 0xa0, 0xe3, 0x00, 0x00, 0xc1, 0xe5, 0x01, 0x40, 0xa0, 0xe3, 0x1e, 0xff,
0x2f, 0xe1])
"""
ADD R0, R0, #0x38 ; '8'
LDR R3, loc_30
LDR R4, loc_34
ADD R4, R3, R4
LDR R2, [R0],#4
STR R2, [R3],#4
CMP R3, R4
BCC loc_10
MOV R0, #0x12
STRB R0, [R1]
MOV R4, #1
BX LR
"""
writet=unhexlify("380080e224309fe524409fe5044083e0042090e4042083e4040053e1fbffff3a1200a0e30000c1e50140a0e31eff2fe1")
cmdbuf = b"\x11\x00" + writet
if len(data) > 1000:
data = data[0:1000]
length = 1000
@ -252,7 +327,7 @@ class Streaming(metaclass=LogBase):
return False
def mempoke(self, address, value):
logging.debug("mempoke %08X:%08X" % (address, value))
self.__logger.debug("mempoke %08X:%08X" % (address, value))
data = pack("<I", value & 0xFFFFFFFF)
return self.memwrite(address, data)
@ -378,6 +453,7 @@ class Streaming(metaclass=LogBase):
return 1
"""
def flash_read(self, block, page, sectors, cwsize=None):
buffer = bytearray()
spare = bytearray()
@ -537,9 +613,13 @@ class Streaming(metaclass=LogBase):
return buffer
return -1
def get_partitions(self):
def get_partitions(self,filename=""):
partitions = {}
partdata = self.read_partition_table()
if filename=="":
partdata = self.read_partition_table()
else:
with open(filename, "rb") as rf:
partdata = rf.read()
if partdata != -1:
data = partdata[0x10:]
for i in range(0, len(data) // 0x1C):
@ -589,88 +669,155 @@ class Streaming(metaclass=LogBase):
"NAND_FLASH_BUFFER": self.nanddevice.NAND_FLASH_BUFFER
}
def hello(self):
info = b"\x01QCOM fast download protocol host\x03\x23\x23\x23\x20"
resp = self.send(info, True)
if resp==b"":
return False
if resp[1] != b'\x02':
resp = self.send(info, True)
if len(resp) > 0x2c:
self.__logger.info("Successfully uploaded programmer :)")
if b"Unrecognized flash device" in resp:
self.__logger.error("Unrecognized flash device, patch loader to match flash or use different loader!")
self.reset()
return False,None
resp = bytearray(resp)
try:
class hellopacket:
command=0
magic=b""
version=0
compatibleVersion=0
maxPreferredBlockSize=0
baseFlashAddress=0
flashIdLength=0
flashId=b""
windowSize=0
numberOfSectors=0
sectorSizes=0
featureBits=0
hp = hellopacket()
hp.command=resp[1]
hp.magic=resp[2:2+32]
hp.version=resp[2+32]
hp.compatibleVersion=resp[3+32]
hp.maxPreferredBlockSize=unpack("I",resp[4+32:4+32+4])[0]
hp.baseFlashAddress=unpack("I",resp[4+4+32:4+32+4+4])[0]
hp.flashIdLength=resp[44]
offset=45
hp.flashId=resp[offset:offset+hp.flashIdLength]
offset+=hp.flashIdLength
data=unpack("HH",resp[offset:offset+4])
hp.windowSize=data[0]
hp.numberOfSectors=data[1]
data=unpack(str(hp.numberOfSectors)+"I",resp[offset+4:offset+4+(4*hp.numberOfSectors)])
hp.sectorSizes=data
hp.featureBits=resp[offset+4+hp.numberOfSectors*4:offset+4+hp.numberOfSectors*4+1]
"""
self.settings.PAGESIZE=512
self.settings.UD_SIZE_BYTES=512
self.settings.num_pages_per_blk=hp.maxPreferredBlockSize
self.settings.sectors_per_page=hp.numberOfSectors
"""
return True, hp
except Exception as e:
return False, None
def connect(self, mode=1):
time.sleep(0.200)
self.memread = self.patched_memread
if mode == 0:
cmdbuf = bytearray(
[0x11, 0x00, 0x12, 0x00, 0xa0, 0xe3, 0x00, 0x00, 0xc1, 0xe5, 0x01, 0x40, 0xa0, 0xe3, 0x1e, 0xff, 0x2f,
0xe1])
resp = self.send(cmdbuf, True)
resp2 = self.hdlc.receive_reply(5)
i = resp[1]
if i == 0x12:
# if not self.tst_loader():
# print("Unlocked bootloader being used, cannot continue")
# exit(2)
self.streaming_mode = self.Patched
chipset = self.identify_chipset()
if self.streaming_mode == self.Patched:
self.memread = self.patched_memread
self.settings = SettingsOpt(self, chipset)
self.nanddevice = NandDevice(self.settings)
self.setupregs()
self.get_flash_config()
return True
return True
else:
if not b"Invalid" in resp:
self.streaming_mode = self.Qualcomm
self.memread = self.qc_memread
self.settings = SettingsOpt(self, 0xFF)
return True
resp=self.hello()
if resp[0]:
if mode == 2:
self.__logger.info("Detected flash memory: %s" % resp[1].flashId.decode('utf-8'))
return True
self.streaming_mode = self.Patched
chipset = self.identify_chipset()
if chipset==0xFF:
self.streaming_mode = self.Qualcomm
if self.streaming_mode==self.Qualcomm:
self.__logger.info("Unpatched loader detected. Using standard QC mode. Limited methods supported: peek")
self.settings = SettingsOpt(self, chipset)
self.memread = self.qc_memread
else:
self.memread = self.patched_memread
self.settings = SettingsOpt(self, chipset)
if self.settings.bad_loader:
self.__logger.error("Loader id doesn't match device, please fix config and patch loader. Rebooting.")
self.reset()
return False
self.nanddevice = NandDevice(self.settings)
self.setupregs()
if self.cdc.pid == 0x900e or self.cdc.pid==0x0076:
print("Boot to 0x9008")
self.mempoke(0x193d100, 1)
# dload-mode-addr, TCSR_BOOT_MISC_DETECT, iomap.h
# msm8916,8939,8953 0x193d100
# msm8996 0x7b3000
self.mempeek(0x7980000)
self.cdc.close()
sys.exit(0)
if self.settings.bam:
self.disable_bam() # only for sahara
self.get_flash_config()
return True
cfg0 = self.mempeek(self.nanddevice.NAND_DEV0_CFG0)
sectorsize = (cfg0 & (0x3ff << 9)) >> 9
sparebytes = (cfg0 >> 23) & 0xf
self.__logger.info("HELLO protocol version: %i" % resp[1].version)
if self.streaming_mode==self.Patched:
self.__logger.info("Chipset: %s" % self.settings.chipname)
self.__logger.info("Base address of the NAND controller: %08x" % self.settings.nandbase)
self.__logger.info("Sector size: %d bytes" % sectorsize)
self.__logger.info("Spare bytes: %d bytes" % sparebytes)
markerpos = "spare" if self.nanddevice.BAD_BLOCK_IN_SPARE_AREA else "user"
self.__logger.info("Defective block marker position: %s+%x" % (markerpos, self.nanddevice.BAD_BLOCK_BYTE_NUM))
self.__logger.info("The total size of the flash memory = %u blocks (%i MB)" % (self.settings.MAXBLOCK,
self.settings.MAXBLOCK * self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024))
info = b"\x01QCOM fast download protocol host\x03\x23\x23\x23\x20"
resp = self.send(info, True)
if b"Unrecognized flash device" in resp:
logging.error("Unrecognized flash device, patch loader !")
self.reset()
return False
resp = bytearray(resp)
if resp[1] != 2:
resp = self.send(info, True)
if len(resp) > 0x2c:
logging.info("Successfully uploaded programmer :)")
infolen = resp[0x2c]
if mode == 2:
logging.info("Detected flash memory: %s" % resp[0x2d:0x2d + infolen].decode('utf-8'))
return True
chipset = self.identify_chipset()
self.settings = SettingsOpt(self, chipset)
if self.settings.bad_loader:
logging.error("Loader id doesn't match device, please fix config and patch loader. Rebooting.")
self.reset()
return False
self.nanddevice = NandDevice(self.settings)
self.setupregs()
if self.cdc.pid == 0x900e or self.cdc.pid==0x0076:
print("Boot to 0x9008")
self.mempoke(0x193d100, 1)
# dload-mode-addr, TCSR_BOOT_MISC_DETECT, iomap.h
# msm8916,8939,8953 0x193d100
# msm8996 0x7b3000
self.mempeek(0x7980000)
self.cdc.close()
sys.exit(0)
if self.settings.bam:
self.disable_bam() # only for sahara
self.get_flash_config()
cfg0 = self.mempeek(self.nanddevice.NAND_DEV0_CFG0)
sectorsize = (cfg0 & (0x3ff << 9)) >> 9
sparebytes = (cfg0 >> 23) & 0xf
logging.info("HELLO protocol version: %i" % resp[0x22])
logging.info("Chipset: %s" % self.settings.chipname)
logging.info("Base address of the NAND controller: %08x" % self.settings.nandbase)
val = resp[0x2d:0x2d + infolen].decode('utf-8') if resp[0x2d] != 0x65 else ""
logging.info("Flash memory: %s %s, %s" % (self.settings.flash_mfr, val, self.settings.flash_descr))
# logging.info("Maximum packet size: %i byte",*((unsigned int*)&rbuf[0x24]))
logging.info("Sector size: %d bytes" % sectorsize)
logging.info("Page size: %d bytes (%d sectors)" % (self.settings.PAGESIZE, self.settings.sectors_per_page))
logging.info("The number of pages in the block: %d" % self.settings.num_pages_per_blk)
logging.info("OOB size: %d bytes" % self.settings.OOBSIZE)
val = resp[1].flashId.decode('utf-8') if resp[1].flashId[0] != 0x65 else ""
self.__logger.info("Flash memory: %s %s, %s" % (self.settings.flash_mfr, val, self.settings.flash_descr))
# self.__logger.info("Maximum packet size: %i byte",*((unsigned int*)&rbuf[0x24]))
self.__logger.info("Page size: %d bytes (%d sectors)" % (self.settings.PAGESIZE, self.settings.sectors_per_page))
self.__logger.info("The number of pages in the block: %d" % self.settings.num_pages_per_blk)
self.__logger.info("OOB size: %d bytes" % self.settings.OOBSIZE)
ecctype = "BCH" if self.settings.cfg1_enable_bch_ecc else "R-S"
logging.info("ECC: %s, %i bit" % (ecctype, self.settings.ecc_bit))
logging.info("ЕСС size: %d bytes" % self.settings.ecc_size)
logging.info("Spare bytes: %d bytes" % sparebytes)
markerpos = "spare" if self.nanddevice.BAD_BLOCK_IN_SPARE_AREA else "user"
logging.info("Defective block marker position: %s+%x" % (markerpos, self.nanddevice.BAD_BLOCK_BYTE_NUM))
logging.info("The total size of the flash memory = %u blocks (%i MB)" % (self.settings.MAXBLOCK,
self.settings.MAXBLOCK * self.settings.num_pages_per_blk / 1024 * self.settings.PAGESIZE / 1024))
self.__logger.info("ECC: %s, %i bit" % (ecctype, self.settings.ecc_bit))
self.__logger.info("ЕСС size: %d bytes" % self.settings.ecc_size)
return True
else:
logging.error("Uploaded programmer doesn't respond :(")
self.__logger.error("Uploaded programmer doesn't respond :(")
return False
def load_block(self, block, cwsize):
@ -680,6 +827,35 @@ class Streaming(metaclass=LogBase):
buffer.extend(tmp)
return buffer
def memtofile(self,offset,length,filename, info=True):
old=0
pos=0
toread=length
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
with open(filename, "wb") as wf:
while toread > 0:
size = 0x20000
if self.streaming_mode==self.Qualcomm:
size = 0x200
if toread < size:
size = toread
data = self.memread(offset+pos, size)
if data != b"":
wf.write(data)
else:
break
toread -= size
pos+=size
if info:
prog = int(float(pos) / float(length) * float(100))
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete (Offset: %08X)' % (offset+pos), bar_length=50)
old = prog
if info:
print_progress(100, 100, prefix='Progress:', suffix='Complete', bar_length=50)
return True
def read_blocks(self, fw, block, length, cwsize, savespare=False, info=True):
badblocks = 0
old = 0
@ -771,17 +947,42 @@ class Streaming(metaclass=LogBase):
return False
def identify_chipset(self):
cmd = bytearray([0x11, 0x00, 0x04, 0x10, 0x2d, 0xe5, 0x0e, 0x00, 0xa0, 0xe1, 0x03, 0x00, 0xc0, 0xe3, 0xff, 0x30,
0x80, 0xe2, 0x34, 0x10, 0x9f, 0xe5, 0x04, 0x20, 0x90, 0xe4, 0x01, 0x00, 0x52, 0xe1, 0x03, 0x00,
0x00, 0x0a, 0x03, 0x00, 0x50, 0xe1, 0xfa, 0xff, 0xff, 0x3a, 0x00, 0x00, 0xa0, 0xe3, 0x00, 0x00,
0x00, 0xea, 0x00, 0x00, 0x90, 0xe5, 0x04, 0x10, 0x9d, 0xe4, 0x01, 0x00, 0xc1, 0xe5, 0xaa, 0x00,
0xa0, 0xe3, 0x00, 0x00, 0xc1, 0xe5, 0x02, 0x40, 0xa0, 0xe3, 0x1e, 0xff, 0x2f, 0xe1, 0xef, 0xbe,
0xad, 0xde])
"""
PUSH {R1}
MOV R0, LR
BIC R0, R0, #3
ADD R3, R0, #0xFF
LDR R1, =0xDEADBEEF
LDR R2, [R0],#4
CMP R2, R1
BEQ loc_30
CMP R0, R3
BCC loc_14
MOV R0, #0
B loc_34
LDR R0, [R0]
POP {R1}
STRB R0, [R1,#1]
MOV R0, #0xAA
STRB R0, [R1]
MOV R4, #2
BX LR
"""
search=unhexlify("04102de50e00a0e10300c0e3ff3080e234109fe5042090e4010052e10300000a030050e1" +
"faffff3a0000a0e3000000ea000090e504109de40100c1e5aa00a0e30000c1e50240a0e31eff2fe1efbeadde")
cmd = b"\x11\x00"+search
resp = self.send(cmd, True)
resp2=self.hdlc.receive_reply(5)
if b"Power off not supported" in resp:
self.streaming_mode = self.Qualcomm
self.memread = self.qc_memread
return 0xFF
if resp[1] != 0xaa:
resp = self.send(cmd, True)
if resp[1] != 0xaa:
return -1
self.streaming_mode = self.Patched
self.memread = self.patched_memread
return resp[2] # 08

View file

@ -4,7 +4,7 @@ import logging
from Library.streaming import Streaming
from binascii import hexlify, unhexlify
from struct import unpack, pack
from Library.utils import do_tcp_server, LogBase
from Library.utils import do_tcp_server, LogBase, getint
class streaming_client(metaclass=LogBase):
@ -59,11 +59,22 @@ class streaming_client(metaclass=LogBase):
def handle_streaming(self, cmd, options):
mode = 0
"""
offset = getint(options["<offset>"])
length = getint(options["<length>"])
filename = options["<filename>"]
self.streaming.streaming_mode=self.streaming.Qualcomm
self.streaming.memread=self.streaming.qc_memread
self.streaming.memtofile(offset, length, filename)
"""
if "<mode>" in options:
mode = options["<mode>"]
if self.streaming.connect(mode):
xflag = 0
self.streaming.nand_init(xflag)
res=self.streaming.hdlc.receive_reply(5)
if self.streaming.streaming_mode==self.streaming.Patched:
self.streaming.nand_init(xflag)
if cmd == "gpt":
directory = options["<directory>"]
if directory is None:
@ -106,21 +117,19 @@ class streaming_client(metaclass=LogBase):
self.__logger.error(f"Error: Couldn't detect partition: {partition}\nAvailable partitions:")
self.print_partitions(rpartitions)
elif cmd == "rs":
start = int(options["<start_sector>"])
sectors = int(options["<sectors>"])
sector = getint(options["<start_sector>"]) #Page
sectors = getint(options["<sectors>"])
filename = options["<filename>"]
self.printer(f"Dumping Sector {hex(start)} with Sectorcount {hex(sectors)}...")
block = 131
page = 0x20
data, extra = self.streaming.flash_read(block, page, sectors, self.streaming.settings.UD_SIZE_BYTES)
try:
with open(filename, "wb") as write_handle:
write_handle.write(data)
self.printer(f"Dumped sector {str(start)} with sector count {str(sectors)} as {filename}.")
return
except Exception as error:
self.__logger.error(f"Couldn't open {filename} for writing: %s" % str(error))
self.streaming.nand_post()
self.printer(f"Dumping at Sector {hex(sector)} with Sectorcount {hex(sectors)}...")
if self.streaming.read_sectors(sector,sectors,filename,True):
self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.")
elif cmd == "rf":
sector = 0
sectors = self.streaming.settings.MAXBLOCK*self.streaming.settings.num_pages_per_blk*self.streaming.settings.sectors_per_page
filename = options["<filename>"]
self.printer(f"Dumping Flash from sector 0 to sector {hex(sectors)}...")
if self.streaming.read_sectors(sector,sectors,filename,True):
self.printer(f"Dumped sector {str(sector)} with sector count {str(sectors)} as {filename}.")
elif cmd == "rl":
directory = options["<directory>"]
if options["--skip"]:
@ -156,40 +165,30 @@ class streaming_client(metaclass=LogBase):
f"{filename}.")
self.streaming.read_raw(offset, length, self.streaming.settings.UD_SIZE_BYTES, partfilename)
elif cmd == "peek":
offset = int(options["<offset>"], 16)
length = int(options["<length>"], 16)
offset = getint(options["<offset>"])
length = getint(options["<length>"])
filename = options["<filename>"]
with open(filename, "wb") as wf:
while length > 0:
size = 0x20000
if length < size:
size = length
data = self.streaming.memread(offset, size)
if data != b"":
wf.write(data)
else:
break
length -= size
self.__logger.info(
f"Peek data from offset {hex(offset)} and length {hex(length)} was written to {filename}")
if self.streaming.memtofile(offset,length,filename):
self.__logger.info(
f"Peek data from offset {hex(offset)} and length {hex(length)} was written to {filename}")
elif cmd == "peekhex":
offset = int(options["<offset>"], 16)
length = int(options["<length>"], 16)
offset = getint(options["<offset>"])
length = getint(options["<length>"])
resp = self.streaming.memread(offset, length)
self.printer("\n")
self.printer(hexlify(resp))
elif cmd == "peekqword":
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
resp = self.streaming.memread(offset, 8)
self.printer("\n")
self.printer(hex(unpack("<Q", resp[:8])[0]))
elif cmd == "peekdword":
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
resp = self.streaming.mempeek(offset)
self.printer("\n")
self.printer(hex(resp))
elif cmd == "poke":
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
filename = unhexlify(options["<filename>"])
try:
with open(filename, "rb") as rf:
@ -201,22 +200,22 @@ class streaming_client(metaclass=LogBase):
except Exception as e:
self.__logger.error(str(e))
elif cmd == "pokehex":
offset = int(options["<offset>"], 16)
offset = getint(options["<offset>"])
data = unhexlify(options["<data>"])
if self.streaming.memwrite(offset, data):
self.__logger.info("Poke succeeded.")
else:
self.__logger.error("Poke failed.")
elif cmd == "pokeqword":
offset = int(options["<offset>"], 16)
data = pack("<Q", int(options["<data>"], 16))
offset = getint(options["<offset>"])
data = pack("<Q", getint(options["<data>"]))
if self.streaming.memwrite(offset, data):
self.__logger.info("Poke succeeded.")
else:
self.__logger.error("Poke failed.")
elif cmd == "pokedword":
offset = int(options["<offset>"], 16)
data = pack("<I", int(options["<data>"], 16))
offset = getint(options["<offset>"])
data = pack("<I", getint(options["<data>"]))
if self.streaming.mempoke(offset, data):
self.__logger.info("Poke succeeded.")
else:
@ -281,8 +280,8 @@ class streaming_client(metaclass=LogBase):
elif cmd == "memcpy":
if not self.check_param(["<offset>", "<size>"]):
return False
srcoffset = int(options["<offset>"], 16)
size = int(options["<size>"], 16)
srcoffset = getint(options["<offset>"])
size = getint(options["<size>"])
dstoffset = srcoffset + size
if self.streaming.cmd_memcpy(dstoffset, srcoffset, size):
self.printer(f"Memcpy from {hex(srcoffset)} to {hex(dstoffset)} succeeded")
@ -291,6 +290,7 @@ class streaming_client(metaclass=LogBase):
return False
###############################
elif cmd == "nop":
#resp=self.streaming.send(b"\x7E\x09")
self.__logger.error("Nop command isn't supported by streaming loader")
return True
elif cmd == "setbootablestoragedrive":
@ -304,10 +304,19 @@ class streaming_client(metaclass=LogBase):
return False
partitionname = options["<partitionname>"]
filename = options["<filename>"]
partitionfilename=""
if "--partitionfilename" in options:
partitionfilename = options["--partitionfilename"]
if not os.path.exists(partitionfilename):
self.__logger.error(f"Error: Couldn't find partition file: {partitionfilename}")
return False
if not os.path.exists(filename):
self.__logger.error(f"Error: Couldn't find file: {filename}")
return False
rpartitions = self.streaming.get_partitions()
if partitionfilename=="":
rpartitions = self.streaming.get_partitions()
else:
rpartitions = self.streaming.get_partitions(partitionfilename)
if self.streaming.enter_flash_mode():
if partitionname in rpartitions:
spartition = rpartitions[partitionname]

View file

@ -255,9 +255,9 @@ class usb_class(metaclass=LogBase):
pos += pktsize
except:
# print("Error while writing")
time.sleep(0.05)
time.sleep(0.01)
i += 1
if i == 5:
if i == 3:
return False
pass
self.verify_data(bytearray(command), "TX:")
@ -280,6 +280,9 @@ class usb_class(metaclass=LogBase):
self.__logger.debug("Timed out")
self.__logger.debug(tmp)
return bytearray(tmp)
elif "Overflow" in error:
self.__logger.error("USB Overflow")
sys.exit(0)
elif e.errno is not None:
print(repr(e), type(e), e.errno)
sys.exit(0)

View file

@ -143,6 +143,15 @@ def parse_args(cmd, args, mainargs):
options["<xmlstring>"] = opts[0]
return options
def getint(valuestr):
try:
return int(valuestr)
except:
try:
return int(valuestr,16)
except:
return 0
class ColorFormatter(logging.Formatter):
LOG_COLORS = {
logging.ERROR: colorama.Fore.RED,

132
edl.py
View file

@ -17,7 +17,7 @@ Usage:
edl.py rl <directory> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skip=partnames] [--genxml] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid]
edl.py rf <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid]
edl.py rs <start_sector> <sectors> <filename> [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid]
edl.py w <partitionname> <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value]
edl.py w <partitionname> <filename> [--partitionfilename=filename] [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value]
edl.py wl <directory> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skip=partnames] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value]
edl.py wf <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--skipresponse] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value]
edl.py ws <start_sector> <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value]
@ -56,7 +56,7 @@ Description:
rl <directory> [--memory=memtype] [--lun=lun] [--skip=partname] # Read all partitions from flash to a directory
rf <filename> [--memory=memtype] [--lun=lun] # Read whole flash to file
rs <start_sector> <sectors> <filename> [--lun=lun] # Read sectors starting at start_sector to filename
w <partitionname> <filename> [--memory=memtype] [--lun=lun] [--skipwrite] # Write filename to partition to flash
w <partitionname> <filename> [--partitionfilename=filename] [--memory=memtype] [--lun=lun] [--skipwrite] # Write filename to partition to flash
wl <directory> [--memory=memtype] [--lun=lun] # Write all files from directory to flash
wf <filename> [--memory=memtype] [--lun=lun] # Write whole filename to flash
ws <start_sector> <filename> [--memory=memtype] [--lun=lun] [--skipwrite] # Write filename to flash at start_sector
@ -65,7 +65,7 @@ Description:
ep <partitionname> <sectors> [--memory=memtype] [--skipwrite] [--lun=lun] # Erase sector count from flash partition
footer <filename> [--memory=memtype] [--lun=lun] # Read crypto footer from flash
peek <offset> <length> <filename> # Dump memory at offset with given length to filename
peekhex <offset> <length> # Dump memory at offset and given length as hex string
peekhex <offset> <length> # Dump memory at offset and given length
peekdword <offset> # Dump DWORD at memory offset
peekqword <offset> # Dump QWORD at memory offset
memtbl <filename> # Dump memory table to file
@ -96,8 +96,9 @@ Options:
--pid=pid Set usb product id used for EDL [default: -1]
--lun=lun Set lun to read/write from (UFS memory only) [default: None]
--maxpayload=bytes Set the maximum payload for EDL [default: 0x100000]
--sectorsize=bytes Set default sector size [default: 0x200]
--sectorsize=bytes Set default sector size
--memory=memtype Set memory type ("NAND", "eMMC", "UFS", "spinor")
--partitionfilename=filename Set partition table as filename for streaming mode
--skipwrite Do not allow any writes to flash (simulate only)
--skipresponse Do not expect a response from phone on read/write (some Qualcomms)
--skipstorageinit Skip storage initialisation
@ -115,8 +116,11 @@ import os
import sys
import time
import logging
import subprocess
import re
from docopt import docopt
args = docopt(__doc__, version='3')
default_ids = [
@ -140,37 +144,44 @@ from Library.streaming import Streaming
print("Qualcomm Sahara / Firehose Client V3.2 (c) B.Kerler 2018-2021.")
class main(metaclass=LogBase):
def doconnect(self,cdc, loop, mode, resp, sahara):
while not cdc.connected:
cdc.connected = cdc.connect()
if not cdc.connected:
def doconnect(self, loop, mode, resp):
while not self.cdc.connected:
self.cdc.connected = self.cdc.connect()
if not self.cdc.connected:
sys.stdout.write('.')
if loop == 5:
sys.stdout.write('\n')
self.__logger.info("Hint: Press and hold vol up+dwn, connect usb. For some, only use vol up.")
self.__logger.info("Xiaomi: Press and hold Vol up + pwr, in fastboot mode connect usb.\n" +
" Run \"./fastboot oem edl\".")
self.__logger.info("Other: Run \"adb reboot edl\".")
sys.stdout.write('\n')
if loop >= 20:
sys.stdout.write('\n')
loop = 0
loop = 6
loop += 1
time.sleep(1)
sys.stdout.flush()
else:
self.__logger.info("Device detected :)")
try:
mode, resp = sahara.connect()
if mode == "" or resp == -1:
mode, resp = sahara.connect()
mode, resp = self.sahara.connect()
except Exception as e:
if mode == "" or resp == -1:
mode, resp = sahara.connect()
mode, resp = self.sahara.connect()
if mode == -1:
mode, resp = self.sahara.connect()
if mode == "":
self.__logger.info("Unknown mode. Aborting.")
cdc.close()
sys.exit()
self.exit()
self.__logger.info(f"Mode detected: {mode}")
break
return mode, resp
def exit(self,cdc):
cdc.close()
def exit(self):
self.cdc.close()
sys.exit()
def parse_option(self,args):
@ -260,7 +271,18 @@ class main(metaclass=LogBase):
cmd = "qfil"
return cmd
def console_cmd(self,cmd):
read = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, \
stderr=subprocess.STDOUT, close_fds=True)
output = read.stdout.read().decode()
return output
def run(self):
if sys.platform == 'win32' or sys.platform == 'win64' or sys.platform == 'winnt':
proper_driver = self.console_cmd(r'reg query HKLM\HARDWARE\DEVICEMAP\SERIALCOMM')
if re.findall(r'QCUSB', str(proper_driver)):
self.__logger.warning(f'Please first install libusb_win32 driver from Zadig')
mode = ""
loop = 0
vid = int(args["--vid"], 16)
@ -280,52 +302,53 @@ class main(metaclass=LogBase):
else:
self.__logger.setLevel(logging.INFO)
cdc = usb_class(portconfig=portconfig,loglevel=self.__logger.level)
saharahdl = sahara(cdc,loglevel=self.__logger.level)
self.cdc = usb_class(portconfig=portconfig,loglevel=self.__logger.level)
self.sahara = sahara(self.cdc,loglevel=self.__logger.level)
if args["--loader"] == 'None':
self.__logger.info("Trying with no loader given ...")
saharahdl.programmer = ""
self.sahara.programmer = ""
else:
loader = args["--loader"]
self.__logger.info(f"Using loader {loader} ...")
saharahdl.programmer = loader
self.sahara.programmer = loader
self.__logger.info("Waiting for the device")
resp = None
cdc.timeout = 100
mode, resp = self.doconnect(cdc, loop, mode, resp, saharahdl)
self.cdc.timeout = 100
mode, resp = self.doconnect(loop, mode, resp)
if resp == -1:
mode, resp = self.doconnect(cdc, loop, mode, resp, saharahdl)
mode, resp = self.doconnect(loop, mode, resp)
if resp == -1:
self.__logger.error("USB desync, please rerun command !")
sys.exit()
self.exit()
# print((mode, resp))
if mode == "sahara":
if resp==None:
if mode=="sahara":
print("Sahara in error state, resetting ...")
saharahdl.cmd_reset()
exit(cdc)
self.sahara.cmd_reset()
data=self.cdc.read(5)
self.exit()
elif "mode" in resp:
mode = resp["mode"]
if mode == saharahdl.sahara_mode.SAHARA_MODE_MEMORY_DEBUG:
if mode == self.sahara.sahara_mode.SAHARA_MODE_MEMORY_DEBUG:
if args["memorydump"]:
time.sleep(0.5)
print("Device is in memory dump mode, dumping memory")
saharahdl.debug_mode()
exit(cdc)
self.sahara.debug_mode()
self.exit()
else:
print("Device is in streaming mode, uploading loader")
cdc.timeout = None
sahara_info = saharahdl.streaminginfo()
self.cdc.timeout = None
sahara_info = self.sahara.streaminginfo()
if sahara_info:
mode, resp = saharahdl.connect()
mode, resp = self.sahara.connect()
if mode == "sahara":
mode = saharahdl.upload_loader()
if "enprg" in saharahdl.programmer.lower():
mode = self.sahara.upload_loader()
if "enprg" in self.sahara.programmer.lower():
mode = "load_enandprg"
elif "nprg" in saharahdl.programmer.lower():
elif "nprg" in self.sahara.programmer.lower():
mode = "load_nandprg"
elif mode!="":
mode = "load_" + mode
@ -333,53 +356,54 @@ class main(metaclass=LogBase):
time.sleep(0.3)
else:
print("Error, couldn't find suitable enprg/nprg loader :(")
exit(cdc)
self.exit()
else:
print("Device is in EDL mode .. continuing.")
cdc.timeout = None
sahara_info = saharahdl.info()
self.cdc.timeout = None
sahara_info = self.sahara.info()
if sahara_info:
mode, resp = saharahdl.connect()
mode, resp = self.sahara.connect()
if mode == "sahara":
mode = saharahdl.upload_loader()
mode = self.sahara.upload_loader()
if mode == "firehose":
if "enprg" in saharahdl.programmer.lower():
if "enprg" in self.sahara.programmer.lower():
mode = "enandprg"
elif "nprg" in saharahdl.programmer.lower():
elif "nprg" in self.sahara.programmer.lower():
mode = "nandprg"
if mode != "":
if mode != "firehose":
streaming = Streaming(cdc, saharahdl, self.__logger.level)
streaming = Streaming(self.cdc, self.sahara, self.__logger.level)
if streaming.connect(1):
print("Successfully uploaded programmer :)")
mode = "nandprg"
else:
print("Device is in an unknown state")
exit(cdc)
self.exit()
else:
print("Successfully uploaded programmer :)")
else:
print("No suitable loader found :(")
exit(cdc)
self.exit()
else:
print("Device is in an unknown sahara state")
print("Device is in an unknown sahara state, resetting")
print("resp={0}".format(resp))
exit(cdc)
self.sahara.cmd_reset()
self.exit()
else:
print("Device is in an unknown state")
exit(cdc)
self.exit()
else:
saharahdl.bit64 = True
self.sahara.bit64 = True
if mode == "firehose":
cdc.timeout = None
fh = firehose_client(args, cdc, saharahdl, self.__logger.level,print)
self.cdc.timeout = None
fh = firehose_client(args, self.cdc, self.sahara, self.__logger.level,print)
cmd=self.parse_cmd(args)
options=self.parse_option(args)
if cmd!="":
fh.handle_firehose(cmd,options)
elif mode == "nandprg" or mode == "enandprg" or mode == "load_nandprg" or mode == "load_enandprg":
sc = streaming_client(args, cdc, saharahdl, self.__logger.level,print)
sc = streaming_client(args, self.cdc, self.sahara, self.__logger.level,print)
cmd = self.parse_cmd(args)
options = self.parse_option(args)
if "load_" in mode:
@ -390,7 +414,7 @@ class main(metaclass=LogBase):
else:
self.__logger.error("Sorry, couldn't talk to Sahara, please reboot the device !")
exit(cdc)
self.exit()
if __name__ == '__main__':

BIN
fastpwn Executable file

Binary file not shown.

View file

@ -26,6 +26,7 @@ class vendor(Enum):
quectel = 0x2c7c
zte = 0x19d2
netgear = 0x0846
telit = 0x413c
class deviceclass:
vid=0
@ -35,7 +36,7 @@ class deviceclass:
self.pid=pid
class connection:
class connection(metaclass=LogBase):
def __init__(self, port=""):
self.serial = None
self.tn = None
@ -84,7 +85,8 @@ class connection:
0x1199:["Sierra Wireless",3],
0x2c7c:["Quectel",3],
0x19d2:["ZTE",2],
0x0846:["Netgear", 2]
0x0846:["Netgear", 2],
0x413c:["Telit",0]
}
mode="Unknown"
for device in self.detectusbdevices():
@ -101,6 +103,19 @@ class connection:
if self.websend(url):
mode="AT"
break
elif device.vid==vendor.telit.value:
if device.pid==0x81d7:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Diag mode")
print("Sending download mode command")
interface = 5
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x413c, 0x81d7, interface]])
if diag.connect():
data=diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
if res[0]==0x4B:
print("Sending download mode succeeded")
diag.disconnect()
break
if mode=="AT" or mode=="Unknown":
for port in self.getserialports():
if port.vid in vendortable:
@ -173,6 +188,8 @@ class connection:
data["vendor"]="ZTE"
elif "Netgear" in data["manufacturer"]:
data["vendor"]="Netgear"
elif "Telit" in data["manufacturer"]:
data["vendor"]="Telit"
return data
class dwnloadtools(metaclass=LogBase):
@ -201,6 +218,15 @@ class dwnloadtools(metaclass=LogBase):
res = diag.send(b"\x4b\x65\x01\x00")
diag.disconnect()
print("Done switching to download mode")
elif info["vendor"]=="Telit":
print("Sending download mode command")
interface=0
diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x2c7c,0x0125,interface]])
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
diag.disconnect()
print("Done switching to download mode")
elif info["vendor"]=="ZTE":
print("Sending download mode command")
interface=0