#!/usr/bin/env python # # Interactive shell for communication with LG devices in download mode (LAF). # # Copyright (C) 2015 Peter Wu # Licensed under the MIT license . from __future__ import print_function from contextlib import closing import argparse, logging, re, struct, sys # Enhanced prompt with history try: import readline except ImportError: pass # Try USB interface try: import usb.core except ImportError: pass # Windows registry for serial port detection try: import winreg except ImportError: try: import _winreg as winreg except ImportError: winreg = None _logger = logging.getLogger("LGLAF.py") # Python 2/3 compat try: input = raw_input except: pass if '\0' == b'\0': int_as_byte = chr else: int_as_byte = lambda x: bytes([x]) _ESCAPE_PATTERN = re.compile(b'''\\\\( x[0-9a-fA-F]{2} | [0-7]{1,3} | .)''', re.VERBOSE) _ESCAPE_MAP = { b'n': b'\n', b'r': b'\r', b't': b'\t', } _ESCAPED_CHARS = b'"\\\'' def text_unescape(text): """Converts a string with escape sequences to bytes.""" text_bin = text.encode("utf8") def sub_char(m): what = m.group(1) if what[0:1] == b'x' and len(what) == 3: return int_as_byte(int(what[1:], 16)) elif what[0:1] in b'01234567': return int_as_byte(int(what, 8)) elif what in _ESCAPE_MAP: return _ESCAPE_MAP[what] elif what in _ESCAPED_CHARS: return what else: raise RuntimeError('Unknown escape sequence \\%s' % what.decode('utf8')) return re.sub(_ESCAPE_PATTERN, sub_char, text_bin) def parse_number_or_escape(text): try: return int(text, 0) if text else 0 except ValueError: return text_unescape(text) ### Protocol-related stuff def crc16(data): """CRC-16-CCITT computation with LSB-first and inversion.""" crc = 0xffff for byte in data: crc ^= byte for bits in range(8): if crc & 1: crc = (crc >> 1) ^ 0x8408 else: crc >>= 1 return crc ^ 0xffff def invert_dword(dword_bin): dword = struct.unpack("I", dword_bin)[0] return struct.pack("I", dword ^ 0xffffffff) def make_request(cmd, args=[], body=b''): if not isinstance(cmd, bytes): cmd = cmd.encode('ascii') assert isinstance(body, bytes), "body must be bytes" # Header: command, args, ... body size, header crc16, inverted command header = bytearray(0x20) def set_header(offset, val): if isinstance(val, int): val = struct.pack('&1 ' body += shell_command.encode('ascii') + b'\0' return make_request(b'EXEC', body=body) ### USB or serial port communication class Communication(object): def __init__(self): self.read_buffer = b'' def read(self, n): """Reads exactly n bytes.""" need = n - len(self.read_buffer) while need > 0: buff = self._read(need) self.read_buffer += buff if not buff: raise EOFError need -= len(buff) data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:] return data def _read(self, n): """Try one read, possibly returning less or more than n bytes.""" raise NotImplementedError def write(self, data): raise NotImplementedError def close(self): raise NotImplementedError def reset(self): self.read_buffer = b'' def call(self, payload): """Sends a command and returns its response.""" validate_message(payload) self.write(payload) header = self.read(0x20) validate_message(header, ignore_crc=True) cmd = header[0:4] size = struct.unpack_from('= 3: self.f = open(file_path, 'r+b', buffering=0) else: self.f = open(file_path, 'r+b') def _read(self, n): return self.f.read(n) def write(self, data): self.f.write(data) def close(self): self.f.close() class USBCommunication(Communication): EP_IN = 0x85 EP_OUT = 3 # Read timeout. Set to 0 to disable timeouts READ_TIMEOUT_MS = 0 def __init__(self): super(USBCommunication, self).__init__() self.usbdev = usb.core.find(idVendor=0x1004, idProduct=0x633e) if self.usbdev is None: raise RuntimeError("USB device not found") def _read(self, n): # device seems to use 16 KiB buffers. array = self.usbdev.read(self.EP_IN, 2**14, timeout=self.READ_TIMEOUT_MS) try: return array.tobytes() except: return array.tostring() def write(self, data): # Reset read buffer for response if self.read_buffer: _logger.warn('non-empty read buffer %r', self.read_buffer) self.read_buffer = b'' self.usbdev.write(self.EP_OUT, data) def close(self): usb.util.dispose_resources(self.usbdev) def try_hello(comm): """ Tests whether the device speaks the expected protocol. If desynchronization is detected, tries to read as much data as possible. """ hello_request = make_request(b'HELO', args=[b'\1\0\0\1']) comm.write(hello_request) data = comm.read(0x20) if data[0:4] != b'HELO': # Unexpected response, maybe some stale data from a previous execution? while data[0:4] != b'HELO': try: validate_message(data, ignore_crc=True) size = struct.unpack_from('