diff --git a/lglaf.py b/lglaf.py new file mode 100755 index 0000000..1d98112 --- /dev/null +++ b/lglaf.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python +# +# Interactive shell for communication with LG devices in download mode (LAF). +# +# Copyright (C) 2015 Peter Wu <peter@lekensteyn.nl> +# Licensed under the MIT license <http://opensource.org/licenses/MIT>. + +from __future__ import print_function +from contextlib import closing +import argparse, logging, re, struct, sys + +# Enhanced prompt with history +try: import readline +except ImportError: pass +# Try USB interface +try: import usb.core +except ImportError: pass + +_logger = logging.getLogger("LGLAF.py") + +# Python 2/3 compat +try: input = raw_input +except: pass +if '\0' == b'\0': int_as_byte = chr +else: int_as_byte = lambda x: bytes([x]) + +_ESCAPE_PATTERN = re.compile(b'''\\\\( +x[0-9a-fA-F]{2} | +[0-7]{1,3} | +.)''', re.VERBOSE) +_ESCAPE_MAP = { + b'n': b'\n', + b'r': b'\r', + b't': b'\t', +} +_ESCAPED_CHARS = b'"\\\'' +def text_unescape(text): + """Converts a string with escape sequences to bytes.""" + text_bin = text.encode("utf8") + def sub_char(m): + what = m.group(1) + if what[0:1] == b'x' and len(what) == 3: + return int_as_byte(int(what[1:], 16)) + elif what[0:1] in b'01234567': + return int_as_byte(int(what, 8)) + elif what in _ESCAPE_MAP: + return _ESCAPE_MAP[what] + elif what in _ESCAPED_CHARS: + return what + else: + raise RuntimeError('Unknown escape sequence \\%s' % + what.decode('utf8')) + return re.sub(_ESCAPE_PATTERN, sub_char, text_bin) + +def parse_number_or_escape(text): + try: + return int(text, 0) if text else 0 + except ValueError: + return text_unescape(text) + +### Protocol-related stuff + +def crc16(data): + """CRC-16-CCITT computation with LSB-first and inversion.""" + crc = 0xffff + for byte in data: + crc ^= byte + for bits in range(8): + if crc & 1: + crc = (crc >> 1) ^ 0x8408 + else: + crc >>= 1 + return crc ^ 0xffff + +def invert_dword(dword_bin): + dword = struct.unpack("I", dword_bin)[0] + return struct.pack("I", dword ^ 0xffffffff) + +def make_request(cmd, args=[], body=b''): + if not isinstance(cmd, bytes): + cmd = cmd.encode('ascii') + assert isinstance(body, bytes) + + # Header: command, args, ... body size, header crc16, inverted command + header = bytearray(0x20) + def set_header(offset, val): + if isinstance(val, int): + val = struct.pack('<I', val) + assert len(val) == 4 + header[offset:offset+4] = val + + set_header(0, cmd) + assert len(args) <= 4 + for i, arg in enumerate(args): + set_header(4 * (i + 1), arg) + + # 0x14: body length + set_header(0x14, len(body)) + # 0x1c: Inverted command + set_header(0x1c, invert_dword(cmd)) + # Header finished (with CRC placeholder), append body... + header += body + # finish with CRC for header and body + set_header(0x18, crc16(header)) + return bytes(header) + +def validate_message(payload, ignore_crc=False): + if len(payload) < 0x20: + raise RuntimeError("Invalid header length: %d" % len(payload)) + if not ignore_crc: + crc = struct.unpack_from('<I', payload, 0x18)[0] + payload_before_crc = bytearray(payload) + payload_before_crc[0x18:0x18+4] = b'\0\0\0\0' + crc_exp = crc16(payload_before_crc) + if crc_exp != crc: + raise RuntimeError("Expected CRC %04x, found %04x" % (crc_exp, crc)) + tail_exp = invert_dword(payload[0:4]) + tail = payload[0x1c:0x1c+4] + if tail_exp != tail: + raise RuntimeError("Expected trailer %r, found %r" % (tail_exp, tail)) + +def make_exec_request(shell_command): + # Allow use of shell constructs such as piping. Needs more work not to eat + # all repetitive spaces, it should also escape some things... + body = b'sh -c "$@" -- eval 2>&1 ' + body += shell_command.encode('ascii') + b'\0' + return make_request(b'EXEC', body=body) + + +### USB or serial port communication + +class Communication(object): + def read(self, n): + raise NotImplementedError + def write(self, data): + raise NotImplementedError + def close(self): + raise NotImplementedError + def reset(self): + """Tries to consume all outstanding incoming data.""" + raise NotImplementedError + def call(self, payload): + """Sends a command and returns its response.""" + validate_message(payload) + self.write(payload) + header = self.read(0x20) + validate_message(header, ignore_crc=True) + cmd = header[0:4] + size = struct.unpack_from('<I', header, 0x14)[0] + # could validate CRC and inverted command here... + data = self.read(size) if size else b'' + if cmd == b'FAIL': + errCode = struct.unpack_from('<I', header, 4) + raise RuntimeError('Command failed with error code %#x' % errCode) + if cmd != payload[0:4]: + raise RuntimeError("Unexpected response: %r" % header) + return header, data + +class FileCommunication(Communication): + def __init__(self, file_path): + if sys.version_info[0] >= 3: + self.f = open(file_path, 'r+b', buffering=0) + else: + self.f = open(file_path, 'r+b') + def read(self, n): + return self.f.read(n) + def write(self, data): + self.f.write(data) + def close(self): + self.f.close() + def reset(self): + # TODO non-blocking read + _logger.warn("Reset is not implemented yet") + +class USBCommunication(Communication): + EP_IN = 0x85 + EP_OUT = 3 + # Read timeout. Set to 0 to disable timeouts + READ_TIMEOUT_MS = 0 + def __init__(self): + self.read_buffer = b'' + self.usbdev = usb.core.find(idVendor=0x1004, idProduct=0x633e) + if self.usbdev is None: + raise RuntimeError("USB device not found") + def read(self, n): + while len(self.read_buffer) < n: + buff = self._read_chunk(self.READ_TIMEOUT_MS) + self.read_buffer += buff + if not buff: + raise EOFError + data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:] + return data + def _read_chunk(self, timeout): + # device seems to use 16 KiB buffers. + return self.usbdev.read(self.EP_IN, 2**14, timeout=timeout) + def write(self, data): + # Reset read buffer for response + if self.read_buffer: + _logger.warn('non-empty read buffer %r', self.read_buffer) + self.read_buffer = b'' + self.usbdev.write(self.EP_OUT, data) + def close(self): + usb.util.dispose_resources(self.usbdev) + def reset(self): + # TODO: do some handshake and only if the response is unexpected, drain + # the queue and try again + self.read_buffer = b'' + try: + while True: + junk = bytes(self._read_chunk(timeout=100)) + _logger.debug("Ignoring: %r", junk) + except usb.core.USBError as e: + # Ignore ETIMEDOUT (110) + if e.errno != 110: + raise + + +### Interactive loop + +def get_commands(): + if sys.stdin.isatty(): + print("LGLAF.py by Peter Wu (https://lekensteyn.nl/lglaf)\n" + "Type a shell command to execute or \"exit\" to leave.", + file=sys.stderr) + prompt = '# ' + else: + prompt = '' + try: + while True: + line = input(prompt) + if line == "exit": + break + if line: + yield line + except EOFError: + if prompt: + print("", file=sys.stderr) + +def command_to_payload(command): + # Handle '!' as special commands, treat others as shell command + if command[0] != '!': + return make_exec_request(command) + command = command[1:] + # !command [arg1[,arg2[,arg3[,arg4]]]] [body] + # args are treated as integers (decimal or hex) + # body is treated as string (escape sequences are supported) + command, args, body = (command.split(' ', 2) + ['', ''])[0:3] + command = text_unescape(command) + args = list(map(parse_number_or_escape, args.split(',') + [0, 0, 0]))[0:4] + body = text_unescape(body) + return make_request(command, args, body) + +parser = argparse.ArgumentParser(description='LG LAF Download Mode utility') +parser.add_argument("--serial", metavar="PATH", dest="serial_path", + help="Path to serial device (e.g. COM4).") +parser.add_argument("--debug", action='store_true', help="Enable debug messages") + +def main(): + args = parser.parse_args() + logging.basicConfig(format='%(name)s: %(levelname)s: %(message)s', + level=logging.DEBUG if args.debug else logging.INFO) + + # Binary stdout (output data from device as-is) + try: stdout_bin = sys.stdout.buffer + except: stdout_bin = sys.stdout + + if args.serial_path: + comm = FileCommunication(args.serial_path) + else: + if 'usb.core' not in sys.modules: + raise RuntimeError("Please install PyUSB for USB support") + comm = USBCommunication() + + with closing(comm): + comm.reset() + for command in get_commands(): + try: + payload = command_to_payload(command) + header, response = comm.call(payload) + # For debugging, print header + if command[0] == '!': + _logger.debug('Header: %s', + ' '.join(str(header[i:i+4]).replace("\\x00", "\\0") + for i in range(0, len(header), 4))) + stdout_bin.write(response) + except Exception as e: + _logger.warn(e) + if args.debug: + import traceback; traceback.print_exc() + +if __name__ == '__main__': + main()