#!/usr/bin/env python # # Interactive shell for communication with LG devices in download mode (LAF). # # Copyright (C) 2015 Peter Wu # Licensed under the MIT license . from __future__ import print_function from contextlib import closing import argparse, logging, re, struct, sys # Enhanced prompt with history try: import readline except ImportError: pass # Try USB interface try: import usb.core, usb.util except ImportError: pass # Windows registry for serial port detection try: import winreg except ImportError: try: import _winreg as winreg except ImportError: winreg = None _logger = logging.getLogger("LGLAF.py") # Python 2/3 compat try: input = raw_input except: pass if '\0' == b'\0': int_as_byte = chr else: int_as_byte = lambda x: bytes([x]) _ESCAPE_PATTERN = re.compile(b'''\\\\( x[0-9a-fA-F]{2} | [0-7]{1,3} | .)''', re.VERBOSE) _ESCAPE_MAP = { b'n': b'\n', b'r': b'\r', b't': b'\t', } _ESCAPED_CHARS = b'"\\\'' def text_unescape(text): """Converts a string with escape sequences to bytes.""" text_bin = text.encode("utf8") def sub_char(m): what = m.group(1) if what[0:1] == b'x' and len(what) == 3: return int_as_byte(int(what[1:], 16)) elif what[0:1] in b'01234567': return int_as_byte(int(what, 8)) elif what in _ESCAPE_MAP: return _ESCAPE_MAP[what] elif what in _ESCAPED_CHARS: return what else: raise RuntimeError('Unknown escape sequence \\%s' % what.decode('utf8')) return re.sub(_ESCAPE_PATTERN, sub_char, text_bin) def parse_number_or_escape(text): try: return int(text, 0) if text else 0 except ValueError: return text_unescape(text) ### Protocol-related stuff def crc16(data): """CRC-16-CCITT computation with LSB-first and inversion.""" crc = 0xffff for byte in data: crc ^= byte for bits in range(8): if crc & 1: crc = (crc >> 1) ^ 0x8408 else: crc >>= 1 return crc ^ 0xffff def invert_dword(dword_bin): dword = struct.unpack("I", dword_bin)[0] return struct.pack("I", dword ^ 0xffffffff) def make_request(cmd, args=[], body=b''): if not isinstance(cmd, bytes): cmd = cmd.encode('ascii') assert isinstance(body, bytes), "body must be bytes" # Header: command, args, ... body size, header crc16, inverted command header = bytearray(0x20) def set_header(offset, val): if isinstance(val, int): val = struct.pack('&1 -- ' argv += shell_command.encode('ascii') if len(argv) > 255: raise RuntimeError("Command length %d is larger than 255" % len(argv)) return make_request(b'EXEC', body=argv + b'\0') ### USB or serial port communication class Communication(object): def __init__(self): self.read_buffer = b'' def read(self, n, timeout=None): """Reads exactly n bytes.""" need = n - len(self.read_buffer) while need > 0: buff = self._read(need, timeout=timeout) self.read_buffer += buff if not buff: raise EOFError need -= len(buff) data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:] return data def _read(self, n, timeout=None): """Try one read, possibly returning less or more than n bytes.""" raise NotImplementedError def write(self, data): raise NotImplementedError def close(self): raise NotImplementedError def reset(self): self.read_buffer = b'' def call(self, payload): """Sends a command and returns its response.""" validate_message(payload) self.write(payload) header = self.read(0x20) validate_message(header, ignore_crc=True) cmd = header[0:4] size = struct.unpack_from('= 3: self.f = open(file_path, 'r+b', buffering=0) else: self.f = open(file_path, 'r+b') def _read(self, n, timeout=None): return self.f.read(n) def write(self, data): self.f.write(data) def close(self): self.f.close() class USBCommunication(Communication): EP_IN = 0x85 EP_OUT = 3 VENDOR_ID_LG = 0x1004 # Read timeout. Set to 0 to disable timeouts READ_TIMEOUT_MS = 60000 def __init__(self): super(USBCommunication, self).__init__() # Match device using heuristics on the interface/endpoint descriptors, # this avoids hardcoding idProduct. self.usbdev = usb.core.find(idVendor=self.VENDOR_ID_LG, custom_match = self._match_device) if self.usbdev is None: raise RuntimeError("USB device not found") cfg = usb.util.find_descriptor(self.usbdev, custom_match=self._match_configuration) current_cfg = self.usbdev.get_active_configuration() if cfg.bConfigurationValue != current_cfg.bConfigurationValue: try: cfg.set() except usb.core.USBError as e: _logger.warning("Failed to set configuration, " "has a kernel driver claimed the interface?") raise e def _match_device(self, device): return any( usb.util.find_descriptor(cfg, bInterfaceClass=255, bInterfaceSubClass=255, bInterfaceProtocol=255, custom_match=self._match_interface) for cfg in device ) def _match_interface(self, intf): return intf.bNumEndpoints == 2 and all( ep.bEndpointAddress in (self.EP_IN, self.EP_OUT) and usb.util.endpoint_type(ep.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK for ep in intf ) def _match_configuration(self, config): return usb.util.find_descriptor(config, custom_match=self._match_interface) def _read(self, n, timeout=None): if timeout is None: timeout = self.READ_TIMEOUT_MS # device seems to use 16 KiB buffers. array = self.usbdev.read(self.EP_IN, 2**14, timeout=timeout) try: return array.tobytes() except: return array.tostring() def write(self, data): # Reset read buffer for response if self.read_buffer: _logger.warn('non-empty read buffer %r', self.read_buffer) self.read_buffer = b'' self.usbdev.write(self.EP_OUT, data) def close(self): usb.util.dispose_resources(self.usbdev) def try_hello(comm): """ Tests whether the device speaks the expected protocol. If desynchronization is detected, tries to read as much data as possible. """ # Wait for at most 5 seconds for a response... it shouldn't take that long # and otherwise something is wrong. HELLO_READ_TIMEOUT = 5000 hello_request = make_request(b'HELO', args=[b'\1\0\0\1']) comm.write(hello_request) data = comm.read(0x20, timeout=HELLO_READ_TIMEOUT) if data[0:4] != b'HELO': # Unexpected response, maybe some stale data from a previous execution? while data[0:4] != b'HELO': try: validate_message(data, ignore_crc=True) size = struct.unpack_from('