2015-12-24 20:03:07 +01:00
#!/usr/bin/env python
#
# Interactive shell for communication with LG devices in download mode (LAF).
#
# Copyright (C) 2015 Peter Wu <peter@lekensteyn.nl>
# Licensed under the MIT license <http://opensource.org/licenses/MIT>.
from __future__ import print_function
from contextlib import closing
import argparse , logging , re , struct , sys
# Enhanced prompt with history
try : import readline
except ImportError : pass
# Try USB interface
2015-12-29 11:38:38 +01:00
try : import usb . core , usb . util
2015-12-24 20:03:07 +01:00
except ImportError : pass
2015-12-24 23:58:34 +01:00
# Windows registry for serial port detection
try : import winreg
except ImportError :
try : import _winreg as winreg
except ImportError : winreg = None
2015-12-24 20:03:07 +01:00
_logger = logging . getLogger ( " LGLAF.py " )
# Python 2/3 compat
try : input = raw_input
except : pass
if ' \0 ' == b ' \0 ' : int_as_byte = chr
else : int_as_byte = lambda x : bytes ( [ x ] )
_ESCAPE_PATTERN = re . compile ( b ''' \\ \\ (
x [ 0 - 9 a - fA - F ] { 2 } |
[ 0 - 7 ] { 1 , 3 } |
. ) ''' , re.VERBOSE)
_ESCAPE_MAP = {
b ' n ' : b ' \n ' ,
b ' r ' : b ' \r ' ,
b ' t ' : b ' \t ' ,
}
_ESCAPED_CHARS = b ' " \\ \' '
def text_unescape ( text ) :
""" Converts a string with escape sequences to bytes. """
text_bin = text . encode ( " utf8 " )
def sub_char ( m ) :
what = m . group ( 1 )
if what [ 0 : 1 ] == b ' x ' and len ( what ) == 3 :
return int_as_byte ( int ( what [ 1 : ] , 16 ) )
elif what [ 0 : 1 ] in b ' 01234567 ' :
return int_as_byte ( int ( what , 8 ) )
elif what in _ESCAPE_MAP :
return _ESCAPE_MAP [ what ]
elif what in _ESCAPED_CHARS :
return what
else :
raise RuntimeError ( ' Unknown escape sequence \\ %s ' %
what . decode ( ' utf8 ' ) )
return re . sub ( _ESCAPE_PATTERN , sub_char , text_bin )
def parse_number_or_escape ( text ) :
try :
return int ( text , 0 ) if text else 0
except ValueError :
return text_unescape ( text )
### Protocol-related stuff
def crc16 ( data ) :
""" CRC-16-CCITT computation with LSB-first and inversion. """
crc = 0xffff
for byte in data :
crc ^ = byte
for bits in range ( 8 ) :
if crc & 1 :
crc = ( crc >> 1 ) ^ 0x8408
else :
crc >> = 1
return crc ^ 0xffff
def invert_dword ( dword_bin ) :
dword = struct . unpack ( " I " , dword_bin ) [ 0 ]
return struct . pack ( " I " , dword ^ 0xffffffff )
def make_request ( cmd , args = [ ] , body = b ' ' ) :
if not isinstance ( cmd , bytes ) :
cmd = cmd . encode ( ' ascii ' )
2015-12-25 15:48:30 +01:00
assert isinstance ( body , bytes ) , " body must be bytes "
2015-12-24 20:03:07 +01:00
# Header: command, args, ... body size, header crc16, inverted command
header = bytearray ( 0x20 )
def set_header ( offset , val ) :
if isinstance ( val , int ) :
val = struct . pack ( ' <I ' , val )
2015-12-26 01:09:07 +01:00
assert len ( val ) == 4 , " Header field requires a DWORD, got %s %r " % \
( type ( val ) . __name__ , val )
2015-12-24 20:03:07 +01:00
header [ offset : offset + 4 ] = val
set_header ( 0 , cmd )
2015-12-25 15:48:30 +01:00
assert len ( args ) < = 4 , " Header cannot have more than 4 arguments "
2015-12-24 20:03:07 +01:00
for i , arg in enumerate ( args ) :
set_header ( 4 * ( i + 1 ) , arg )
# 0x14: body length
set_header ( 0x14 , len ( body ) )
# 0x1c: Inverted command
set_header ( 0x1c , invert_dword ( cmd ) )
# Header finished (with CRC placeholder), append body...
header + = body
# finish with CRC for header and body
set_header ( 0x18 , crc16 ( header ) )
return bytes ( header )
def validate_message ( payload , ignore_crc = False ) :
if len ( payload ) < 0x20 :
raise RuntimeError ( " Invalid header length: %d " % len ( payload ) )
if not ignore_crc :
crc = struct . unpack_from ( ' <I ' , payload , 0x18 ) [ 0 ]
payload_before_crc = bytearray ( payload )
payload_before_crc [ 0x18 : 0x18 + 4 ] = b ' \0 \0 \0 \0 '
crc_exp = crc16 ( payload_before_crc )
if crc_exp != crc :
raise RuntimeError ( " Expected CRC %04x , found %04x " % ( crc_exp , crc ) )
tail_exp = invert_dword ( payload [ 0 : 4 ] )
tail = payload [ 0x1c : 0x1c + 4 ]
if tail_exp != tail :
raise RuntimeError ( " Expected trailer %r , found %r " % ( tail_exp , tail ) )
def make_exec_request ( shell_command ) :
2016-01-04 13:03:30 +01:00
# Allow use of shell constructs such as piping and reports syntax errors
# such as unterminated quotes. Remaining limitation: repetitive spaces are
# still eaten.
argv = b ' sh -c eval \t " $* " </dev/null \t 2>&1 -- '
argv + = shell_command . encode ( ' ascii ' )
if len ( argv ) > 255 :
raise RuntimeError ( " Command length %d is larger than 255 " % len ( argv ) )
return make_request ( b ' EXEC ' , body = argv + b ' \0 ' )
2015-12-24 20:03:07 +01:00
### USB or serial port communication
class Communication ( object ) :
2015-12-25 15:55:15 +01:00
def __init__ ( self ) :
self . read_buffer = b ' '
2015-12-29 18:40:46 +01:00
def read ( self , n , timeout = None ) :
2015-12-25 15:55:15 +01:00
""" Reads exactly n bytes. """
need = n - len ( self . read_buffer )
while need > 0 :
2015-12-29 18:40:46 +01:00
buff = self . _read ( need , timeout = timeout )
2015-12-25 15:55:15 +01:00
self . read_buffer + = buff
if not buff :
raise EOFError
need - = len ( buff )
data , self . read_buffer = self . read_buffer [ 0 : n ] , self . read_buffer [ n : ]
return data
2015-12-29 18:40:46 +01:00
def _read ( self , n , timeout = None ) :
2015-12-25 15:55:15 +01:00
""" Try one read, possibly returning less or more than n bytes. """
2015-12-24 20:03:07 +01:00
raise NotImplementedError
def write ( self , data ) :
raise NotImplementedError
def close ( self ) :
raise NotImplementedError
def reset ( self ) :
2015-12-25 15:55:15 +01:00
self . read_buffer = b ' '
2015-12-24 20:03:07 +01:00
def call ( self , payload ) :
""" Sends a command and returns its response. """
validate_message ( payload )
self . write ( payload )
header = self . read ( 0x20 )
validate_message ( header , ignore_crc = True )
cmd = header [ 0 : 4 ]
size = struct . unpack_from ( ' <I ' , header , 0x14 ) [ 0 ]
# could validate CRC and inverted command here...
data = self . read ( size ) if size else b ' '
if cmd == b ' FAIL ' :
errCode = struct . unpack_from ( ' <I ' , header , 4 )
raise RuntimeError ( ' Command failed with error code %#x ' % errCode )
if cmd != payload [ 0 : 4 ] :
raise RuntimeError ( " Unexpected response: %r " % header )
return header , data
class FileCommunication ( Communication ) :
def __init__ ( self , file_path ) :
2015-12-25 15:55:15 +01:00
super ( FileCommunication , self ) . __init__ ( )
2015-12-24 20:03:07 +01:00
if sys . version_info [ 0 ] > = 3 :
self . f = open ( file_path , ' r+b ' , buffering = 0 )
else :
self . f = open ( file_path , ' r+b ' )
2015-12-29 18:40:46 +01:00
def _read ( self , n , timeout = None ) :
2015-12-24 20:03:07 +01:00
return self . f . read ( n )
def write ( self , data ) :
self . f . write ( data )
def close ( self ) :
self . f . close ( )
class USBCommunication ( Communication ) :
EP_IN = 0x85
EP_OUT = 3
2015-12-29 11:38:38 +01:00
VENDOR_ID_LG = 0x1004
2015-12-24 20:03:07 +01:00
# Read timeout. Set to 0 to disable timeouts
2015-12-29 18:40:46 +01:00
READ_TIMEOUT_MS = 60000
2015-12-24 20:03:07 +01:00
def __init__ ( self ) :
2015-12-25 15:55:15 +01:00
super ( USBCommunication , self ) . __init__ ( )
2015-12-29 11:38:38 +01:00
# Match device using heuristics on the interface/endpoint descriptors,
# this avoids hardcoding idProduct.
self . usbdev = usb . core . find ( idVendor = self . VENDOR_ID_LG ,
custom_match = self . _match_device )
2015-12-24 20:03:07 +01:00
if self . usbdev is None :
raise RuntimeError ( " USB device not found " )
2015-12-29 18:43:22 +01:00
cfg = usb . util . find_descriptor ( self . usbdev ,
custom_match = self . _match_configuration )
current_cfg = self . usbdev . get_active_configuration ( )
if cfg . bConfigurationValue != current_cfg . bConfigurationValue :
try :
cfg . set ( )
except usb . core . USBError as e :
_logger . warning ( " Failed to set configuration, "
" has a kernel driver claimed the interface? " )
raise e
2015-12-29 11:38:38 +01:00
def _match_device ( self , device ) :
return any (
usb . util . find_descriptor ( cfg , bInterfaceClass = 255 ,
bInterfaceSubClass = 255 , bInterfaceProtocol = 255 ,
custom_match = self . _match_interface )
for cfg in device
)
def _match_interface ( self , intf ) :
return intf . bNumEndpoints == 2 and all (
ep . bEndpointAddress in ( self . EP_IN , self . EP_OUT ) and
usb . util . endpoint_type ( ep . bmAttributes ) ==
usb . util . ENDPOINT_TYPE_BULK
for ep in intf
)
2015-12-29 18:43:22 +01:00
def _match_configuration ( self , config ) :
return usb . util . find_descriptor ( config ,
custom_match = self . _match_interface )
2016-01-03 22:46:15 +01:00
def _read ( self , n , timeout = None ) :
if timeout is None :
timeout = self . READ_TIMEOUT_MS
2015-12-24 20:03:07 +01:00
# device seems to use 16 KiB buffers.
2015-12-29 18:40:46 +01:00
array = self . usbdev . read ( self . EP_IN , 2 * * 14 , timeout = timeout )
2015-12-26 22:03:49 +01:00
try : return array . tobytes ( )
except : return array . tostring ( )
2015-12-24 20:03:07 +01:00
def write ( self , data ) :
# Reset read buffer for response
if self . read_buffer :
_logger . warn ( ' non-empty read buffer %r ' , self . read_buffer )
self . read_buffer = b ' '
self . usbdev . write ( self . EP_OUT , data )
def close ( self ) :
usb . util . dispose_resources ( self . usbdev )
2015-12-25 15:55:15 +01:00
def try_hello ( comm ) :
"""
Tests whether the device speaks the expected protocol . If desynchronization
is detected , tries to read as much data as possible .
"""
2015-12-29 18:40:46 +01:00
# Wait for at most 5 seconds for a response... it shouldn't take that long
# and otherwise something is wrong.
HELLO_READ_TIMEOUT = 5000
2015-12-25 15:55:15 +01:00
hello_request = make_request ( b ' HELO ' , args = [ b ' \1 \0 \0 \1 ' ] )
comm . write ( hello_request )
2015-12-29 18:40:46 +01:00
data = comm . read ( 0x20 , timeout = HELLO_READ_TIMEOUT )
2015-12-25 15:55:15 +01:00
if data [ 0 : 4 ] != b ' HELO ' :
# Unexpected response, maybe some stale data from a previous execution?
while data [ 0 : 4 ] != b ' HELO ' :
try :
validate_message ( data , ignore_crc = True )
size = struct . unpack_from ( ' <I ' , data , 0x14 ) [ 0 ]
2015-12-29 18:40:46 +01:00
comm . read ( size , timeout = HELLO_READ_TIMEOUT )
2015-12-25 15:55:15 +01:00
except RuntimeError : pass
# Flush read buffer
comm . reset ( )
2015-12-29 18:40:46 +01:00
data = comm . read ( 0x20 , timeout = HELLO_READ_TIMEOUT )
2015-12-25 15:55:15 +01:00
# Just to be sure, send another HELO request.
comm . call ( hello_request )
2015-12-24 20:03:07 +01:00
2015-12-24 23:58:34 +01:00
def detect_serial_path ( ) :
try :
path = r ' HARDWARE \ DEVICEMAP \ SERIALCOMM '
with winreg . OpenKey ( winreg . HKEY_LOCAL_MACHINE , path ) as key :
2015-12-30 10:34:23 +01:00
for i in range ( winreg . QueryInfoKey ( key ) [ 1 ] ) :
name , value , value_type = winreg . EnumValue ( key , i )
# match both \Device\LGANDNETDIAG1 and \Device\LGVZANDNETDIAG1
name = name . upper ( )
if name . startswith ( r ' \ DEVICE \ LG ' ) and name . endswith ( ' ANDNETDIAG1 ' ) :
return value
except OSError : pass
return None
2015-12-24 23:58:34 +01:00
2015-12-26 01:09:07 +01:00
def autodetect_device ( ) :
if winreg is not None and ' usb.core ' not in sys . modules :
serial_path = detect_serial_path ( )
_logger . debug ( " Using serial port: %s " , serial_path )
if not serial_path :
2015-12-30 10:34:23 +01:00
raise RuntimeError ( " Device not found, try installing LG drivers " )
2015-12-26 01:09:07 +01:00
return FileCommunication ( serial_path )
else :
if ' usb.core ' not in sys . modules :
raise RuntimeError ( " Please install PyUSB for USB support " )
return USBCommunication ( )
2015-12-24 20:03:07 +01:00
### Interactive loop
2015-12-24 21:55:34 +01:00
def get_commands ( command ) :
if command :
yield command
return
# Happened on Win32/Py3.4.4 when: echo ls | lglaf.py --serial com4
if sys . stdin is None :
raise RuntimeError ( ' No console input available! ' )
2015-12-24 20:03:07 +01:00
if sys . stdin . isatty ( ) :
print ( " LGLAF.py by Peter Wu (https://lekensteyn.nl/lglaf) \n "
" Type a shell command to execute or \" exit \" to leave. " ,
file = sys . stderr )
prompt = ' # '
else :
prompt = ' '
try :
while True :
line = input ( prompt )
if line == " exit " :
break
if line :
yield line
except EOFError :
if prompt :
print ( " " , file = sys . stderr )
def command_to_payload ( command ) :
# Handle '!' as special commands, treat others as shell command
if command [ 0 ] != ' ! ' :
return make_exec_request ( command )
command = command [ 1 : ]
# !command [arg1[,arg2[,arg3[,arg4]]]] [body]
# args are treated as integers (decimal or hex)
# body is treated as string (escape sequences are supported)
command , args , body = ( command . split ( ' ' , 2 ) + [ ' ' , ' ' ] ) [ 0 : 3 ]
command = text_unescape ( command )
args = list ( map ( parse_number_or_escape , args . split ( ' , ' ) + [ 0 , 0 , 0 ] ) ) [ 0 : 4 ]
body = text_unescape ( body )
return make_request ( command , args , body )
parser = argparse . ArgumentParser ( description = ' LG LAF Download Mode utility ' )
2015-12-26 14:51:28 +01:00
parser . add_argument ( " --skip-hello " , action = " store_true " ,
help = " Immediately send commands, skip HELO message " )
2015-12-24 21:55:34 +01:00
parser . add_argument ( " -c " , " --command " , help = ' Shell command to execute ' )
2015-12-24 20:03:07 +01:00
parser . add_argument ( " --serial " , metavar = " PATH " , dest = " serial_path " ,
help = " Path to serial device (e.g. COM4). " )
parser . add_argument ( " --debug " , action = ' store_true ' , help = " Enable debug messages " )
def main ( ) :
args = parser . parse_args ( )
logging . basicConfig ( format = ' %(name)s : %(levelname)s : %(message)s ' ,
level = logging . DEBUG if args . debug else logging . INFO )
# Binary stdout (output data from device as-is)
try : stdout_bin = sys . stdout . buffer
except : stdout_bin = sys . stdout
if args . serial_path :
comm = FileCommunication ( args . serial_path )
else :
2015-12-26 01:09:07 +01:00
comm = autodetect_device ( )
2015-12-24 20:03:07 +01:00
with closing ( comm ) :
2015-12-26 14:51:28 +01:00
if not args . skip_hello :
try_hello ( comm )
_logger . debug ( " Hello done, proceeding with commands " )
2015-12-24 21:55:34 +01:00
for command in get_commands ( args . command ) :
2015-12-24 20:03:07 +01:00
try :
payload = command_to_payload ( command )
header , response = comm . call ( payload )
# For debugging, print header
if command [ 0 ] == ' ! ' :
_logger . debug ( ' Header: %s ' ,
2015-12-26 22:03:49 +01:00
' ' . join ( repr ( header [ i : i + 4 ] ) . replace ( " \\ x00 " , " \\ 0 " )
2015-12-24 20:03:07 +01:00
for i in range ( 0 , len ( header ) , 4 ) ) )
stdout_bin . write ( response )
except Exception as e :
_logger . warn ( e )
if args . debug :
import traceback ; traceback . print_exc ( )
if __name__ == ' __main__ ' :
main ( )