mirror of
https://github.com/Lekensteyn/lglaf.git
synced 2025-03-31 23:29:55 -04:00
lglaf.py: Initial commit
Tested on Linux only.
This commit is contained in:
parent
c88a4289e7
commit
d5b3ae382f
1 changed files with 292 additions and 0 deletions
292
lglaf.py
Executable file
292
lglaf.py
Executable file
|
@ -0,0 +1,292 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# Interactive shell for communication with LG devices in download mode (LAF).
|
||||
#
|
||||
# Copyright (C) 2015 Peter Wu <peter@lekensteyn.nl>
|
||||
# Licensed under the MIT license <http://opensource.org/licenses/MIT>.
|
||||
|
||||
from __future__ import print_function
|
||||
from contextlib import closing
|
||||
import argparse, logging, re, struct, sys
|
||||
|
||||
# Enhanced prompt with history
|
||||
try: import readline
|
||||
except ImportError: pass
|
||||
# Try USB interface
|
||||
try: import usb.core
|
||||
except ImportError: pass
|
||||
|
||||
_logger = logging.getLogger("LGLAF.py")
|
||||
|
||||
# Python 2/3 compat
|
||||
try: input = raw_input
|
||||
except: pass
|
||||
if '\0' == b'\0': int_as_byte = chr
|
||||
else: int_as_byte = lambda x: bytes([x])
|
||||
|
||||
_ESCAPE_PATTERN = re.compile(b'''\\\\(
|
||||
x[0-9a-fA-F]{2} |
|
||||
[0-7]{1,3} |
|
||||
.)''', re.VERBOSE)
|
||||
_ESCAPE_MAP = {
|
||||
b'n': b'\n',
|
||||
b'r': b'\r',
|
||||
b't': b'\t',
|
||||
}
|
||||
_ESCAPED_CHARS = b'"\\\''
|
||||
def text_unescape(text):
|
||||
"""Converts a string with escape sequences to bytes."""
|
||||
text_bin = text.encode("utf8")
|
||||
def sub_char(m):
|
||||
what = m.group(1)
|
||||
if what[0:1] == b'x' and len(what) == 3:
|
||||
return int_as_byte(int(what[1:], 16))
|
||||
elif what[0:1] in b'01234567':
|
||||
return int_as_byte(int(what, 8))
|
||||
elif what in _ESCAPE_MAP:
|
||||
return _ESCAPE_MAP[what]
|
||||
elif what in _ESCAPED_CHARS:
|
||||
return what
|
||||
else:
|
||||
raise RuntimeError('Unknown escape sequence \\%s' %
|
||||
what.decode('utf8'))
|
||||
return re.sub(_ESCAPE_PATTERN, sub_char, text_bin)
|
||||
|
||||
def parse_number_or_escape(text):
|
||||
try:
|
||||
return int(text, 0) if text else 0
|
||||
except ValueError:
|
||||
return text_unescape(text)
|
||||
|
||||
### Protocol-related stuff
|
||||
|
||||
def crc16(data):
|
||||
"""CRC-16-CCITT computation with LSB-first and inversion."""
|
||||
crc = 0xffff
|
||||
for byte in data:
|
||||
crc ^= byte
|
||||
for bits in range(8):
|
||||
if crc & 1:
|
||||
crc = (crc >> 1) ^ 0x8408
|
||||
else:
|
||||
crc >>= 1
|
||||
return crc ^ 0xffff
|
||||
|
||||
def invert_dword(dword_bin):
|
||||
dword = struct.unpack("I", dword_bin)[0]
|
||||
return struct.pack("I", dword ^ 0xffffffff)
|
||||
|
||||
def make_request(cmd, args=[], body=b''):
|
||||
if not isinstance(cmd, bytes):
|
||||
cmd = cmd.encode('ascii')
|
||||
assert isinstance(body, bytes)
|
||||
|
||||
# Header: command, args, ... body size, header crc16, inverted command
|
||||
header = bytearray(0x20)
|
||||
def set_header(offset, val):
|
||||
if isinstance(val, int):
|
||||
val = struct.pack('<I', val)
|
||||
assert len(val) == 4
|
||||
header[offset:offset+4] = val
|
||||
|
||||
set_header(0, cmd)
|
||||
assert len(args) <= 4
|
||||
for i, arg in enumerate(args):
|
||||
set_header(4 * (i + 1), arg)
|
||||
|
||||
# 0x14: body length
|
||||
set_header(0x14, len(body))
|
||||
# 0x1c: Inverted command
|
||||
set_header(0x1c, invert_dword(cmd))
|
||||
# Header finished (with CRC placeholder), append body...
|
||||
header += body
|
||||
# finish with CRC for header and body
|
||||
set_header(0x18, crc16(header))
|
||||
return bytes(header)
|
||||
|
||||
def validate_message(payload, ignore_crc=False):
|
||||
if len(payload) < 0x20:
|
||||
raise RuntimeError("Invalid header length: %d" % len(payload))
|
||||
if not ignore_crc:
|
||||
crc = struct.unpack_from('<I', payload, 0x18)[0]
|
||||
payload_before_crc = bytearray(payload)
|
||||
payload_before_crc[0x18:0x18+4] = b'\0\0\0\0'
|
||||
crc_exp = crc16(payload_before_crc)
|
||||
if crc_exp != crc:
|
||||
raise RuntimeError("Expected CRC %04x, found %04x" % (crc_exp, crc))
|
||||
tail_exp = invert_dword(payload[0:4])
|
||||
tail = payload[0x1c:0x1c+4]
|
||||
if tail_exp != tail:
|
||||
raise RuntimeError("Expected trailer %r, found %r" % (tail_exp, tail))
|
||||
|
||||
def make_exec_request(shell_command):
|
||||
# Allow use of shell constructs such as piping. Needs more work not to eat
|
||||
# all repetitive spaces, it should also escape some things...
|
||||
body = b'sh -c "$@" -- eval 2>&1 '
|
||||
body += shell_command.encode('ascii') + b'\0'
|
||||
return make_request(b'EXEC', body=body)
|
||||
|
||||
|
||||
### USB or serial port communication
|
||||
|
||||
class Communication(object):
|
||||
def read(self, n):
|
||||
raise NotImplementedError
|
||||
def write(self, data):
|
||||
raise NotImplementedError
|
||||
def close(self):
|
||||
raise NotImplementedError
|
||||
def reset(self):
|
||||
"""Tries to consume all outstanding incoming data."""
|
||||
raise NotImplementedError
|
||||
def call(self, payload):
|
||||
"""Sends a command and returns its response."""
|
||||
validate_message(payload)
|
||||
self.write(payload)
|
||||
header = self.read(0x20)
|
||||
validate_message(header, ignore_crc=True)
|
||||
cmd = header[0:4]
|
||||
size = struct.unpack_from('<I', header, 0x14)[0]
|
||||
# could validate CRC and inverted command here...
|
||||
data = self.read(size) if size else b''
|
||||
if cmd == b'FAIL':
|
||||
errCode = struct.unpack_from('<I', header, 4)
|
||||
raise RuntimeError('Command failed with error code %#x' % errCode)
|
||||
if cmd != payload[0:4]:
|
||||
raise RuntimeError("Unexpected response: %r" % header)
|
||||
return header, data
|
||||
|
||||
class FileCommunication(Communication):
|
||||
def __init__(self, file_path):
|
||||
if sys.version_info[0] >= 3:
|
||||
self.f = open(file_path, 'r+b', buffering=0)
|
||||
else:
|
||||
self.f = open(file_path, 'r+b')
|
||||
def read(self, n):
|
||||
return self.f.read(n)
|
||||
def write(self, data):
|
||||
self.f.write(data)
|
||||
def close(self):
|
||||
self.f.close()
|
||||
def reset(self):
|
||||
# TODO non-blocking read
|
||||
_logger.warn("Reset is not implemented yet")
|
||||
|
||||
class USBCommunication(Communication):
|
||||
EP_IN = 0x85
|
||||
EP_OUT = 3
|
||||
# Read timeout. Set to 0 to disable timeouts
|
||||
READ_TIMEOUT_MS = 0
|
||||
def __init__(self):
|
||||
self.read_buffer = b''
|
||||
self.usbdev = usb.core.find(idVendor=0x1004, idProduct=0x633e)
|
||||
if self.usbdev is None:
|
||||
raise RuntimeError("USB device not found")
|
||||
def read(self, n):
|
||||
while len(self.read_buffer) < n:
|
||||
buff = self._read_chunk(self.READ_TIMEOUT_MS)
|
||||
self.read_buffer += buff
|
||||
if not buff:
|
||||
raise EOFError
|
||||
data, self.read_buffer = self.read_buffer[0:n], self.read_buffer[n:]
|
||||
return data
|
||||
def _read_chunk(self, timeout):
|
||||
# device seems to use 16 KiB buffers.
|
||||
return self.usbdev.read(self.EP_IN, 2**14, timeout=timeout)
|
||||
def write(self, data):
|
||||
# Reset read buffer for response
|
||||
if self.read_buffer:
|
||||
_logger.warn('non-empty read buffer %r', self.read_buffer)
|
||||
self.read_buffer = b''
|
||||
self.usbdev.write(self.EP_OUT, data)
|
||||
def close(self):
|
||||
usb.util.dispose_resources(self.usbdev)
|
||||
def reset(self):
|
||||
# TODO: do some handshake and only if the response is unexpected, drain
|
||||
# the queue and try again
|
||||
self.read_buffer = b''
|
||||
try:
|
||||
while True:
|
||||
junk = bytes(self._read_chunk(timeout=100))
|
||||
_logger.debug("Ignoring: %r", junk)
|
||||
except usb.core.USBError as e:
|
||||
# Ignore ETIMEDOUT (110)
|
||||
if e.errno != 110:
|
||||
raise
|
||||
|
||||
|
||||
### Interactive loop
|
||||
|
||||
def get_commands():
|
||||
if sys.stdin.isatty():
|
||||
print("LGLAF.py by Peter Wu (https://lekensteyn.nl/lglaf)\n"
|
||||
"Type a shell command to execute or \"exit\" to leave.",
|
||||
file=sys.stderr)
|
||||
prompt = '# '
|
||||
else:
|
||||
prompt = ''
|
||||
try:
|
||||
while True:
|
||||
line = input(prompt)
|
||||
if line == "exit":
|
||||
break
|
||||
if line:
|
||||
yield line
|
||||
except EOFError:
|
||||
if prompt:
|
||||
print("", file=sys.stderr)
|
||||
|
||||
def command_to_payload(command):
|
||||
# Handle '!' as special commands, treat others as shell command
|
||||
if command[0] != '!':
|
||||
return make_exec_request(command)
|
||||
command = command[1:]
|
||||
# !command [arg1[,arg2[,arg3[,arg4]]]] [body]
|
||||
# args are treated as integers (decimal or hex)
|
||||
# body is treated as string (escape sequences are supported)
|
||||
command, args, body = (command.split(' ', 2) + ['', ''])[0:3]
|
||||
command = text_unescape(command)
|
||||
args = list(map(parse_number_or_escape, args.split(',') + [0, 0, 0]))[0:4]
|
||||
body = text_unescape(body)
|
||||
return make_request(command, args, body)
|
||||
|
||||
parser = argparse.ArgumentParser(description='LG LAF Download Mode utility')
|
||||
parser.add_argument("--serial", metavar="PATH", dest="serial_path",
|
||||
help="Path to serial device (e.g. COM4).")
|
||||
parser.add_argument("--debug", action='store_true', help="Enable debug messages")
|
||||
|
||||
def main():
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(format='%(name)s: %(levelname)s: %(message)s',
|
||||
level=logging.DEBUG if args.debug else logging.INFO)
|
||||
|
||||
# Binary stdout (output data from device as-is)
|
||||
try: stdout_bin = sys.stdout.buffer
|
||||
except: stdout_bin = sys.stdout
|
||||
|
||||
if args.serial_path:
|
||||
comm = FileCommunication(args.serial_path)
|
||||
else:
|
||||
if 'usb.core' not in sys.modules:
|
||||
raise RuntimeError("Please install PyUSB for USB support")
|
||||
comm = USBCommunication()
|
||||
|
||||
with closing(comm):
|
||||
comm.reset()
|
||||
for command in get_commands():
|
||||
try:
|
||||
payload = command_to_payload(command)
|
||||
header, response = comm.call(payload)
|
||||
# For debugging, print header
|
||||
if command[0] == '!':
|
||||
_logger.debug('Header: %s',
|
||||
' '.join(str(header[i:i+4]).replace("\\x00", "\\0")
|
||||
for i in range(0, len(header), 4)))
|
||||
stdout_bin.write(response)
|
||||
except Exception as e:
|
||||
_logger.warn(e)
|
||||
if args.debug:
|
||||
import traceback; traceback.print_exc()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Add table
Reference in a new issue