#!/usr/bin/env python # # Interactive shell for communication with LG devices in download mode (LAF). # # Copyright (C) 2015 Peter Wu # Licensed under the MIT license . from __future__ import print_function from contextlib import closing import argparse, logging, re, struct, sys, binascii # Enhanced prompt with history try: import readline except ImportError: pass # Try USB interface try: import usb.core, usb.util except ImportError: pass # Windows registry for serial port detection try: import winreg except ImportError: try: import _winreg as winreg except ImportError: winreg = None _logger = logging.getLogger("LGLAF.py") # Python 2/3 compat try: input = raw_input except: pass if '\0' == b'\0': int_as_byte = chr else: int_as_byte = lambda x: bytes([x]) # laf crypto for KILO challenge/response try: import laf_crypto except ImportError: _logger.warning("LAF Crypto failed to import!") pass # Use Manufacturer key for KILO challenge/response USE_MFG_KEY = False laf_error_codes = { 0x80000000: "FAILED", 0x80000001: "INVALID_PARAMETER", 0x80000002: "INVALID_HANDLE", 0x80000003: "DEVICE_NOT_SUPPORTED", 0x80000004: "INTERNAL_ERROR", 0x80000005: "TIMEOUT", 0x8000000F: "MORE_HEADER_DATA", 0x80000010: "MORE_DATA", 0x80000011: "INVALID_DATA", 0x80000012: "INVALID_DATA_LENGTH", 0x80000013: "INVALID_PACKET", 0x80000016: "CRC_CHECKSUM", 0x80000017: "CMD_CODE", 0x80000018: "OUTOFMEMORY", 0x80000105: "INVALID_NAME", 0x80000106: "NOT_CONNECTED", 0x80000107: "CANNOT_MAKE", 0x80000108: "FILE_NOT_FOUND", 0x80000109: "NOT_ENOUGH_QUOTA", 0x8000010a: "ACCESS_DENIED", 0x8000010c: "CANCELLED", 0x8000010d: "CONNECTION_ABORTED", 0x8000010e: "CONTINUE", 0x8000010f: "GEN_FAILURE", 0x80000110: "INCORRECT_ADDRESS", 0x80000111: "INVALID_CATEGORY", 0x80000112: "REQUEST_ABORTED", 0x80000113: "RETRY", 0x80000116: "DEVICE_NOT_AVAILABLE", 0x80000201: "IDT_MISMATCH_MODELNAME", 0x80000202: "IDT_DECOMPRES_FAILED", 0x80000203: "IDT_INVALID_OPTION", 0x80000204: "IDT_DECOMPRESS_END_FAILED", 0x80000205: "IDT_DZ_HEADER", 0x80000206: "IDT_RETRY_COUNT", 0x80000207: "IDT_HEADER_SIZE", 0x80000208: "IDT_TOT_MAGIC", 0x80000209: "UDT_DZ_HEADER_SIZE", 0x80000302: "INVALID_RESPONSE", 0x80000305: "FAILED_INSERT_QUEUE", 0x80000306: "FAILED_POP_QUEUE", 0x80000307: "INVALID_LAF_PROTOCOL", 0x80000308: "ERASE_FAILED", 0x80000309: "WEBFLAG_RESET_FAIL", 0x80000401: "FLASHING_FAIL", 0x80000402: "SECURE_FAIL", 0x80000403: "BUILD_TYPE_FAIL", 0x80000404: "CHECK_USER_SPC", 0x80000405: "FBOOT_CHECK_FAIL", 0x80000406: "INIT_FAIL", 0x80000407: "FRST_FLAG_FAIL", 0x80000408: "POWER_OFF_FAIL", 0x8000040a: "PRL_READ_FAIL", 0x80000409: "PRL_WRITE_FAIL", } _ESCAPE_PATTERN = re.compile(b'''\\\\( x[0-9a-fA-F]{2} | [0-7]{1,3} | .)''', re.VERBOSE) _ESCAPE_MAP = { b'n': b'\n', b'r': b'\r', b't': b'\t', } _ESCAPED_CHARS = b'"\\\'' def text_unescape(text): """Converts a string with escape sequences to bytes.""" text_bin = text.encode("utf8") def sub_char(m): what = m.group(1) if what[0:1] == b'x' and len(what) == 3: return int_as_byte(int(what[1:], 16)) elif what[0:1] in b'01234567': return int_as_byte(int(what, 8)) elif what in _ESCAPE_MAP: return _ESCAPE_MAP[what] elif what in _ESCAPED_CHARS: return what else: raise RuntimeError('Unknown escape sequence \\%s' % what.decode('utf8')) return re.sub(_ESCAPE_PATTERN, sub_char, text_bin) def parse_number_or_escape(text): try: return int(text, 0) if text else 0 except ValueError: return text_unescape(text) ### Protocol-related stuff def crc16(data): """CRC-16-CCITT computation with LSB-first and inversion.""" crc = 0xffff for byte in data: crc ^= byte for bits in range(8): if crc & 1: crc = (crc >> 1) ^ 0x8408 else: crc >>= 1 return crc ^ 0xffff def invert_dword(dword_bin): dword = struct.unpack("I", dword_bin)[0] return struct.pack("I", dword ^ 0xffffffff) def make_request(cmd, args=[], body=b''): if not isinstance(cmd, bytes): cmd = cmd.encode('ascii') assert isinstance(body, bytes), "body must be bytes" # Header: command, args, ... body size, header crc16, inverted command header = bytearray(0x20) def set_header(offset, val): if isinstance(val, int): val = struct.pack('&1 -- ' argv += shell_command.encode('ascii') if len(argv) > 255: raise RuntimeError("Command length %d is larger than 255" % len(argv)) return make_request(b'EXEC', body=argv + b'\0') ### USB or serial port communication class Communication(object): def __init__(self): self.read_buffer = b'' def read(self, n, timeout=None): """Reads exactly n bytes.""" need = n - len(self.read_buffer) while need > 0: buff = self._read(need, timeout=timeout) self.read_buffer += buff if not buff: raise EOFError need -= len(buff) data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:] return data def _read(self, n, timeout=None): """Try one read, possibly returning less or more than n bytes.""" raise NotImplementedError def write(self, data): raise NotImplementedError def close(self): raise NotImplementedError def reset(self): self.read_buffer = b'' def call(self, payload): """Sends a command and returns its response.""" validate_message(payload) self.write(payload) header = self.read(0x20) validate_message(header, ignore_crc=True) cmd = header[0:4] size = struct.unpack_from('') raise RuntimeError('Command failed with error code %#x (%s)' % (errCode, msg)) if cmd != payload[0:4]: raise RuntimeError("Unexpected response: %r" % header) return header, data class FileCommunication(Communication): def __init__(self, file_path): super(FileCommunication, self).__init__() if sys.version_info[0] >= 3: self.f = open(file_path, 'r+b', buffering=0) else: self.f = open(file_path, 'r+b') def _read(self, n, timeout=None): return self.f.read(n) def write(self, data): self.f.write(data) def close(self): self.f.close() class USBCommunication(Communication): VENDOR_ID_LG = 0x1004 # Read timeout. Set to 0 to disable timeouts READ_TIMEOUT_MS = 60000 def __init__(self): super(USBCommunication, self).__init__() # Match device using heuristics on the interface/endpoint descriptors, # this avoids hardcoding idProduct. self.usbdev = usb.core.find(idVendor=self.VENDOR_ID_LG, custom_match = self._match_device) if self.usbdev is None: raise RuntimeError("USB device not found") cfg = usb.util.find_descriptor(self.usbdev, custom_match=self._match_configuration) current_cfg = self.usbdev.get_active_configuration() if cfg.bConfigurationValue != current_cfg.bConfigurationValue: try: cfg.set() except usb.core.USBError as e: _logger.warning("Failed to set configuration, " "has a kernel driver claimed the interface?") raise e for intf in cfg: if self.usbdev.is_kernel_driver_active(intf.bInterfaceNumber): _logger.debug("Detaching kernel driver for intf %d", intf.bInterfaceNumber) self.usbdev.detach_kernel_driver(intf.bInterfaceNumber) if self._match_interface(intf): self._set_interface(intf) assert self.ep_in assert self.ep_out def _match_device(self, device): return any( usb.util.find_descriptor(cfg, custom_match=self._match_interface) for cfg in device ) def _set_interface(self, intf): for ep in intf: ep_dir = usb.util.endpoint_direction(ep.bEndpointAddress) if ep_dir == usb.util.ENDPOINT_IN: self.ep_in = ep.bEndpointAddress else: self.ep_out = ep.bEndpointAddress _logger.debug("Using endpoints %02x (IN), %02x (OUT)", self.ep_in, self.ep_out) def _match_interface(self, intf): return intf.bInterfaceClass == 255 and \ intf.bInterfaceSubClass == 255 and \ intf.bInterfaceProtocol == 255 and \ intf.bNumEndpoints == 2 and all( usb.util.endpoint_type(ep.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK for ep in intf ) def _match_configuration(self, config): return usb.util.find_descriptor(config, custom_match=self._match_interface) def _read(self, n, timeout=None): if timeout is None: timeout = self.READ_TIMEOUT_MS # device seems to use 16 KiB buffers. array = self.usbdev.read(self.ep_in, 2**14, timeout=timeout) try: return array.tobytes() except: return array.tostring() def write(self, data): # Reset read buffer for response if self.read_buffer: _logger.warn('non-empty read buffer %r', self.read_buffer) self.read_buffer = b'' self.usbdev.write(self.ep_out, data) def close(self): usb.util.dispose_resources(self.usbdev) def challenge_response(comm, mode): request_kilo = make_request(b'KILO', args=[b'CENT', b'\0\0\0\0', b'\0\0\0\0', b'\0\0\0\0']) kilo_header, kilo_response = comm.call(request_kilo) kilo_challenge = kilo_header[8:12] _logger.debug("Challenge: %s" % binascii.hexlify(kilo_challenge)) if USE_MFG_KEY: key = b'lgowvqnltpvtgogwswqn~n~mtjjjqxro' else: key = b'qndiakxxuiemdklseqid~a~niq,zjuxl' kilo_response = laf_crypto.encrypt_kilo_challenge(key, kilo_challenge) _logger.debug("Response: %s" % binascii.hexlify(kilo_response)) mode_bytes = struct.pack(' Header: %s, Body: %s" % ( binascii.hexlify(metr_header), binascii.hexlify(metr_response))) def try_hello(comm): """ Tests whether the device speaks the expected protocol. If desynchronization is detected, tries to read as much data as possible. """ # Wait for at most 5 seconds for a response... it shouldn't take that long # and otherwise something is wrong. HELLO_READ_TIMEOUT = 5000 hello_request = make_request(b'HELO', args=[b'\1\0\0\1']) comm.write(hello_request) data = comm.read(0x20, timeout=HELLO_READ_TIMEOUT) if data[0:4] != b'HELO': # Unexpected response, maybe some stale data from a previous execution? while data[0:4] != b'HELO': try: validate_message(data, ignore_crc=True) size = struct.unpack_from('