#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
"""Enable or disable ADB on supported USB modems via AT, telnet, web or diag commands."""
import argparse
import copy
import hashlib
import logging
import logging.config
import logging.handlers
import time
from enum import Enum

import colorama
import requests
import serial
import serial.tools.list_ports
import usb.core
from Exscript.protocols.telnetlib import Telnet
from passlib.hash import md5_crypt

try:
    from edlclient.Tools.qc_diag import qcdiag
except ImportError:
    # Keep the name defined so callers can test for availability instead of
    # hitting a NameError when the diag fallback is attempted.
    qcdiag = None

try:
    from edlclient.Tools.sierrakeygen import SierraKeygen
except ImportError:
    import inspect
    import os
    import sys

    current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    parent_dir = os.path.dirname(current_dir)
    sys.path.insert(0, parent_dir)
    from sierrakeygen import SierraKeygen

itoa64 = bytearray(b"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")


def _crypt_to64(s, v, n):
    """Encode the low ``6 * n`` bits of ``v`` as ``n`` characters of ``itoa64``.

    The least-significant 6-bit group is emitted first.

    :param s: unused; kept so the signature stays compatible with existing callers
    :param v: integer value to encode
    :param n: number of output characters
    :return: ``bytearray`` holding the ``n`` encoded characters (empty for ``n <= 0``)
    """
    out = bytearray()
    # The original ``while --n >= 0`` is a double negation in Python and never
    # decrements; iterate exactly ``n`` times instead.
    for _ in range(max(n, 0)):
        out.append(itoa64[v & 0x3f])
        v >>= 6
    return out


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the message of selected levels in colorama colour codes."""

    LOG_COLORS = {
        logging.ERROR: colorama.Fore.RED,
        logging.DEBUG: colorama.Fore.LIGHTMAGENTA_EX,
        logging.WARNING: colorama.Fore.YELLOW,
    }

    def format(self, record, *args, **kwargs):
        """Return the formatted record, colouring the message for levels in ``LOG_COLORS``."""
        # if the corresponding logger has children, they may receive modified
        # record, so we want to keep it intact
        new_record = copy.copy(record)
        if new_record.levelno in self.LOG_COLORS:
            pad = ""
            if new_record.name != "root":
                print(new_record.name)
                pad = "[LIB]: "
            # we want levelname to be in different color, so let's modify it
            new_record.msg = "{pad}{color_begin}{msg}{color_end}".format(
                pad=pad,
                msg=new_record.msg,
                color_begin=self.LOG_COLORS[new_record.levelno],
                color_end=colorama.Style.RESET_ALL,
            )
        # now we can let standard formatting take care of the rest
        return super().format(new_record, *args, **kwargs)


class LogBase(type):
    """Metaclass that gives every class a private ``__logger`` and ``__debuglevel``.

    The attributes are stored under their name-mangled form
    (``_<ClassName>__logger``), so ``self.__logger`` works inside the class body.
    """

    debuglevel = logging.root.level

    def __init__(cls, *args):
        super().__init__(*args)
        logger_attribute_name = '_' + cls.__name__ + '__logger'
        logger_debuglevel_name = '_' + cls.__name__ + '__debuglevel'
        # Dotted logger name built from the MRO, base-most class first, skipping ``object``.
        logger_name = '.'.join([c.__name__ for c in cls.mro()[-2::-1]])
        LOG_CONFIG = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "root": {
                    "()": ColorFormatter,
                    "format": "%(name)s - %(message)s",
                }
            },
            "handlers": {
                "root": {
                    # "level": cls.__logger.level,
                    "formatter": "root",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                }
            },
            "loggers": {
                "": {
                    "handlers": ["root"],
                    # "level": cls.debuglevel,
                    "propagate": False
                }
            },
        }
        logging.config.dictConfig(LOG_CONFIG)
        logger = logging.getLogger(logger_name)

        setattr(cls, logger_attribute_name, logger)
        setattr(cls, logger_debuglevel_name, cls.debuglevel)


class vendor(Enum):
    """USB vendor ids this tool tests for."""

    sierra = 0x1199
    quectel = 0x2c7c
    zte = 0x19d2
    telit = 0x413c
    netgear = 0x0846


class deviceclass:
    """Plain holder for a USB vendor id / product id pair."""

    vid = 0
    pid = 0

    def __init__(self, vid, pid):
        self.vid = vid
        self.pid = pid


class connection:
    """AT command channel to a modem over a serial port or a telnet session."""

    # (substring looked for in the ATI "Manufacturer" value, vendor label); first match wins.
    _MANUFACTURER_VENDORS = (
        ("Sierra Wireless", "Sierra Wireless"),
        ("ZTE CORPORATION", "ZTE"),
        ("SIMCOM INCORPORATED", "Simcom"),
        ("Alcatel", "Alcatel"),
        ("Netgear", "Netgear"),
        ("SAMSUNG", "Samsung"),
    )
    # (substring looked for in an AT+CGMI reply line, vendor label); first match wins.
    # The label must be "Fibocom" because that is what adbtools.run() dispatches on;
    # the historic misspelling is still accepted as input.
    _CGMI_VENDORS = (
        ("Quectel", "Quectel"),
        ("Fibocom", "Fibocom"),
        ("Fibucom", "Fibocom"),
        ("Netgear", "Netgear"),
        ("SAMSUNG", "Samsung"),
    )

    def __init__(self, port=""):
        """Open ``port``; when it is empty, try to auto-detect a device first.

        :param port: serial device name, or ``""`` to auto-detect
        """
        self.serial = None
        self.tn = None
        self.connected = False
        if not port:
            port = self.detect(port)
        if port:
            try:
                self.serial = serial.Serial(port=port, baudrate=115200, bytesize=8, parity='N', stopbits=1,
                                            timeout=1)
                self.connected = self.serial.is_open
            except (serial.SerialException, OSError, ValueError):
                self.connected = False

    def waitforusb(self, vid, pid):
        """Poll the USB bus once per second, up to 10 times, for ``vid``/``pid``.

        :return: True as soon as a matching device is seen, False otherwise
        """
        for _ in range(10):
            if any(dev.vid == vid and dev.pid == pid for dev in self.detectusbdevices()):
                return True
            time.sleep(1)
        return False

    def websend(self, url):
        """GET ``url`` and, if the reply signals success, wait for the ZTE AT-mode device.

        :return: result of :meth:`waitforusb` on success, False on a failed or
                 unsuccessful request
        """
        headers = {'Referer': 'http://192.168.0.1/index.html', 'Accept-Charset': 'UTF-8'}
        try:
            # A timeout keeps the tool from hanging forever on an unreachable web UI.
            r = requests.get(url, headers=headers, timeout=10)
        except requests.RequestException as err:
            print(f"Web request failed: {err}")
            return False
        if b"FACTORY:ok" in r.content or b"success" in r.content:
            print(
                "Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
            return self.waitforusb(vendor.zte.value, 0x0016)
        return False

    def getserialports(self):
        """Return the list of serial ports reported by pyserial."""
        return list(serial.tools.list_ports.comports())

    def detectusbdevices(self):
        """Return a :class:`deviceclass` for every device found by pyusb."""
        dev = usb.core.find(find_all=True)
        return [deviceclass(cfg.idVendor, cfg.idProduct) for cfg in dev]

    def _connect_telnet(self):
        """Try the telnet endpoint 192.168.1.1:5510 and record the outcome.

        :return: True when the telnet session was opened
        """
        try:
            # vid 0846, netgear mr1100, mr5100
            self.tn = Telnet("192.168.1.1", 5510)
            self.connected = True
        except Exception:  # boundary: the exact exception types raised by Telnet are not visible here
            self.connected = False
        return self.connected

    def detect(self, port):
        """Auto-detect a supported device and return its AT serial port name.

        USB devices are inspected first (ZTE mode detection, Netgear telnet);
        afterwards serial ports are scanned for a known vendor whose location
        ends in the expected interface number.

        :param port: unused; kept for interface compatibility
        :return: serial device name, or ``""`` when none was found
        """
        # vendor id -> [display name, interface number of the AT port]
        atvendortable = {
            0x1199: ["Sierra Wireless", 3],
            0x2c7c: ["Quectel", 3],
            0x19d2: ["ZTE", 2],
            0x413c: ["Telit", 3],
            0x0846: ["Netgear", 2],
            0x04E8: ["Samsung", -1]
        }
        mode = "Unknown"
        try:
            for device in self.detectusbdevices():
                if device.vid == vendor.zte.value:
                    if device.pid == 0x0016:
                        print(f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in AT mode")
                        mode = "AT"
                        break
                    elif device.pid == 0x1403:
                        print(
                            f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
                        mode = "Web"
                        self.ZTE_Web()
                        break
                elif device.vid == vendor.netgear.value:
                    self._connect_telnet()
        except Exception:  # boundary: USB enumeration failed (e.g. no backend available)
            print("No libusb driver found. Trying Telnet instead.")
            if not self._connect_telnet():
                print("Failed to connect to Telnet.")
                # Always return a string so the caller never passes None to serial.Serial.
                return ""

        if mode in ("AT", "Unknown"):
            for candidate in self.getserialports():
                if candidate.vid not in atvendortable:
                    continue
                name, interface = atvendortable[candidate.vid]
                # location may be None or end in a non-digit; skip such ports instead of crashing
                portid = (candidate.location or "")[-1:]
                if portid.isdigit() and int(portid) == interface:
                    print(f"Detected a {name} at interface at: " + candidate.device)
                    return candidate.device
        return ""

    def ZTE_Web(self):
        """Ask a ZTE device in web mode to switch its USB mode via the web UI."""
        url = 'http://192.168.0.1/goform/goform_set_cmd_process?goformId=USB_MODE_SWITCH&usb_mode=6'
        if self.websend(url):
            print("Successfully enabled adb.")

    def readreply(self):
        """Read reply lines from the serial port.

        Reading stops at a line containing ``OK`` (included in the result), at
        a line containing ``ERROR``, or after four empty reads.

        :return: list of non-empty lines, or ``-1`` when ``ERROR`` was received
        """
        info = []
        timeout = 0
        if self.serial is not None:
            while True:
                # Ignore undecodable bytes: line noise must not abort the exchange.
                tmp = self.serial.readline().decode('utf-8', errors='ignore').replace('\r', '').replace('\n', '')
                if "OK" in tmp:
                    info.append(tmp)
                    return info
                elif "ERROR" in tmp:
                    return -1
                if tmp != "":
                    info.append(tmp)
                else:
                    timeout += 1
                    if timeout == 4:
                        break
        return info

    def send(self, cmd):
        """Send ``cmd`` followed by a carriage return and return the reply.

        :param cmd: AT command text
        :return: list of reply lines, or ``-1`` on ``ERROR`` or when no
                 transport is open
        """
        if self.tn is not None:
            self.tn.write(bytes(cmd + "\r", 'utf-8'))
            time.sleep(0.05)
            chunks = []
            while True:
                tmp = self.tn.read_eager()
                # Accept both str- and bytes-returning telnet implementations;
                # comparing bytes with "" would never terminate this loop.
                if isinstance(tmp, bytes):
                    tmp = tmp.decode('utf-8', errors='ignore')
                if not tmp:
                    break
                chunks.append(tmp.strip())
            data = "".join(chunks)
            if "ERROR" in data:
                return -1
            return data.split("\r\n")
        if self.serial is not None:
            self.serial.write(bytes(cmd + "\r", 'utf-8'))
            time.sleep(0.05)
            return self.readreply()
        # No transport: report an error rather than None, which callers would try to iterate.
        return -1

    def close(self):
        """Close whichever transports are open and mark the connection closed."""
        if self.tn is not None:
            self.tn.close()
            self.connected = False
        if self.serial is not None:
            self.serial.close()
            self.connected = False

    @staticmethod
    def _value_after_colon(line):
        """Return the stripped text between the first and second ``:`` of ``line`` ("" if no colon)."""
        parts = line.split(":")
        return parts[1].strip() if len(parts) > 1 else ""

    def ati(self):
        """Query ATI, AT+CGMI and AT+CGMR and collect identification fields.

        :return: dict that may contain ``revision``, ``model``, ``manufacturer``
                 and ``vendor``
        """
        data = {}
        info = self.send("ATI")
        if info != -1:
            for line in info:
                if "Revision" in line:
                    data["revision"] = self._value_after_colon(line)
                if "Model" in line:
                    data["model"] = self._value_after_colon(line)
                if "Quectel" in line:
                    data["vendor"] = "Quectel"
                if "Manufacturer" in line:
                    manufacturer = self._value_after_colon(line)
                    data["manufacturer"] = manufacturer
                    for needle, label in self._MANUFACTURER_VENDORS:
                        if needle in manufacturer:
                            data["vendor"] = label
                            break
        info = self.send("AT+CGMI")
        if info != -1:
            for line in info:
                label = next((lbl for needle, lbl in self._CGMI_VENDORS if needle in line), None)
                if label is not None:
                    data["vendor"] = label
                    break
        info = self.send("AT+CGMR")
        if info != -1:
            if len(info) > 1:
                data["model"] = info[1]
        return data


class adbtools(metaclass=LogBase):
    """Vendor-specific procedures that switch ADB on or off."""

    @staticmethod
    def _accepted(resp):
        """Return True when ``resp`` is a non-empty reply and not the ``-1`` error code.

        A plain truthiness test is wrong here because ``-1`` is truthy.
        """
        return resp != -1 and bool(resp)

    def sendcmd(self, tn, cmd):
        """Write ``cmd`` plus newline to telnet session ``tn`` and return the eager reply as str."""
        tn.write(bytes(cmd, 'utf-8') + b"\n")
        time.sleep(0.05)
        reply = tn.read_eager().strip()
        # Accept both bytes- and str-returning telnet implementations.
        if isinstance(reply, bytes):
            reply = reply.decode('utf-8')
        return reply

    def qc_diag_auth(self, diag):
        """Run the challenge/response, SPC and SP sequence on ``diag``.

        NOTE(review): the nesting was reconstructed from a flattened source;
        the SPC/SP steps are assumed to run regardless of the challenge result.

        :param diag: diag client providing ``connect()`` and ``send(bytes)``
        :return: True when one of the two SP commands is acknowledged
        """
        if diag.connect():
            res = diag.send(b"\x4B\xA3\x06\x00")
            if res[0] == 0x4B:
                # The response to the challenge is the MD5 digest of its 8 bytes.
                challenge = res[4:4 + 8]
                response = hashlib.md5(challenge).digest()
                res = diag.send(b"\x4B\xA3\x07\x00" + response)
                if res[0] == 0x4B:
                    if res[3] == 0x00:
                        print("Auth success")
            res = diag.send(b"\x41" + b"\x30\x30\x30\x30\x30\x30")
            if res[1] == 0x01:
                print("SPC success")
            sp = b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE"
            res = diag.send(b"\x46" + sp)
            if res[0] == 0x46 and res[1] == 0x01:
                print("SP success")
                return True
            else:
                res = diag.send(b"\x25" + sp)
                if res[0] == 0x46 and res[1] == 0x01:
                    print("SP success")
                    return True
        return False

    def meta(self, port, mode=b"METAMETA"):
        """Repeatedly connect to ``port``, wait for data, send ``mode`` and report the reply.

        NOTE(review): as in the original, this loop has no exit condition.

        :param port: serial device name (``""`` to auto-detect)
        :param mode: 8-byte mode request written to the device
        """
        while True:
            cn = connection(port)
            # cn.serial is None when only the telnet transport is connected
            if cn.connected and cn.serial is not None:
                try:
                    while True:
                        resp2 = cn.serial.read(8)
                        if len(resp2) > 0:
                            break
                    cn.serial.write(mode)
                    response = cn.serial.read(8)
                    if len(response) == 0:
                        print("Read timeout while switching to META mode")
                    elif response == b'ATEMATEM' or response == b'READYATE':
                        print("META Mode enabled")
                    elif response == b'METAFORB':
                        print("META mode forbidden")
                    else:
                        print("Invalid response: ", response)
                finally:
                    # Release the port before the next iteration reopens it.
                    cn.close()

    def run(self, port, enable):
        """Identify the modem on ``port`` and run the matching enable/disable routine.

        :param port: serial device name (``""`` to auto-detect)
        :param enable: True to enable ADB, False to disable it
        """
        cn = connection(port)
        if not cn.connected:
            print("No device detected")
            return
        try:
            info = cn.ati()
            res = False
            vendor_name = info.get("vendor")
            if vendor_name in ("Sierra Wireless", "Netgear"):
                res = self.SierraWireless(cn, info, enable)
            elif vendor_name == "Quectel":
                print("Sending at switch command")
                res = self.Quectel(cn, enable)
            elif vendor_name == "ZTE":
                print("Sending switch command via diag")
                res = self.ZTE(cn, enable)
            elif vendor_name == "Simcom":
                res = self.Simcom(cn, enable)
            elif vendor_name == "Fibocom":
                res = self.Fibocom(cn, enable)
            elif vendor_name == "Alcatel":
                res = self.Alcatel(enable)
            elif vendor_name == "Samsung":
                res = self.Samsung(cn, enable)
            mode = "enabled" if enable else "disabled"
            if res:
                print("ADB successfully " + mode)
            else:
                print("ADB couldn't be " + mode)
        finally:
            # Close the transport even when a vendor routine raises.
            cn.close()

    @staticmethod
    def _usbcomp_field(line, key):
        """Return the text following ``key`` in ``line`` without the ``:`` separator and padding."""
        return line[line.find(key) + len(key):].lstrip(": ").strip()

    def SierraWireless(self, cn, info, enable):
        """Enable or disable ADB on Sierra Wireless / Netgear devices.

        Enabling tries, in order: ``AT!CUSTOM="ADBENABLE"``, the telnet route
        (``TELNETENABLE`` then starting ``adbd``), and finally a USB
        composition change followed by a reset.

        :param cn: open :class:`connection`
        :param info: dict returned by :meth:`connection.ati`
        :param enable: True to enable, False to disable
        :return: True when a command was accepted
        """
        print("Sending at switch command")
        kg = SierraKeygen(cn=cn, devicegeneration=None)
        kg.detectdevicegeneration()
        if not kg.openlock():
            return False

        if not enable:
            if cn.send('AT!CUSTOM="ADBENABLE",0\r') != -1:
                return True
            kg.openlock()
            if cn.send('AT!CUSTOM="TELNETENABLE",0\r') != -1:
                return True
            return False

        if cn.send('AT!CUSTOM="ADBENABLE",1\r') != -1:
            return True
        kg.openlock()
        if cn.send('AT!CUSTOM="TELNETENABLE",1\r') != -1:
            time.sleep(5)
            tn = Telnet("192.168.1.1", 23)
            tn.write(b"adbd &\r\n")
            print(tn.read_eager())
            return True
        if kg.openlock():
            if info["vendor"] == "Netgear":
                print("Enabling new port config")
                # -1 (error) is truthy, so test the reply explicitly
                if self._accepted(cn.send("AT!UDPID=68E2")):
                    print("Successfully enabled PID 68E2")
                    return True

            index = -1
            cfg_type = -1
            bitmask = -1
            resp = cn.send("AT!USBCOMP?")
            if resp != -1:
                print(resp)
                for val in resp:
                    if "Config Index" in val:
                        index = self._usbcomp_field(val, "Config Index")
                    elif "Config Type" in val:
                        cfg_type = self._usbcomp_field(val, "Config Type").replace(" (Generic)", "")
                    elif "Interface bitmask" in val:
                        bitmask = self._usbcomp_field(val, "Interface bitmask")
                        if " " in bitmask:
                            bitmask = "0x" + bitmask.split(" ")[0]
                # All three sentinels are -1 (the original compared bitmask against 1).
                if index != -1 and cfg_type != -1 and bitmask != -1:
                    try:
                        index = int(index)
                        cfg_type = int(cfg_type)
                        int(bitmask, 16)  # validated only; the mask written below is fixed
                    except ValueError:
                        print("Unexpected AT!USBCOMP? reply, composition left unchanged")
                        return False
                    # AT!USBCOMP=<Config Index>,<Config Type>,<Interface bitmask>
                    # <Config Index> - configuration index to which the composition applies, should be 1
                    # <Config Type>  - 1:Generic, 2:USBIF-MBIM, 3:RNDIS
                    #                  config type 2/3 should only be used for specific Sierra PIDs: 68B1, 9068
                    #                  customized VID/PID should use config type 1
                    # <Interface bitmask>
                    #   DIAG   - 0x00000001,
                    #   ADB    - 0x00000002,
                    #   NMEA   - 0x00000004,
                    #   MODEM  - 0x00000008,
                    #   RMNET0 - 0x00000100,
                    #   RMNET1 - 0x00000400,
                    #   RMNET2 - 0x00000800,
                    #   MBIM   - 0x00001000,
                    #   RNDIS  - 0x00004000,
                    #   AUDIO  - 0x00010000,
                    #   ECM    - 0x00080000,
                    #   UBIST  - 0x00200000
                    # 0x0080010E = ADB | NMEA | MODEM | RMNET0 | 0x00800000
                    # (the 0x00800000 bit is not in the table above).
                    cmd = f"AT!USBCOMP={index},{cfg_type},{0x0080010E:08X}"
                    if cn.send(cmd) != -1:
                        if cn.send("AT!RESET") != -1:
                            return True
                    return False
            # NOTE(review): reconstructed from a flattened source - the original
            # reports success here even when the composition could not be read.
            return True
        return False

    def Samsung(self, cn, enable):
        """Switch Samsung devices via ``AT+USBMODEM``, falling back to ``AT+SYSSCOPE``.

        :return: True when one of the commands produced a non-error reply
        """
        usbmodem = "AT+USBMODEM=1" if enable else "AT+USBMODEM=0"
        for cmd in (usbmodem, "AT+SYSSCOPE=1,0,0"):
            # The original tested plain truthiness, which treats -1 (ERROR) as success.
            if self._accepted(cn.send(cmd)):
                return True
        return False

    def Alcatel(self, enable):
        """Print the manual SCSI command for Alcatel devices; nothing is sent.

        :return: False, since no switch is performed by this tool
        """
        print("Send scsi switch command")
        print("Run \"sudo sg_raw /dev/sg0 16 f9 00 00 00 00 00 00 00 00 00 00 00 00 00 00 -v\" to enable adb")
        return False

    def Fibocom(self, cn, enable):
        """Switch Fibocom devices via ``AT+ADBDEBUG``."""
        print("Sending at switch command")
        # FibocomL718:
        cmd = "AT+ADBDEBUG=1" if enable else "AT+ADBDEBUG=0"
        return cn.send(cmd) != -1

    def Simcom(self, cn, enable):
        """Switch Simcom devices via ``AT+CUSBADB``."""
        print("Sending at switch command")
        # Simcom7600
        # NOTE(review): the original sent "AT+CUSBADB=1,1" for disable as well;
        # 0 is assumed to be the disable value - confirm against the SIMCom manual.
        cmd = "AT+CUSBADB=1,1" if enable else "AT+CUSBADB=0,1"
        return cn.send(cmd) != -1

    def _zte_diag_switch(self, enable):
        """Switch ZTE ADB over the diag port; always disconnects the diag handle.

        :return: True when the final "operate ADB" command was not answered with 0x13
        """
        if qcdiag is None:
            print("qcdiag module is not available, cannot switch via diag")
            return False
        interface = 0
        diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x19d2, 0x0016, interface]])
        action = "enabling" if enable else "disabling"
        try:
            if not self.qc_diag_auth(diag):
                return False
            # Set adb serial (last byte 1 = on, 0 = off); this code treats a
            # leading 0x13 in the reply as rejection.
            res = diag.send(b"\x4B\xFA\x0B\x00" + (b"\x01" if enable else b"\x00"))
            if res[0] != 0x13:
                print(f"Success {action} adb serial")
            res = diag.send(b"\x4B\x5D\x05\x00")  # Operate ADB
            if res[0] != 0x13:
                print(f"Success {action} adb")
                return True
            return False
        finally:
            # The original leaked the handle on the success path.
            diag.disconnect()

    def ZTE(self, cn, enable):
        """Switch ZTE devices via ``AT+ZMODE``, falling back to diag commands.

        Unlike the original, the diag fallback for disabling only reports
        success when the device accepted the command.
        """
        cmd = "AT+ZMODE=1" if enable else "AT+ZMODE=F"
        if cn.send(cmd) != -1:
            return True
        return self._zte_diag_switch(enable)

    @staticmethod
    def _quectel_adb_salt(reply):
        """Extract the key value from an ``AT+QADBKEY?`` reply.

        Prefers the text after ``+QADBKEY:``; falls back to the second reply
        line, which is what the original code used.
        """
        for line in reply:
            if "+QADBKEY:" in line:
                return line.split(":", 1)[1].strip()
        return str(reply[1]).strip()

    def Quectel(self, cn, enable: bool = True):
        """Unlock (when a key is offered) and switch Quectel devices.

        The unlock code is the first 16 characters of the md5-crypt checksum of
        ``"SH_adb_quectel"`` salted with the value returned by ``AT+QADBKEY?``.

        :return: True when the USB configuration (or the adbd fallback) was accepted
        """
        sn = cn.send("AT+QADBKEY?\r")
        if sn != -1 and len(sn) > 1:
            try:
                # ``encrypt(..., salt=)`` is deprecated in passlib; ``using().hash()`` is its replacement.
                hashed = md5_crypt.using(salt=self._quectel_adb_salt(sn)).hash("SH_adb_quectel")
            except ValueError as err:
                print(f"Could not derive adb key: {err}")
            else:
                # "$1$<salt>$<checksum>": equals hashed[12:28] for the usual 8-character salt
                code = hashed.rsplit("$", 1)[1][:16]
                cn.send("AT+QADBKEY=\"%s\"\r" % code)
        if enable:
            if cn.send("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,1,0\r") != -1:
                return True
            # usbcfg rejected: try starting adbd directly
            if cn.send("AT+QLINUXCMD=\"adbd\"") != -1:  # echo test > /dev/ttyGS0
                return True
        else:
            if cn.send("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,0,0\r") != -1:
                return True
        return False


def main():
    """Parse the command line and enable or disable ADB on the detected modem."""
    version = "1.2"
    info = '\nModem Gimme-ADB ' + version + ' (c) B. Kerler 2020-2023\n-------------------------------------------\n'
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=info)
    parser.add_argument(
        '-mode',
        help='Mode: enable or disable',
        default="enable")
    parser.add_argument(
        '-port', '-p',
        help='[Optional] use com port for at',
        default="")
    parser.add_argument(
        '-logfile', '-l',
        help='use logfile for debug log',
        default="")
    args = parser.parse_args()
    ad = adbtools()
    print(info)
    print("Supported modules: ZTE,Netgear,Sierra Wireless,Samsung,Alcatel,Quectel,Fibucom")
    # Anything other than "enable" (case-insensitive) means disable.
    enable = args.mode.lower() == "enable"
    ad.run(port=args.port, enable=enable)
    # ad.meta(port=args.port)


if __name__ == "__main__":
    main()