#!/usr/bin/env python # # Interactive shell for communication with LG devices in download mode (LAF). # # Copyright (C) 2015 Peter Wu # Licensed under the MIT license . from __future__ import print_function from contextlib import closing import argparse, logging, re, struct, sys # Enhanced prompt with history try: import readline except ImportError: pass # Try USB interface try: import usb.core except ImportError: pass _logger = logging.getLogger("LGLAF.py") # Python 2/3 compat try: input = raw_input except: pass if '\0' == b'\0': int_as_byte = chr else: int_as_byte = lambda x: bytes([x]) _ESCAPE_PATTERN = re.compile(b'''\\\\( x[0-9a-fA-F]{2} | [0-7]{1,3} | .)''', re.VERBOSE) _ESCAPE_MAP = { b'n': b'\n', b'r': b'\r', b't': b'\t', } _ESCAPED_CHARS = b'"\\\'' def text_unescape(text): """Converts a string with escape sequences to bytes.""" text_bin = text.encode("utf8") def sub_char(m): what = m.group(1) if what[0:1] == b'x' and len(what) == 3: return int_as_byte(int(what[1:], 16)) elif what[0:1] in b'01234567': return int_as_byte(int(what, 8)) elif what in _ESCAPE_MAP: return _ESCAPE_MAP[what] elif what in _ESCAPED_CHARS: return what else: raise RuntimeError('Unknown escape sequence \\%s' % what.decode('utf8')) return re.sub(_ESCAPE_PATTERN, sub_char, text_bin) def parse_number_or_escape(text): try: return int(text, 0) if text else 0 except ValueError: return text_unescape(text) ### Protocol-related stuff def crc16(data): """CRC-16-CCITT computation with LSB-first and inversion.""" crc = 0xffff for byte in data: crc ^= byte for bits in range(8): if crc & 1: crc = (crc >> 1) ^ 0x8408 else: crc >>= 1 return crc ^ 0xffff def invert_dword(dword_bin): dword = struct.unpack("I", dword_bin)[0] return struct.pack("I", dword ^ 0xffffffff) def make_request(cmd, args=[], body=b''): if not isinstance(cmd, bytes): cmd = cmd.encode('ascii') assert isinstance(body, bytes) # Header: command, args, ... body size, header crc16, inverted command header = bytearray(0x20) def set_header(offset, val): if isinstance(val, int): val = struct.pack('&1 ' body += shell_command.encode('ascii') + b'\0' return make_request(b'EXEC', body=body) ### USB or serial port communication class Communication(object): def read(self, n): raise NotImplementedError def write(self, data): raise NotImplementedError def close(self): raise NotImplementedError def reset(self): """Tries to consume all outstanding incoming data.""" raise NotImplementedError def call(self, payload): """Sends a command and returns its response.""" validate_message(payload) self.write(payload) header = self.read(0x20) validate_message(header, ignore_crc=True) cmd = header[0:4] size = struct.unpack_from('= 3: self.f = open(file_path, 'r+b', buffering=0) else: self.f = open(file_path, 'r+b') def read(self, n): return self.f.read(n) def write(self, data): self.f.write(data) def close(self): self.f.close() def reset(self): # TODO non-blocking read _logger.warn("Reset is not implemented yet") class USBCommunication(Communication): EP_IN = 0x85 EP_OUT = 3 # Read timeout. Set to 0 to disable timeouts READ_TIMEOUT_MS = 0 def __init__(self): self.read_buffer = b'' self.usbdev = usb.core.find(idVendor=0x1004, idProduct=0x633e) if self.usbdev is None: raise RuntimeError("USB device not found") def read(self, n): while len(self.read_buffer) < n: buff = self._read_chunk(self.READ_TIMEOUT_MS) self.read_buffer += buff if not buff: raise EOFError data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:] return data def _read_chunk(self, timeout): # device seems to use 16 KiB buffers. return self.usbdev.read(self.EP_IN, 2**14, timeout=timeout) def write(self, data): # Reset read buffer for response if self.read_buffer: _logger.warn('non-empty read buffer %r', self.read_buffer) self.read_buffer = b'' self.usbdev.write(self.EP_OUT, data) def close(self): usb.util.dispose_resources(self.usbdev) def reset(self): # TODO: do some handshake and only if the response is unexpected, drain # the queue and try again self.read_buffer = b'' try: while True: junk = bytes(self._read_chunk(timeout=100)) _logger.debug("Ignoring: %r", junk) except usb.core.USBError as e: # Ignore ETIMEDOUT (110) if e.errno != 110: raise ### Interactive loop def get_commands(command): if command: yield command return # Happened on Win32/Py3.4.4 when: echo ls | lglaf.py --serial com4 if sys.stdin is None: raise RuntimeError('No console input available!') if sys.stdin.isatty(): print("LGLAF.py by Peter Wu (https://lekensteyn.nl/lglaf)\n" "Type a shell command to execute or \"exit\" to leave.", file=sys.stderr) prompt = '# ' else: prompt = '' try: while True: line = input(prompt) if line == "exit": break if line: yield line except EOFError: if prompt: print("", file=sys.stderr) def command_to_payload(command): # Handle '!' as special commands, treat others as shell command if command[0] != '!': return make_exec_request(command) command = command[1:] # !command [arg1[,arg2[,arg3[,arg4]]]] [body] # args are treated as integers (decimal or hex) # body is treated as string (escape sequences are supported) command, args, body = (command.split(' ', 2) + ['', ''])[0:3] command = text_unescape(command) args = list(map(parse_number_or_escape, args.split(',') + [0, 0, 0]))[0:4] body = text_unescape(body) return make_request(command, args, body) parser = argparse.ArgumentParser(description='LG LAF Download Mode utility') parser.add_argument("-c", "--command", help='Shell command to execute') parser.add_argument("--serial", metavar="PATH", dest="serial_path", help="Path to serial device (e.g. COM4).") parser.add_argument("--debug", action='store_true', help="Enable debug messages") def main(): args = parser.parse_args() logging.basicConfig(format='%(name)s: %(levelname)s: %(message)s', level=logging.DEBUG if args.debug else logging.INFO) # Binary stdout (output data from device as-is) try: stdout_bin = sys.stdout.buffer except: stdout_bin = sys.stdout if args.serial_path: comm = FileCommunication(args.serial_path) else: if 'usb.core' not in sys.modules: raise RuntimeError("Please install PyUSB for USB support") comm = USBCommunication() with closing(comm): comm.reset() for command in get_commands(args.command): try: payload = command_to_payload(command) header, response = comm.call(payload) # For debugging, print header if command[0] == '!': _logger.debug('Header: %s', ' '.join(str(header[i:i+4]).replace("\\x00", "\\0") for i in range(0, len(header), 4))) stdout_bin.write(response) except Exception as e: _logger.warn(e) if args.debug: import traceback; traceback.print_exc() if __name__ == '__main__': main()