Beautify code

This commit is contained in:
Bjoern Kerler 2021-02-21 21:10:44 +01:00
parent 66042e61c5
commit 5c9244b973
3 changed files with 94 additions and 78 deletions

View file

@ -43,27 +43,36 @@ CDC_CMDS = {
}
class usb_class(metaclass=LogBase):
class UsbClass(metaclass=LogBase):
def __init__(self, loglevel=logging.INFO, portconfig=None, devclass=-1):
self.portconfig = portconfig
self.connected = False
self.devclass = devclass
self.timeout = None
self.vid = None
self.pid = None
self.info=self.__logger.info
self.error=self.__logger.error
self.warning=self.__logger.warning
self.device = None
self.EP_IN = None
self.EP_OUT = None
self.interface = None
self.stopbits = None
self.databits = None
self.baudrate = None
self.parity = None
self.configuration = None
self.portconfig = portconfig
self.devclass = devclass
self.info = self.__logger.info
self.error = self.__logger.error
self.warning = self.__logger.warning
self.debug = self.__logger.debug
self.__logger.setLevel(loglevel)
if loglevel==logging.DEBUG:
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
def verify_data(self, data, pre="RX:"):
self.debug("",stack_info=True)
self.debug("", stack_info=True)
if isinstance(data, bytes) or isinstance(data, bytearray):
if data[:5] == b"<?xml":
try:
@ -72,11 +81,13 @@ class usb_class(metaclass=LogBase):
try:
self.debug(pre + line.decode('utf-8'))
rdata += line + b"\n"
except:
except Exception as e: # pylint: disable=broad-except
v = hexlify(line)
self.debug(str(e))
self.debug(pre + v.decode('utf-8'))
return rdata
except:
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
pass
if logging.DEBUG >= self.__logger.level:
self.debug(pre + hexlify(data).decode('utf-8'))
@ -85,7 +96,7 @@ class usb_class(metaclass=LogBase):
self.debug(pre + data)
return data
def getInterfaceCount(self):
def getinterfacecount(self):
if self.vid is not None:
self.device = usb.core.find(idVendor=self.vid, idProduct=self.pid)
if self.device is None:
@ -93,16 +104,17 @@ class usb_class(metaclass=LogBase):
return False
try:
self.device.set_configuration()
except:
except Exception(UsbClass) as e:
self.debug(str(e))
pass
self.configuration = self.device.get_active_configuration()
self.debug(2, self.configuration)
return self.configuration.bNumInterfaces
else:
self.__logger.error("No device detected. Is it connected ?")
self.error("No device detected. Is it connected ?")
return 0
def setLineCoding(self, baudrate=None, parity=0, databits=8, stopbits=1):
def setlinecoding(self, baudrate=None, parity=0, databits=8, stopbits=1):
sbits = {1: 0, 1.5: 1, 2: 2}
dbits = {5, 6, 7, 8, 16}
pmodes = {0, 1, 2, 3, 4}
@ -158,7 +170,7 @@ class usb_class(metaclass=LogBase):
data = bytearray(linecode)
wlen = self.device.ctrl_transfer(
req_type, CDC_CMDS["SET_LINE_CODING"],
data_or_wLength=data, wIndex=1)
data_or_wlength=data, windex=1)
self.debug("Linecoding set, {}b sent".format(wlen))
def setbreak(self):
@ -167,30 +179,30 @@ class usb_class(metaclass=LogBase):
recipient = 1 # 0:device, 1:interface, 2:endpoint, 3:other
req_type = (txdir << 7) + (req_type << 5) + recipient
wlen = self.device.ctrl_transfer(
bmRequestType=req_type, bRequest=CDC_CMDS["SEND_BREAK"],
wValue=0, data_or_wLength=0, wIndex=1)
bmrequesttype=req_type, brequest=CDC_CMDS["SEND_BREAK"],
wvalue=0, data_or_wlength=0, windex=1)
self.debug("Break set, {}b sent".format(wlen))
def setControlLineState(self, RTS=None, DTR=None, isFTDI=False):
ctrlstate = (2 if RTS else 0) + (1 if DTR else 0)
if isFTDI:
ctrlstate += (1 << 8) if DTR is not None else 0
ctrlstate += (2 << 8) if RTS is not None else 0
txdir = 0 # 0:OUT, 1:IN
req_type = 2 if isFTDI else 1 # 0:std, 1:class, 2:vendor
def setcontrollinestate(self, rts=None, dtr=None, isftdi=False):
ctrlstate = (2 if rts else 0) + (1 if dtr else 0)
if isftdi:
ctrlstate += (1 << 8) if dtr is not None else 0
ctrlstate += (2 << 8) if rts is not None else 0
txdir = 0 # 0:OUT, 1:IN
req_type = 2 if isftdi else 1 # 0:std, 1:class, 2:vendor
# 0:device, 1:interface, 2:endpoint, 3:other
recipient = 0 if isFTDI else 1
recipient = 0 if isftdi else 1
req_type = (txdir << 7) + (req_type << 5) + recipient
wlen = self.device.ctrl_transfer(
bmRequestType=req_type,
bRequest=1 if isFTDI else CDC_CMDS["SET_CONTROL_LINE_STATE"],
wValue=ctrlstate,
wIndex=1,
data_or_wLength=0)
bmrequesttype=req_type,
brequest=1 if isftdi else CDC_CMDS["SET_CONTROL_LINE_STATE"],
wvalue=ctrlstate,
windex=1,
data_or_wlength=0)
self.debug("Linecoding set, {}b sent".format(wlen))
def connect(self, EP_IN=-1, EP_OUT=-1):
def connect(self, ep_in=-1, ep_out=-1):
if self.connected:
self.close()
self.connected = False
@ -235,28 +247,24 @@ class usb_class(metaclass=LogBase):
if self.device.is_kernel_driver_active(self.interface):
self.debug("Detaching kernel driver")
self.device.detach_kernel_driver(self.interface)
except:
self.debug("No kernel driver supported.")
except Exception(UsbClass) as e:
self.debug(str(e))
usb.util.claim_interface(self.device, self.interface)
if EP_OUT == -1:
if ep_out == -1:
# match the first OUT endpoint
self.EP_OUT = usb.util.find_descriptor(itf,
# match the first OUT endpoint
custom_match= \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_OUT)
custom_match=lambda em: usb.util.endpoint_direction(
em.bEndpointAddress) == usb.util.ENDPOINT_OUT)
else:
self.EP_OUT = EP_OUT
if EP_IN == -1:
self.EP_OUT = ep_out
if ep_in == -1:
# match the first OUT endpoint
self.EP_IN = usb.util.find_descriptor(itf,
# match the first OUT endpoint
custom_match= \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_IN)
custom_match=lambda em: usb.util.endpoint_direction(
em.bEndpointAddress) == usb.util.ENDPOINT_IN)
else:
self.EP_IN = EP_IN
self.EP_IN = ep_in
self.connected = True
return True
@ -265,16 +273,17 @@ class usb_class(metaclass=LogBase):
self.connected = False
return False
def close(self,reset=False):
def close(self, reset=False):
if self.connected:
usb.util.dispose_resources(self.device)
try:
if not self.device.is_kernel_driver_active(self.interface):
#self.device.attach_kernel_driver(self.interface) #Do NOT uncomment
# self.device.attach_kernel_driver(self.interface) #Do NOT uncomment
self.device.attach_kernel_driver(0)
if reset:
self.device.reset()
except:
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
pass
del self.device
@ -291,7 +300,8 @@ class usb_class(metaclass=LogBase):
time.sleep(0.01)
try:
self.device.write(self.EP_OUT, b'')
except:
except Exception as e: # pylint: disable=broad-except
self.debug(str(e))
return False
return True
else:
@ -300,8 +310,9 @@ class usb_class(metaclass=LogBase):
try:
self.device.write(self.EP_OUT, command[pos:pos + pktsize])
pos += pktsize
except:
except Exception as e: # pylint: disable=broad-except
# print("Error while writing")
self.debug(str(e))
time.sleep(0.01)
i += 1
if i == 3:
@ -328,7 +339,7 @@ class usb_class(metaclass=LogBase):
self.debug(tmp)
return bytearray(tmp)
elif "Overflow" in error:
self.__logger.error("USB Overflow")
self.error("USB Overflow")
sys.exit(0)
elif e.errno is not None:
print(repr(e), type(e), e.errno)
@ -338,13 +349,13 @@ class usb_class(metaclass=LogBase):
self.verify_data(bytearray(tmp), "RX:")
return bytearray(tmp)
def ctrl_transfer(self, bmRequestType, bRequest, wValue, wIndex, data_or_wLength):
ret = self.device.ctrl_transfer(bmRequestType=bmRequestType, bRequest=bRequest, wValue=wValue, wIndex=wIndex,
data_or_wLength=data_or_wLength)
def ctrl_transfer(self, bmrequesttype, brequest, wvalue, windex, data_or_wlength):
ret = self.device.ctrl_transfer(bmrequesttype=bmrequesttype, brequest=brequest, wvalue=wvalue, windex=windex,
data_or_wlength=data_or_wlength)
return ret[0] | (ret[1] << 8)
class scsi_cmds(Enum):
class ScsiCmds(Enum):
SC_TEST_UNIT_READY = 0x00,
SC_REQUEST_SENSE = 0x03,
SC_FORMAT_UNIT = 0x04,
@ -392,7 +403,8 @@ command_status_wrapper = [
]
command_status_wrapper_len = 13
class scsi:
class Scsi:
"""
FIHTDC, PCtool
"""
@ -426,15 +438,16 @@ class scsi:
self.interface = interface
self.Debug = False
self.usb = None
self.loglevel=loglevel
self.loglevel = loglevel
def connect(self):
self.usb = usb_class(loglevel=self.loglevel,portconfig=[self.vid, self.pid,self.interface], devclass=8)
self.usb = UsbClass(loglevel=self.loglevel, portconfig=[self.vid, self.pid, self.interface], devclass=8)
if self.usb.connect():
return True
return False
# htcadb = "55534243123456780002000080000616687463800100000000000000000000"; // Len 0x6, Command 0x16, "HTC" 01 = Enable, 02 = Disable
# htcadb = "55534243123456780002000080000616687463800100000000000000000000";
# Len 0x6, Command 0x16, "HTC" 01 = Enable, 02 = Disable
def send_mass_storage_command(self, lun, cdb, direction, data_length):
global tag
cmd = cdb[0]
@ -493,17 +506,17 @@ class scsi:
def send_htc_ums_adbenable(self): # HTC10
# ums_ctrlrequest from f_mass_storage.c
print("Sending HTC ums adb enable command")
bRequestType = USB_DIR_IN | USB_TYPE_VENDOR | USB_RECIP_DEVICE
bRequest = 0xa0
wValue = 1
brequesttype = USB_DIR_IN | USB_TYPE_VENDOR | USB_RECIP_DEVICE
brequest = 0xa0
wvalue = 1
'''
wValue:
0: Disable adb daemon
1: Enable adb daemon
'''
wIndex = 0
windex = 0
w_length = 1
ret = self.usb.ctrl_transfer(bRequestType, bRequest, wValue, wIndex, w_length)
ret = self.usb.ctrl_transfer(brequesttype, brequest, wvalue, windex, w_length)
print("Sent HTC ums adb enable command: %x" % ret)
def send_zte_adbenable(self): # zte blade
@ -532,8 +545,8 @@ class scsi:
if self.usb.connect():
print("Sending FIH adb enable command")
datasize = 0x24
common_cmnd = bytes([self.SC_SWITCH_PORT]) + b"FI1" + struct.pack("<H",
datasize) # reserve_cmd + 'FI' + flag + len + none
# reserve_cmd + 'FI' + flag + len + none
common_cmnd = bytes([self.SC_SWITCH_PORT]) + b"FI1" + struct.pack("<H", datasize)
'''
Flag values:
common_cmnd[3]->1: Enable adb daemon from mass_storage
@ -542,7 +555,8 @@ class scsi:
lun = 0
# datasize=common_cmnd[4]
timeout = 5000
ret_tag = self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
ret_tag = None
ret_tag += self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
# ret_tag+=self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
if datasize > 0:
data = self.usb.read(datasize, timeout)
@ -555,22 +569,24 @@ class scsi:
print("Sending alcatel adb enable command")
datasize = 0x24
common_cmnd = b"\x16\xf9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
lun=0
lun = 0
timeout = 5000
ret_tag = self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
ret_tag = None
ret_tag += self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
if datasize > 0:
data = self.usb.read(datasize, timeout)
print("DATA: " + hexlify(data).decode('utf-8'))
print("Sent alcatel adb enable command")
self.usb.close()
def send_fih_root(
self): # motorola xt560, nokia 3.1, huawei u8850, huawei Ideos X6, lenovo s2109, triumph M410, viewpad 7, #f_mass_storage.c
def send_fih_root(self):
# motorola xt560, nokia 3.1, huawei u8850, huawei Ideos X6,
# lenovo s2109, triumph M410, viewpad 7, #f_mass_storage.c
if self.usb.connect():
print("Sending FIH root command")
datasize = 0x24
common_cmnd = bytes([self.SC_SWITCH_ROOT]) + b"FIH" + struct.pack("<H",
datasize) # reserve_cmd + 'FIH' + len + flag + none
# reserve_cmd + 'FIH' + len + flag + none
common_cmnd = bytes([self.SC_SWITCH_ROOT]) + b"FIH" + struct.pack("<H", datasize)
lun = 0
# datasize = common_cmnd[4]
timeout = 5000

View file

@ -27,7 +27,7 @@ def main():
else:
interface = -1
usbscsi = scsi(vid, pid, interface)
usbscsi = Scsi(vid, pid, interface)
if usbscsi.connect():
if args.nokia:
usbscsi.send_fih_adbenable()

4
edl.py
View file

@ -120,7 +120,7 @@ import subprocess
import re
from docopt import docopt
from Library.utils import LogBase
from Library.usblib import usb_class
from Library.usblib import UsbClass
from Library.sahara import sahara
from Library.streaming_client import streaming_client
from Library.firehose_client import firehose_client
@ -240,7 +240,7 @@ class main(metaclass=LogBase):
else:
self.__logger.setLevel(logging.INFO)
self.cdc = usb_class(portconfig=portconfig, loglevel=self.__logger.level)
self.cdc = UsbClass(portconfig=portconfig, loglevel=self.__logger.level)
self.sahara = sahara(self.cdc, loglevel=self.__logger.level)
if args["--loader"] == 'None':