Version 3.1. Add old device support

This commit is contained in:
Bjoern Kerler 2021-01-07 14:42:49 +01:00
parent 967220426a
commit de9975b78f
20 changed files with 778 additions and 242 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -127,10 +127,17 @@ class qualcomm_firehose:
except Exception as e:
status = True
self.log.debug(str(e))
self.log.debug("Error on getting xml response:" + rdata.decode('utf-8'))
if isinstance(rdata,bytes) or isinstance(rdata,bytearray):
try:
self.log.debug("Error on getting xml response:" + rdata.decode('utf-8'))
except:
self.log.debug("Error on getting xml response:" + hexlify(rdata).decode('utf-8'))
elif isinstance(rdata,str):
self.log.debug("Error on getting xml response:" + rdata)
return [status, {"value": "NAK"}, rdata]
else:
status = True
resp = {"value":"ACK"}
return [status, resp, rdata]
def cmd_reset(self):
@ -506,63 +513,68 @@ class qualcomm_firehose:
return b""
def cmd_read_buffer(self, physical_partition_number, start_sector, num_partition_sectors, display=True):
maxsectors=self.cfg.MaxPayloadSizeToTargetInBytes//self.cfg.SECTOR_SIZE_IN_BYTES
total = num_partition_sectors*self.cfg.SECTOR_SIZE_IN_BYTES
dataread = 0
old = 0
prog = 0
if display:
self.log.info(
f"\nReading from physical partition {str(physical_partition_number)}, " + \
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
rsp = self.xmlsend(data, self.skipresponse)
resData = bytearray()
if rsp[0]:
if "value" in rsp[1]:
if rsp[1]["value"] == "NAK":
if display:
self.log.error(rsp[2].decode('utf-8'))
return b""
bytesToRead = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = bytesToRead
dataread = 0
old = 0
prog = 0
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
while bytesToRead > 0:
tmp = self.cdc.read(self.cfg.MaxPayloadSizeToTargetInBytes)
bytesToRead -= len(tmp)
dataread += len(tmp)
resData += tmp
prog = int(float(dataread) / float(total) * float(100))
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
if display and prog != 100:
print_progress(100, 100, prefix='Progress:', suffix='Complete', bar_length=50)
# time.sleep(0.2)
info = self.xml.getlog(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
rsp = self.xml.getresponse(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
if "value" in rsp:
if rsp["value"] == "ACK":
return resData
else:
self.log.error(f"Error:")
for line in info:
self.log.error(line)
return b""
if num_partition_sectors<maxsectors:
maxsectors=num_partition_sectors
resData=bytearray()
for cursector in range(start_sector,start_sector+num_partition_sectors,maxsectors):
bytesToRead = self.cfg.SECTOR_SIZE_IN_BYTES * maxsectors
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{maxsectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{cursector}\"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
if rsp[0]:
if "value" in rsp[1]:
if rsp[1]["value"] == "NAK":
if display:
self.log.error(rsp[2].decode('utf-8'))
return resData
while bytesToRead > 0:
tmp = self.cdc.read(self.cfg.MaxPayloadSizeToTargetInBytes)
resData += tmp
bytesToRead -= len(tmp)
dataread += len(tmp)
prog = int(float(dataread) / float(total) * float(100))
if prog > old:
if display:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
info = self.xml.getlog(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
rsp = self.xml.getresponse(self.cdc.read(self.cfg.MaxXMLSizeInBytes))
if "value" in rsp:
if rsp["value"] != "ACK":
self.log.error(f"Error:")
for line in info:
self.log.error(line)
return resData
else:
return resData
else:
if display:
self.log.error(f"Error:{rsp[2]}")
return b""
if display:
self.log.error(f"Error:{rsp[2]}")
if display and prog != 100:
print_progress(100, 100, prefix='Progress:', suffix='Complete', bar_length=50)
return resData # Do not remove, needed for oneplus
def get_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba):
data = self.cmd_read_buffer(lun, 0, 2, False)
try:
data = self.cmd_read_buffer(lun, 0, 2, False)
except:
self.skipresponse=True
data = self.cmd_read_buffer(lun, 0, 2, False)
if data == b"":
return None, None
guid_gpt = gpt(
@ -575,6 +587,8 @@ class qualcomm_firehose:
sectors = header["first_usable_lba"]
if sectors == 0:
return None, None
if sectors>34:
sectors=34
data = self.cmd_read_buffer(lun, 0, sectors, False)
if data == b"":
return None, None

View file

@ -42,11 +42,13 @@ class firehose_client:
self.target_name = self.firehose.cfg.TargetName
if "hwid" in dir(sahara):
if sahara.hwid is not None:
hwid = sahara.hwid >> 32
hwid = (sahara.hwid >> 32) & 0xFFFFFF
socid = ((sahara.hwid >> 32)>>16)
if hwid in msmids:
self.target_name = msmids[hwid]
elif hwid in sochw:
self.target_name = sochw[hwid].split(",")[0]
self.LOGGER.info(f"Target detected: {self.target_name}")
elif socid in sochw:
self.target_name = sochw[socid].split(",")[0]
def check_cmd(self, func):
if not self.supported_functions:

View file

@ -118,17 +118,19 @@ class hdlc:
return None
return out
def receive_reply(self):
def receive_reply(self,timeout=None):
replybuf = bytearray()
tmp = self.cdc.read(MAX_PACKET_LEN, self.timeout)
if timeout==None:
timeout=self.timeout
tmp = self.cdc.read(MAX_PACKET_LEN, timeout)
if tmp == bytearray():
return 0
if tmp == b"":
return 0
retry = 0
while tmp[-1] != 0x7E:
time.sleep(0.05)
tmp += self.cdc.read(MAX_PACKET_LEN, self.timeout)
time.sleep(0.01)
tmp += self.cdc.read(MAX_PACKET_LEN, timeout)
retry += 1
if retry > 5:
break
@ -141,8 +143,8 @@ class hdlc:
if crc16val != reccrc:
return -1
else:
time.sleep(0.5)
data = self.cdc.read(MAX_PACKET_LEN, self.timeout)
time.sleep(0.01)
data = self.cdc.read(MAX_PACKET_LEN, timeout)
if len(data) > 3:
crc16val = self.crc16(0xFFFF, data[:-3])
reccrc = int(data[-3]) + (int(data[-2]) << 8)
@ -151,9 +153,11 @@ class hdlc:
return data
return data[:-3]
def receive_reply_nocrc(self):
def receive_reply_nocrc(self,timeout=None):
replybuf = bytearray()
tmp = self.cdc.read(MAX_PACKET_LEN, self.timeout)
if timeout==None:
timeout=self.timeout
tmp = self.cdc.read(MAX_PACKET_LEN, timeout)
if tmp == bytearray():
return 0
if tmp == b"":
@ -161,7 +165,7 @@ class hdlc:
retry = 0
while tmp[-1] != 0x7E:
# time.sleep(0.05)
tmp += self.cdc.read(MAX_PACKET_LEN, self.timeout)
tmp += self.cdc.read(MAX_PACKET_LEN, timeout)
retry += 1
if retry > 5:
break
@ -174,7 +178,7 @@ class hdlc:
return data[:-3]
else:
time.sleep(0.5)
data = self.cdc.read(MAX_PACKET_LEN, self.timeout)
data = self.cdc.read(MAX_PACKET_LEN, timeout)
if len(data) > 3:
# crc16val = self.crc16(0xFFFF, data[:-3])
# reccrc = int(data[-3]) + (int(data[-2]) << 8)

View file

@ -270,11 +270,11 @@ class SettingsOpt:
self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, self.pbl, self.qfprom, self.memtbl=config_tbl[chipset]
self.bad_loader = 0
else:
loadername = parent.sahara.programmer.lower()
for chipid in config_tbl:
loadername=parent.sahara.loader
if config_tbl[chipid][0] in loadername:
self.chipname, self.bam, self.nandbase, self.bcraddr, self.secureboot, self.pbl, self.qfprom, self.memtbl = \
config_tbl[chipset]
config_tbl[chipid]
self.bad_loader = 0
class nand_toshiba_ids(ctypes.LittleEndianStructure):

View file

@ -1,21 +1,29 @@
import binascii
import time
from Library.utils import *
from Config.qualcomm_config import *
import os
import sys
from struct import unpack, pack
from Library.utils import logging, read_object, print_progress, rmrf
from Config.qualcomm_config import sochw, msmids
logger = logging.getLogger(__name__)
def convertmsmid(msmid):
if int(msmid, 16) in sochw:
names = sochw[int(msmid, 16)].split(",")
msmiddb=[]
if int(msmid,16)&0xFF==0xe1:
return [msmid]
socid=(int(msmid, 16)>>16)
if socid in sochw:
names = sochw[socid].split(",")
for name in names:
for ids in msmids:
if msmids[ids] == name:
msmid = hex(ids)[2:].lower()
while len(msmid) < 8:
msmid = '0' + msmid
return msmid
rmsmid = hex(ids)[2:].lower()
while len(rmsmid) < 8:
rmsmid = '0' + rmsmid
msmiddb.append(rmsmid)
return msmiddb
class qualcomm_sahara:
@ -160,14 +168,14 @@ class qualcomm_sahara:
msmid = hwid[:8]
devid = hwid[8:]
pkhash = filename.split("_")[1].lower()
msmid = convertmsmid(msmid)
mhwid = convertmsmid(msmid) + devid
if mhwid not in loaderdb:
loaderdb[mhwid] = {}
if pkhash not in loaderdb[mhwid]:
loaderdb[mhwid][pkhash] = fn
else:
loaderdb[mhwid][pkhash].append(fn)
for msmid in convertmsmid(msmid):
mhwid = (msmid + devid).lower()
if mhwid not in loaderdb:
loaderdb[mhwid] = {}
if pkhash not in loaderdb[mhwid]:
loaderdb[mhwid][pkhash] = fn
else:
loaderdb[mhwid][pkhash].append(fn)
except:
continue
self.loaderdb = loaderdb
@ -285,7 +293,6 @@ class qualcomm_sahara:
self.oem_id = None
self.model_id = None
self.oem_str = None
self.model_id = None
self.msm_str = None
def get_rsp(self):
@ -327,7 +334,7 @@ class qualcomm_sahara:
cmd = self.cmd.SAHARA_HELLO_RSP
length = 0x30
version = self.SAHARA_VERSION
responsedata = struct.pack("<IIIIIIIIIIII", cmd, length, version, version_min, max_cmd_len, mode, 0, 0, 0, 0, 0, 0)
responsedata = pack("<IIIIIIIIIIII", cmd, length, version, version_min, max_cmd_len, mode, 0, 0, 0, 0, 0, 0)
try:
self.cdc.write(responsedata)
return True
@ -404,12 +411,12 @@ class qualcomm_sahara:
def cmdexec_get_serial_num(self):
res = self.cmd_exec(self.exec_cmd.SAHARA_EXEC_CMD_SERIAL_NUM_READ)
return struct.unpack("<I", res)[0]
return unpack("<I", res)[0]
def cmdexec_get_msm_hwid(self):
res = self.cmd_exec(self.exec_cmd.SAHARA_EXEC_CMD_MSM_HW_ID_READ)
try:
return struct.unpack("<Q", res[0:0x8])[0]
return unpack("<Q", res[0:0x8])[0]
except Exception as e:
return None
@ -423,7 +430,7 @@ class qualcomm_sahara:
def cmdexec_get_sbl_version(self):
res = self.cmd_exec(self.exec_cmd.SAHARA_EXEC_CMD_GET_SOFTWARE_VERSION_SBL)
return struct.unpack("<I", res)[0]
return unpack("<I", res)[0]
def cmdexec_switch_to_dmss_dload(self):
res = self.cmd_exec(self.exec_cmd.SAHARA_EXEC_CMD_SWITCH_TO_DMSS_DLOAD)
@ -447,7 +454,7 @@ class qualcomm_sahara:
self.sblversion = "{:08x}".format(self.cmdexec_get_sbl_version())
if self.hwid is not None:
self.hwidstr = "{:016x}".format(self.hwid)
self.msm_id = int(self.hwidstr[0:8], 16)
self.msm_id = int(self.hwidstr[2:8], 16)
self.oem_id = int(self.hwidstr[-8:-4], 16)
self.model_id = int(self.hwidstr[-4:], 16)
self.oem_str = "{:04x}".format(self.oem_id)
@ -518,7 +525,7 @@ class qualcomm_sahara:
return False
def cmd_done(self):
if self.cdc.write(struct.pack("<II", self.cmd.SAHARA_DONE_REQ, 0x8)):
if self.cdc.write(pack("<II", self.cmd.SAHARA_DONE_REQ, 0x8)):
cmd, pkt = self.get_rsp()
time.sleep(0.3)
if cmd["cmd"] == self.cmd.SAHARA_DONE_RSP:
@ -531,7 +538,7 @@ class qualcomm_sahara:
return False
def cmd_reset(self):
self.cdc.write(struct.pack("<II", self.cmd.SAHARA_RESET_REQ, 0x8))
self.cdc.write(pack("<II", self.cmd.SAHARA_RESET_REQ, 0x8))
cmd, pkt = self.get_rsp()
if cmd["cmd"] == self.cmd.SAHARA_RESET_RSP:
return True
@ -558,12 +565,12 @@ class qualcomm_sahara:
except Exception as e:
pass
if self.bit64:
if not self.cdc.write(struct.pack("<IIQQ", self.cmd.SAHARA_64BIT_MEMORY_READ, 0x8 + 8 + 8, addr + pos,
if not self.cdc.write(pack("<IIQQ", self.cmd.SAHARA_64BIT_MEMORY_READ, 0x8 + 8 + 8, addr + pos,
length)):
return None
else:
if not self.cdc.write(
struct.pack("<IIII", self.cmd.SAHARA_MEMORY_READ, 0x8 + 4 + 4, addr + pos, length)):
pack("<IIII", self.cmd.SAHARA_MEMORY_READ, 0x8 + 4 + 4, addr + pos, length)):
return None
while length > 0:
try:
@ -677,7 +684,6 @@ class qualcomm_sahara:
return False
def upload_loader(self):
programmer = None
if self.programmer == "":
return ""
try:
@ -722,10 +728,13 @@ class qualcomm_sahara:
data_offset = pkt["data_offset"]
data_len = pkt["data_len"]
if data_offset+data_len>len(programmer):
while len(programmer)<data_offset+data_len:
programmer+=b"\xFF"
data_to_send = programmer[data_offset:data_offset + data_len]
self.cdc.write(data_to_send, self.pktsize)
datalen -= data_len
print("Ended")
logger.info("Loader uploaded.")
cmd, pkt = self.get_rsp()
if cmd["cmd"] == self.cmd.SAHARA_END_TRANSFER:
if pkt["status"] == self.status.SAHARA_STATUS_SUCCESS:
@ -734,21 +743,21 @@ class qualcomm_sahara:
return ""
except Exception as e:
logger.error("Unexpected error on uploading, maybe signature of loader wasn't accepted ?\n" + str(e))
return False
return ""
def cmd_modeswitch(self, mode):
data = struct.pack("<III", self.cmd.SAHARA_SWITCH_MODE, 0xC, mode)
data = pack("<III", self.cmd.SAHARA_SWITCH_MODE, 0xC, mode)
self.cdc.write(data)
def cmd_exec(self, mcmd): # CMD 0xD, RSP 0xE, CMD2 0xF
# Send request
data = struct.pack("<III", self.cmd.SAHARA_EXECUTE_REQ, 0xC, mcmd)
data = pack("<III", self.cmd.SAHARA_EXECUTE_REQ, 0xC, mcmd)
self.cdc.write(data)
# Get info about request
cmd, pkt = self.get_rsp()
if cmd["cmd"] == self.cmd.SAHARA_EXECUTE_RSP:
# Ack
data = struct.pack("<III", self.cmd.SAHARA_EXECUTE_DATA, 0xC, mcmd)
data = pack("<III", self.cmd.SAHARA_EXECUTE_DATA, 0xC, mcmd)
self.cdc.write(data)
payload = self.cdc.read(pkt["data_len"])
return payload

View file

@ -11,6 +11,7 @@ class QualcommStreaming:
self.cdc = cdc
self.hdlc = hdlc(self.cdc)
self.mode = sahara.mode
self.sahara=sahara
self.settings = None
self.flashinfo = None
self.bbtbl = {}
@ -631,9 +632,12 @@ class QualcommStreaming:
self.nanddevice = NandDevice(self.settings)
self.setupregs()
if self.cdc.pid == 0x900e:
if self.cdc.pid == 0x900e or self.cdc.pid==0x0076:
print("Boot to 0x9008")
self.mempoke(0x193d100, 1)
# dload-mode-addr, TCSR_BOOT_MISC_DETECT, iomap.h
# msm8916,8939,8953 0x193d100
# msm8996 0x7b3000
self.mempeek(0x7980000)
self.cdc.close()
sys.exit(0)
@ -796,7 +800,13 @@ def test_nand_config():
[0x1590acc8, 8, 512, 2048, 131072, 64, 4, 0x2a0408c0, 0x0804745c, 0x00000203, 0x42040700, 0x000001d1],
[0x1d00f101, 8, 128, 2048, 131072, 64, 4, 0x2a0408c0, 0x0804745c, 0x00000203, 0x42040700, 0x000001d1],
[0x1d80f101, 8, 128, 2048, 131072, 64, 4, 0x2a0408c0, 0x0804745c, 0x00000203, 0x42040700, 0x000001d1],
#Sierra 9x15
[0x1900aaec, 8, 256, 2048, 131072, 64, 8, 0x2a0408c0, 0x0804745c, 0x00000203, 0x42040700, 0x000001d1],
#[0x1590aa98, 8, 256, 2048, 131072, 64, 8, 0xa8d408c0, 0x0004745c, 0x00000203, 0x42040d10, 0x000001d1],
[0x1590aa98, 8, 256, 2048, 131072, 64, 8, 0x2a0408c0, 0x0804745c, 0x00000203, 0x42040700, 0x000001d1],
[0x2690ac2c, 8, 512, 4096, 262144, 224, 8, 0x290409c0, 0x08045d5c, 0x00000203, 0x42040d10, 0x00000175],
#End
[0x2690dc98, 8, 512, 4096, 262144, 128, 4, 0x2a0409c0, 0x0804645c, 0x00000203, 0x42040700, 0x00000191],
# Sierra Wireless EM7455, MDM9x35, Quectel EC25, Toshiba KSLCMBL2VA2M2A
[0x2690ac98, 8, 512, 4096, 262144, 256, 8, 0x290409c0, 0x08045d5c, 0x00000203, 0x42040d10, 0x00000175],
@ -813,13 +823,13 @@ def test_nand_config():
nandid, buswidth, density, pagesize, blocksize, oobsize, bchecc, cfg0, cfg1, eccbufcfg, bccbchcfg, badblockbyte = test
res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg = qs.nanddevice.nand_setup(nandid)
if cfg0 != res_cfg0 or cfg1 != res_cfg1 or eccbufcfg != res_ecc_buf_cfg or res_ecc_bch_cfg != bccbchcfg:
errorids.append(nandid)
errorids.append([nandid,res_cfg0,res_cfg1,res_ecc_buf_cfg,res_ecc_bch_cfg])
res_cfg0, res_cfg1, res_ecc_buf_cfg, res_ecc_bch_cfg = qs.nanddevice.nand_setup(nandid)
if len(errorids) > 0:
st = ""
for id in errorids:
st += hex(id) + ","
st += hex(id[0]) + f" {hex(id[1]),hex(id[2]),hex(id[3]),hex(id[4])},"
st = st[:-1]
print("Error at: "+st)
assert ("Error at : " + st)

View file

@ -152,15 +152,18 @@ class usb_class:
# pass
self.configuration = self.device.get_active_configuration()
if self.interface == -1:
for interfacenum in range(0, self.configuration.bNumInterfaces):
itf = usb.util.find_descriptor(self.configuration, bInterfaceNumber=interfacenum)
if self.devclass != -1:
if itf.bInterfaceClass == self.devclass: # MassStorage
if vid==0x413c and pid==0x81d7: #Telit LN940
self.interface=5
else:
for interfacenum in range(0, self.configuration.bNumInterfaces):
itf = usb.util.find_descriptor(self.configuration, bInterfaceNumber=interfacenum)
if self.devclass != -1:
if itf.bInterfaceClass == self.devclass: # MassStorage
self.interface = interfacenum
break
else:
self.interface = interfacenum
break
else:
self.interface = interfacenum
break
self.log.debug(self.configuration)
if self.interface > self.configuration.bNumInterfaces:

View file

@ -22,13 +22,15 @@ Linux/Windows:
Windows:
- Boot device into 9008 mode, install Qualcomm_Diag_QD_Loader_2016_driver.exe from Drivers\Windows,
then install libusb-win32-devel-filter-1.2.6.0.exe.
then install libusb-win32-devel-filter-1.2.7.1.exe.
Run Filter Wizard, "Install a device filter", select "Qualcomm HS-USB QDLoader 9008"
and press "Install" to install libusb filter driver otherwise we won't detect the device.
Then use the edl tool.
- Get latest libusb devel filter [here](https://sourceforge.net/projects/libusb-win32/files/)
or
- Use Zadig 2.4 or higher, list all devices, select QUSB_BULK device and replace
driver with libusb 1.2.6 one (will replace original driver)
- Use Zadig 2.5 or higher, list all devices, select QUSB_BULK device and replace
driver with libusb 1.2.7.1 one (will replace original driver)
- Get latest Zadig release [here] (https://zadig.akeo.ie/)
## Convert EDL loaders for automatic usage
@ -99,23 +101,24 @@ or
### For generic unlocking
- ```./edl.py modules oemunlock enable``` -> Unlocks OEM if partition "config" exists, fastboot oem unlock is still needed afterwards
### Streaming mode (credits to forth32)
#### Dump memory (0x900E mode)
- ```./edl.py dumpmemory```
- ```./edl.py memorydump```
### Streaming mode (credits to forth32)
#### Sierra Wireless Modem
- Send AT!BOOTHOLD and AT!QPSTDLOAD to modem port or use ```modem/sierra_boottodwnload.py``` script
- Send AT!BOOTHOLD and AT!QPSTDLOAD to modem port or use ```modem/boottodwnload.py``` script
- Send AT!ENTERCND="A710" and then AT!EROPTION=0 for memory dump
- ```./edl.py --vid 1199 --pid 9070 --loader=loaders/NPRG9x35p.bin printgpt``` -> To show the partition table
#### Netgear MR1100
- run ```modem/netgear_boottodownload.py```, device will enter download mode (0x900E pid)
- run ```modem/boottodownload.py```, device will enter download mode (0x900E pid)
- ```./edl.py printgpt --loader=Loaders/qualcomm/patched/mdm9x5x/NPRG9x55p.bin```, device will reboot to 0x9008
- now use ./edl.py regulary such as ```./edl.py printgpt``` (do not use loader option)
#### ZTE MF920V
- run ```modem/zte_enable_adb.sh```, or send to at port "AT+ZCDRUN=E", or send via ```./diag.py -sahara```
#### ZTE MF920V, Quectel, Telit, etc.. Modem
- run ```modem/enableadb.sh```, or send to at port "AT+ZCDRUN=E", or send via ```./diag.py -sahara```
- ```adb reboot edl```
- ```./edl.py printgpt``` -> To show the partition table

55
diag.py
View file

@ -364,6 +364,7 @@ class qcdiag():
self.hdlc = None
if self.cdc.connect(self.ep_in, self.ep_out):
self.hdlc = hdlc(self.cdc)
data=self.hdlc.receive_reply(1)
return True
return False
@ -379,7 +380,9 @@ class qcdiag():
return self.prettyprint(reply)
def enforce_crash(self):
res = self.send(b"\x75\x37\x03\x00")
#./diag.py -nvwrite 1027,01 enable adsp log
#./diag.py -nvwrite 4399,01 enable download on reboot
res = self.send(b"\x4B\x25\x03\x00")
print(self.decodestatus(res))
def enter_downloadmode(self):
@ -529,22 +532,25 @@ class qcdiag():
def write_nvitem(self, item, data):
rawdata = bytes(data)
if len(data) < 128:
while len(rawdata) < 128:
rawdata += b"\x00"
status = 0x0000
nvrequest = b"\x27" + write_object(nvitem_type, item, rawdata, status)
nvrequest = b"\x27" + write_object(nvitem_type, item, rawdata, status)['raw_data']
res = self.send(nvrequest)
# verify res status result here... todo
res, nvitem = self.read_nvitem(item)
if res == False:
print(f"Error while writing nvitem {hex(item)} data, %s" % nvitem)
else:
if nvitem.data != data:
print(f"Error while writing nvitem {hex(item)} data, verified data doesn't match")
if len(res) > 0:
if res[0] == 0x27:
res, nvitem = self.read_nvitem(item)
if res == False:
print(f"Error while writing nvitem {hex(item)} data, %s" % data)
else:
if nvitem.data != data:
print(f"Error while writing nvitem {hex(item)} data, verified data doesn't match")
else:
print(f"Successfully wrote nvitem {hex(item)}.")
return True
return False
else:
return True
return False
print(f"Error while writing nvitem {hex(item)} data, %s" % data)
def efsread(self, filename):
alternateefs = b"\x4B\x3E\x19\x00"
@ -1027,6 +1033,7 @@ def main():
parser.add_argument('-spc', metavar=("<SPC>"), help='[Option] Security code to send, default: 303030303030',
default="", const='303030303030', nargs="?")
parser.add_argument('-nvread', metavar=("<nvitem>"), help='[Option] Read nvitem', default="")
parser.add_argument('-nvwrite', metavar=("<nvitem,data>"), help='[Option] Write nvitem', default="")
parser.add_argument('-nvbackup', metavar=("<filename>"), help='[Option] Make nvitem backup as json', default="")
parser.add_argument('-efsread', metavar=("<filename>"), help='[Option] Read efs', default="")
parser.add_argument('-efslistdir', metavar=("<path>"), help='[Option] List efs directory', default="")
@ -1054,7 +1061,16 @@ def main():
connected=False
diag=None
default_vid_pid=[[0x19d2,0x0016],[0x2c7c,0x0125],[0x1199, 0x9071], [0x1199, 0x9091],[0x05C6, 0x9008], [0x19d2, 0x16], [0x19d2, 0x0076],[0x12d1,0x1506]]
default_vid_pid=[
[0x2c7c,0x0125], #Quectel EC25
[0x1199,0x9071], #Sierra Wireless
[0x1199,0x9091], #Sierra Wireless
[0x05C6,0x9008], #QC EDL
[0x19d2,0x0016], #ZTE Diag
[0x19d2,0x0076], #ZTE Download
[0x12d1,0x1506],
[0x413c,0x81d7] #Telit LN940
]
if vid==None or pid==None:
diag = qcdiag(LOGGER,default_vid_pid, interface)
connected = diag.connect()
@ -1084,6 +1100,17 @@ def main():
else:
nvitem = int(args.nvread)
diag.print_nvitem(nvitem)
elif args.nvwrite:
if not "," in args.nvwrite:
print("NvWrite requires data to write")
sys.exit()
nv=args.nvwrite.split(",")
if "0x" in args.nvwrite:
nvitem = int(nv[0], 16)
else:
nvitem = int(nv[0])
data=unhexlify(nv[1])
diag.write_nvitem(nvitem,data)
elif args.nvbackup:
diag.backup_nvitems(args.nvbackup, "error.log")
elif args.efsread:

6
edl.py
View file

@ -117,7 +117,7 @@ from Library.streaming_client import streaming_client
from Library.firehose_client import firehose_client
from Library.streaming import QualcommStreaming
print("Qualcomm Sahara / Firehose Client V3 (c) B.Kerler 2018-2020.")
print("Qualcomm Sahara / Firehose Client V3.1 (c) B.Kerler 2018-2021.")
LOGGER = None
@ -235,6 +235,8 @@ def parse_cmd(args):
cmd = "nop"
elif args["modules"]:
cmd = "modules"
elif args["memorydump"]:
cmd = "memorydump"
return cmd
def main():
@ -243,7 +245,7 @@ def main():
loop = 0
vid = int(args["--vid"], 16)
pid = int(args["--pid"], 16)
usbids = [[vid, pid], [0x05c6,0x9008], [0x05c6,0x900e] , [0x05c6, 0x9025], [0x1199, 0x9062], [0x1199, 0x9070], [0x1199, 0x9090], [0x0846, 0x68e0]]
usbids = [[vid, pid], [0x05c6,0x9008], [0x05c6,0x900e] , [0x05c6, 0x9025], [0x1199, 0x9062], [0x1199, 0x9070], [0x1199, 0x9090], [0x0846, 0x68e0], [0x19d2, 0x0076]]
filename = "log.txt"
if args["--debugmode"]:
LOGGER = log_class(logging.DEBUG, filename)

View file

@ -3,9 +3,8 @@ import os
import sys
from os import walk
import hashlib
import struct
from struct import unpack, pack
from shutil import copyfile
from Config.qualcomm_config import sochw, msmids
from Library.utils import elf
from Library.sahara import convertmsmid
@ -68,9 +67,9 @@ def grabtext(data):
def extract_hdr(memsection, sign_info, mem_section, code_size, signature_size):
try:
md_size = \
struct.unpack("<I", mem_section[memsection.file_start_addr + 0x2C:memsection.file_start_addr + 0x2C + 0x4])[0]
unpack("<I", mem_section[memsection.file_start_addr + 0x2C:memsection.file_start_addr + 0x2C + 0x4])[0]
md_offset = memsection.file_start_addr + 0x2C + 0x4
major, minor, sw_id, hw_id, oem_id, model_id, app_id = struct.unpack("<IIIIIII",
major, minor, sw_id, hw_id, oem_id, model_id, app_id = unpack("<IIIIIII",
mem_section[md_offset:md_offset + (7 * 4)])
sign_info.hw_id = "%08X" % hw_id
sign_info.sw_id = "%08X" % sw_id
@ -79,7 +78,7 @@ def extract_hdr(memsection, sign_info, mem_section, code_size, signature_size):
sign_info.hw_id += sign_info.oem_id + sign_info.model_id
sign_info.app_id = "%08X" % app_id
md_offset += (7 * 4)
# v=struct.unpack("<I", mem_section[md_offset:md_offset + 4])[0]
# v=unpack("<I", mem_section[md_offset:md_offset + 4])[0]
'''
rot_en=(v >> 0) & 1
in_use_soc_hw_version=(v >> 1) & 1
@ -93,9 +92,9 @@ def extract_hdr(memsection, sign_info, mem_section, code_size, signature_size):
md_offset+=12*4
multi_serial_numbers=hexlify(mm[md_offset:md_offset + (8*4)])
md_offset += 8 * 4
mrc_index=struct.unpack("<I", mm[md_offset:md_offset + 4])[0]
mrc_index=unpack("<I", mm[md_offset:md_offset + 4])[0]
md_offset+=4
anti_rollback_version=struct.unpack("<I", mm[md_offset:md_offset + 4])[0]
anti_rollback_version=unpack("<I", mm[md_offset:md_offset + 4])[0]
'''
signatureoffset = memsection.file_start_addr + 0x30 + md_size + code_size + signature_size
@ -108,11 +107,11 @@ def extract_hdr(memsection, sign_info, mem_section, code_size, signature_size):
if len(mem_section) < signatureoffset + 4:
print("Signature error on " + sign_info.filename)
return None
len1 = struct.unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = struct.unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3offset = casignature2offset + len2
len3 = struct.unpack(">H", mem_section[rootsignature3offset + 2:rootsignature3offset + 4])[0] + 4
len3 = unpack(">H", mem_section[rootsignature3offset + 2:rootsignature3offset + 4])[0] + 4
sign_info.pk_hash = hashlib.sha384(mem_section[rootsignature3offset:rootsignature3offset + len3]).hexdigest()
except:
return None
@ -129,11 +128,11 @@ def extract_old_hdr(memsection, sign_info, mem_section, code_size, signature_siz
if len(mem_section) < signatureoffset + 4:
print("Signature error on " + sign_info.filename)
return None
len1 = struct.unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
len1 = unpack(">H", mem_section[signatureoffset + 2:signatureoffset + 4])[0] + 4
casignature2offset = signatureoffset + len1
len2 = struct.unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
len2 = unpack(">H", mem_section[casignature2offset + 2:casignature2offset + 4])[0] + 4
rootsignature3offset = casignature2offset + len2
len3 = struct.unpack(">H", mem_section[rootsignature3offset + 2:rootsignature3offset + 4])[0] + 4
len3 = unpack(">H", mem_section[rootsignature3offset + 2:rootsignature3offset + 4])[0] + 4
sign_info.pk_hash = hashlib.sha256(mem_section[rootsignature3offset:rootsignature3offset + len3]).hexdigest()
idx = signatureoffset
@ -195,14 +194,15 @@ def init_loader_db():
msmid = hwid[:8]
devid = hwid[8:]
pkhash = filename.split("_")[1].lower()
msmid = convertmsmid(msmid)
mhwid = convertmsmid(msmid) + devid
if mhwid not in loaderdb:
loaderdb[mhwid] = {}
if pkhash not in loaderdb[mhwid]:
loaderdb[mhwid][pkhash] = file_name
else:
loaderdb[mhwid][pkhash].append(file_name)
msmdb = convertmsmid(msmid)
for msmid in msmdb:
mhwid = (msmid + devid).lower()
if mhwid not in loaderdb:
loaderdb[mhwid] = {}
if pkhash not in loaderdb[mhwid]:
loaderdb[mhwid][pkhash] = file_name
else:
loaderdb[mhwid][pkhash].append(file_name)
except:
continue
return loaderdb
@ -213,15 +213,16 @@ def is_duplicate(loaderdb, sign_info):
msmid = sign_info.hw_id[:8].lower()
devid = sign_info.hw_id[8:].lower()
hwid = sign_info.hw_id.lower()
rid = (convertmsmid(msmid) + devid).lower()
if hwid in loaderdb:
loader = loaderdb[hwid]
if lhash in loader:
return True
if rid in loaderdb:
loader = loaderdb[rid]
if lhash in loader:
return True
for msmid in convertmsmid(msmid):
rid = (msmid + devid).lower()
if hwid in loaderdb:
loader = loaderdb[hwid]
if lhash in loader:
return True
if rid in loaderdb:
loader = loaderdb[rid]
if lhash in loader:
return True
return False
@ -276,12 +277,12 @@ def main(argv):
signinfo.filesize = os.stat(filename).st_size
if len(mem_section) < 4:
continue
hdr = struct.unpack("<I", mem_section[0:4])[0]
hdr = unpack("<I", mem_section[0:4])[0]
'''
if hdr == 0x844BDCD1: # mbn
signatureoffset = struct.unpack("<I", mem_section[0x14:0x18])[0] + struct.unpack("<I", mem_section[0x20:0x24])[0] + \
struct.unpack("<I", mem_section[0x28:0x2C])[0]
if struct.unpack("<I", mem_section[0x28:0x2C])[0] == 0:
signatureoffset = unpack("<I", mem_section[0x14:0x18])[0] + unpack("<I", mem_section[0x20:0x24])[0] + \
unpack("<I", mem_section[0x28:0x2C])[0]
if unpack("<I", mem_section[0x28:0x2C])[0] == 0:
signatureoffset = -1
'''
if hdr == 0x464C457F:
@ -289,18 +290,18 @@ def main(argv):
if 'memorylayout' in dir(elfheader):
memsection = elfheader.memorylayout[1]
try:
version = struct.unpack("<I", mem_section[
version = unpack("<I", mem_section[
memsection.file_start_addr + 0x04:memsection.file_start_addr + 0x04 + 0x4])[
0]
code_size = \
struct.unpack("<I", mem_section[
unpack("<I", mem_section[
memsection.file_start_addr + 0x14:memsection.file_start_addr + 0x14 + 0x4])[
0]
signature_size = \
struct.unpack("<I", mem_section[
unpack("<I", mem_section[
memsection.file_start_addr + 0x1C:memsection.file_start_addr + 0x1C + 0x4])[
0]
# cert_chain_size=struct.unpack("<I", mem_section[memsection.file_start_addr + 0x24:memsection.file_start_addr + 0x24 + 0x4])[0]
# cert_chain_size=unpack("<I", mem_section[memsection.file_start_addr + 0x24:memsection.file_start_addr + 0x24 + 0x4])[0]
except:
continue
if signature_size == 0:
@ -347,9 +348,10 @@ def main(argv):
print(info)
msmid = loader_info.hw_id[:8]
devid = loader_info.hw_id[8:]
hwid = convertmsmid(msmid) + devid
copyfile(item.filename,
os.path.join(outputdir, hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG.bin"))
for msmid in convertmsmid(msmid):
hwid = (msmid + devid).lower()
copyfile(item.filename,
os.path.join(outputdir, hwid + "_" + loader_info.pk_hash[0:16] + "_FHPRG.bin"))
else:
print("Duplicate: " + info)
copyfile(item.filename, os.path.join(outputdir, "Duplicate",

244
modem/boottodwnload.py Executable file
View file

@ -0,0 +1,244 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2020 under MIT license
# If you use my code, make sure you refer to my name
# If you want to use in a commercial product, ask me before integrating it
import time
from telnetlib import Telnet
import serial
import serial.tools.list_ports
import argparse
import logging
import requests
from diag import qcdiag
from Library.utils import print_progress, read_object, write_object, log_class
from Library.usblib import usb_class
from Library.hdlc import hdlc
import sys
import usb.core
from enum import Enum
class vendor(Enum):
sierra = 0x1199
quectel = 0x2c7c
zte = 0x19d2
class deviceclass:
vid=0
pid=0
def __init__(self,vid,pid):
self.vid=vid
self.pid=pid
class connection:
def __init__(self, port=""):
self.serial = None
self.tn = None
self.connected = False
if port == "":
port = self.detect(port)
if port == "":
try:
self.tn = Telnet("192.168.1.1", 5510, 5)
self.connected = True
except:
self.connected = False
if port != "":
self.serial = serial.Serial(port=port, baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=1)
self.connected = self.serial.is_open
def waitforusb(self,vid,pid):
timeout = 0
while timeout < 10:
for device in self.detectusbdevices():
if device.vid == vid:
if device.pid == pid:
return True
time.sleep(1)
timeout += 1
return False
def websend(self,url):
headers = {'Referer': 'http://192.168.0.1/index.html', 'Accept-Charset': 'UTF-8'}
r = requests.get(url, headers=headers)
if b"FACTORY:ok" in r.content or b"success" in r.content:
print(f"Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
return self.waitforusb(vendor.zte.value,0x0016)
return False
def getserialports(self):
return [port for port in serial.tools.list_ports.comports()]
def detectusbdevices(self):
dev = usb.core.find(find_all=True)
ids=[deviceclass(cfg.idVendor,cfg.idProduct) for cfg in dev]
return ids
def detect(self, port):
vendortable={
0x1199:["Sierra Wireless",3],
0x2c7c:["Quectel",3],
0x19d2:["ZTE",2]
}
mode="Unknown"
for device in self.detectusbdevices():
if device.vid==vendor.zte.value:
if device.pid==0x0016:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Diag mode")
mode="AT"
break
elif device.pid==0x1403:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
mode="Web"
# url = 'http://192.168.0.1/goform/goform_set_cmd_process?goformId=USB_MODE_SWITCH&usb_mode=1' #adb
url = 'http://192.168.0.1/goform/goform_process?goformId=MODE_SWITCH&switchCmd=FACTORY'
if self.websend(url):
mode="AT"
break
if mode=="AT" or mode=="Unknown":
for port in self.getserialports():
if port.vid in vendortable:
portid = port.location[-1:]
if int(portid) == vendortable[port.vid][1]:
print(f"Detected a {vendortable[port.vid][0]} at interface at: " + port.device)
return port.device
return ""
def readreply(self):
info = []
timeout=0
if self.serial is not None:
while True:
tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
if "OK" in tmp:
info.append(tmp)
return info
elif "ERROR" in tmp:
return -1
if tmp!="":
info.append(tmp)
else:
timeout+=1
if timeout==20:
break
return info
def send(self, cmd):
if self.tn is not None:
self.tn.write(bytes(cmd + "\r", 'utf-8'))
time.sleep(0.05)
data = ""
while True:
tmp = self.tn.read_eager()
if tmp != b"":
data += tmp.strip().decode('utf-8')
else:
break
return data.split("\r\n")
elif self.serial is not None:
self.serial.write(bytes(cmd + "\r", 'utf-8'))
time.sleep(0.05)
return self.readreply()
def close(self):
if self.tn is not None:
self.tn.close()
self.connected = False
if self.serial is not None:
self.serial.close()
self.connected = False
def ati(self):
data={}
info = self.send("ATI")
if info != -1:
for line in info:
if "Revision" in line:
data["revision"] = line.split(":")[1].strip()
if "Model" in line:
data["model"] = line.split(":")[1].strip()
if "Quectel" in line:
data["vendor"] = "Quectel"
if "Manufacturer" in line:
data["manufacturer"]=line.split(":")[1].strip()
if "Sierra Wireless" in data["manufacturer"]:
data["vendor"]="Sierra Wireless"
elif "ZTE CORPORATION" in data["manufacturer"]:
data["vendor"]="ZTE"
return data
def sendcmd(tn,cmd):
tn.write(bytes(cmd,'utf-8')+b"\n")
time.sleep(0.05)
return tn.read_eager().strip().decode('utf-8')
def main():
version = "1.0"
info = 'Modem Gimme-EDL ' + version + ' (c) B. Kerler 2020'
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description=info)
parser.add_argument(
'-port', '-p',
help='use com port for auto unlock',
default="")
parser.add_argument(
'-logfile', '-l',
help='use logfile for debug log',
default="")
args = parser.parse_args()
LOGGER=None
if args.logfile!="":
LOGGER = log_class(logging.DEBUG, args.logfile)
else:
LOGGER = log_class(logging.INFO, "")
port = args.port
cn = connection(port)
if cn.connected:
info=cn.ati()
if "vendor" in info:
if info["vendor"]=="Sierra Wireless":
print("Sending download mode command")
print(cn.send("AT!BOOTHOLD\r"))
print(cn.send('AT!QPSTDLOAD\r'))
print("Done switching to download mode")
elif info["vendor"]=="Quectel":
print("Sending download mode command")
interface=0
diag = qcdiag(LOGGER, [[0x2c7c,0x0125]], interface)
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
diag.disconnect()
print("Done switching to download mode")
elif info["vendor"]=="ZTE":
print("Sending download mode command")
interface=0
diag = qcdiag(LOGGER, [[0x19d2,0x0016]], interface)
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4b\x65\x01\x00")
if res[0]==0x4B:
print("Done switching to ENANDPRG mode")
else:
res = diag.send(b"\x3a")
if res[0]==0x3A:
while True:
state=cn.waitforusb(vendor.zte.value,0x0076)
if not state:
diag.disconnect()
if diag.connect():
res = diag.send(b"\x3a")
else:
break
if state:
print("Done switching to NANDPRG mode")
else:
print("Failed switching to download mode")
diag.disconnect()
cn.close()
if __name__=="__main__":
main()

286
modem/enableadb.py Executable file
View file

@ -0,0 +1,286 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2020 under MIT license
# If you use my code, make sure you refer to my name
# If you want to use in a commercial product, ask me before integrating it
import time
from telnetlib import Telnet
import serial
import serial.tools.list_ports
import argparse
import logging
import requests
import hashlib
from diag import qcdiag
from Library.utils import print_progress, read_object, write_object, log_class
from Library.usblib import usb_class
from Library.hdlc import hdlc
import sys
import usb.core
from enum import Enum
import crypt
class vendor(Enum):
sierra = 0x1199
quectel = 0x2c7c
zte = 0x19d2
class deviceclass:
vid=0
pid=0
def __init__(self,vid,pid):
self.vid=vid
self.pid=pid
class connection:
def __init__(self, port=""):
self.serial = None
self.tn = None
self.connected = False
if port == "":
port = self.detect(port)
if port == "":
try:
self.tn = Telnet("192.168.1.1", 5510, 5)
self.connected = True
except:
self.connected = False
if port != "":
self.serial = serial.Serial(port=port, baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=1)
self.connected = self.serial.is_open
def waitforusb(self,vid,pid):
timeout = 0
while timeout < 10:
for device in self.detectusbdevices():
if device.vid == vid:
if device.pid == pid:
return True
time.sleep(1)
timeout += 1
return False
def websend(self,url):
headers = {'Referer': 'http://192.168.0.1/index.html', 'Accept-Charset': 'UTF-8'}
r = requests.get(url, headers=headers)
if b"FACTORY:ok" in r.content or b"success" in r.content:
print(f"Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
return self.waitforusb(vendor.zte.value,0x0016)
return False
def getserialports(self):
return [port for port in serial.tools.list_ports.comports()]
def detectusbdevices(self):
dev = usb.core.find(find_all=True)
ids=[deviceclass(cfg.idVendor,cfg.idProduct) for cfg in dev]
return ids
def detect(self, port):
vendortable={
0x1199:["Sierra Wireless",3],
0x2c7c:["Quectel",3],
0x19d2:["ZTE",2]
}
mode="Unknown"
for device in self.detectusbdevices():
if device.vid==vendor.zte.value:
if device.pid==0x0016:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in AT mode")
mode="AT"
break
elif device.pid==0x1403:
print(f"Detected a {vendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
mode="Web"
url = 'http://192.168.0.1/goform/goform_set_cmd_process?goformId=USB_MODE_SWITCH&usb_mode=6'
if self.websend(url):
print("Successfully enabled adb.")
break
if mode=="AT" or mode=="Unknown":
for port in self.getserialports():
if port.vid in vendortable:
portid = port.location[-1:]
if int(portid) == vendortable[port.vid][1]:
print(f"Detected a {vendortable[port.vid][0]} at interface at: " + port.device)
return port.device
return ""
def readreply(self):
info = []
timeout=0
if self.serial is not None:
while True:
tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
if "OK" in tmp:
info.append(tmp)
return info
elif "ERROR" in tmp:
return -1
if tmp!="":
info.append(tmp)
else:
timeout+=1
if timeout==20:
break
return info
def send(self, cmd):
if self.tn is not None:
self.tn.write(bytes(cmd + "\r", 'utf-8'))
time.sleep(0.05)
data = ""
while True:
tmp = self.tn.read_eager()
if tmp != b"":
data += tmp.strip().decode('utf-8')
else:
break
return data.split("\r\n")
elif self.serial is not None:
self.serial.write(bytes(cmd + "\r", 'utf-8'))
time.sleep(0.05)
return self.readreply()
def close(self):
if self.tn is not None:
self.tn.close()
self.connected = False
if self.serial is not None:
self.serial.close()
self.connected = False
def ati(self):
data={}
info = self.send("ATI")
if info != -1:
for line in info:
if "Revision" in line:
data["revision"] = line.split(":")[1].strip()
if "Model" in line:
data["model"] = line.split(":")[1].strip()
if "Quectel" in line:
data["vendor"] = "Quectel"
if "Manufacturer" in line:
data["manufacturer"]=line.split(":")[1].strip()
if "Sierra Wireless" in data["manufacturer"]:
data["vendor"]="Sierra Wireless"
elif "ZTE CORPORATION" in data["manufacturer"]:
data["vendor"]="ZTE"
elif "SIMCOM INCORPORATED" in data["manufacturer"]:
data["vendor"]="Simcom"
info = self.send("AT+CGMI")
if info!=-1:
for line in info:
if "Quectel" in line:
data["vendor"] = "Quectel"
break
if "Fibucom" in line:
data["vendor"]="Fibucom"
break
info = self.send("AT+CGMR")
if info!=-1:
if len(info)>1:
data["model"]=info[1]
return data
def sendcmd(tn,cmd):
tn.write(bytes(cmd,'utf-8')+b"\n")
time.sleep(0.05)
return tn.read_eager().strip().decode('utf-8')
def main():
version = "1.0"
info = 'Modem Gimme-ADB ' + version + ' (c) B. Kerler 2021'
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description=info)
parser.add_argument(
'-port', '-p',
help='use com port for at',
default="")
parser.add_argument(
'-logfile', '-l',
help='use logfile for debug log',
default="")
args = parser.parse_args()
LOGGER=None
if args.logfile!="":
LOGGER = log_class(logging.DEBUG, args.logfile)
else:
LOGGER = log_class(logging.INFO, "")
port = args.port
cn = connection(port)
if cn.connected:
info=cn.ati()
if "vendor" in info:
if info["vendor"]=="Sierra Wireless":
print("Sending at switch command")
if cn.send('AT!CUSTOM="ADBENABLE",1\r')!=-1:
if cn.send('AT!CUSTOM="TELNETENABLE",1\r')!=-1:
tn = Telnet("192.168.1.1", 21, 5)
tn.write(b"adbd\n")
print("Enabled adb via telnet")
elif info["vendor"]=="Quectel":
print("Sending at switch command")
salt=cn.send("AT+QADBKEY?\r")
if salt!=-1:
if len(salt)>1:
salt=salt[1]
code = crypt.crypt("SH_adb_quectel", "$1$" + salt)
code = code[12:]
cn.send("AT+QADBKEY=\"%s\"\r" % code)
if cn.send("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,1,0\r")==-1:
if cn.send("AT+QLINUXCMD=\"adbd\"")!=-1: #echo test > /dev/ttyGS0
print("Success enabling adb")
else:
print("Success enabling adb")
print("In order to disable adb, send AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,0,0")
elif info["vendor"]=="ZTE":
print("Sending switch command via diag")
if cn.send("AT+ZMODE=1")!=-1:
print("Success enabling adb")
else:
interface = 0
diag = qcdiag(LOGGER, [[0x19d2, 0x0016]], interface)
if diag.connect():
diag.hdlc.receive_reply()
res = diag.send(b"\x4B\xA3\x06\x00")
if res[0]==0x4B:
challenge=res[4:4+8]
response=hashlib.md5(challenge).digest()
res = diag.send(b"\x4B\xA3\x07\x00"+response)
if res[0]==0x4B:
if res[3]==0x00:
print("Auth success")
res=diag.send(b"\x41" + b"\x30\x30\x30\x30\x30\x30")
if res[1]==0x01:
print("SPC success")
sp=b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE"
res = diag.send(b"\x46" + sp)
if res[0] == 0x46 and res[1]==0x01:
print("SP success")
else:
res = diag.send(b"\x25" + sp)
if res[0]==0x46 and res[1]==0x01:
print("SP success")
res = diag.send(b"\x4B\xFA\x0B\x00\x01") #Enable adb serial
if res[0]!=0x13:
print("Success enabling adb serial")
res = diag.send(b"\x4B\x5D\x05\x00") #Operate ADB
if res[0]!=0x13:
print("Success enabling adb")
diag.disconnect()
elif info["vendor"]=="Simcom":
print("Sending at switch command")
# Simcom7600
if cn.send("AT+CUSBADB=1,1")!=-1:
print("Success enabling adb")
elif info["vendor"]=="Fibocom":
print("Sending at switch command")
# FibocomL718:
if cn.send("AT+ADBDEBUG=1")!=-1:
print("Success enabling adb")
cn.close()
if __name__=="__main__":
main()

View file

@ -1,23 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2020 under MIT license
# If you use my code, make sure you refer to my name
# If you want to use in a commercial product, ask me before integrating it
import time
from telnetlib import Telnet
def sendcmd(tn,cmd):
tn.write(bytes(cmd,'utf-8')+b"\n")
time.sleep(0.05)
return tn.read_eager().strip().decode('utf-8')
def main():
tn = Telnet("192.168.1.1", 5510)
print("Sending download mode command to 192.168.1.1:5510")
print(sendcmd(tn,"AT!BOOTHOLD\r"))
print(sendcmd(tn,'AT!QPSTDLOAD\r'))
print("Done switching to download mode")
tn.close()
if __name__=="__main__":
main()

View file

@ -1,29 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2020 under MIT license
# If you use my code, make sure you refer to my name
# If you want to use in a commercial product, ask me before integrating it
import crypt
import sys
import serial
if len(sys.argv)<2:
print("Usage: ./quectel_adb.py [enable,disable]")
sys.exit()
ser = serial.Serial('/dev/ttyUSB2')
ser.baudrate=115200
if sys.argv[1]=="enable":
print("Sending download mode command to /dev/ttyUSB2")
ser.write(b"AT+QADBKEY?\r")
salt=ser.readline()
salt+=ser.readline()
if not b"ERROR" in salt:
salt=sys.argv[1]
code=crypt.crypt("SH_adb_quectel","$1$"+salt)
code=code[12:]
ser.write(b"AT+QADBKEY=\"%s\"\n" % code)
ser.write(b"AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,1,0\n\n")
else:
print("In order to disable adb:")
ser.write("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,0,0\n")
print(ser.readline())

View file

@ -1,18 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2019 under MIT license
# If you use my code, make sure you refer to my name
# If you want to use in a commercial product, ask me before integrating it
import serial
print("Sending download mode command to /dev/ttyUSB2")
ser = serial.Serial('/dev/ttyUSB2')
ser.baudrate=115200
ser.write(b'AT!BOOTHOLD\r')
print(ser.readline())
ser.write(b'AT!QPSTDLOAD\r')
print(ser.readline())
#ser.write(b'AT!QPSTDLOAD\r')
#print(ser.readline())
print("Done switching to download mode")
ser.close()