Merge pull request #27 from tuxuser/update/omit_shellcmd_parse_gpt

Dump file compatibility, read GPT from block device, py3 compatible KILO challenge/response
This commit is contained in:
Peter Wu 2017-11-27 21:21:42 +00:00 committed by GitHub
commit 427a397d2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 600 additions and 71 deletions

View file

@ -15,14 +15,21 @@ def read_uint32(data, offset):
return struct.unpack_from('<I', data, offset)[0]
def get_file_size(comm, path):
shell_command = b'stat -t ' + path.encode('utf8') + b'\0'
shell_command = b'ls -ld ' + path.encode('utf8') + b'\0'
output = comm.call(lglaf.make_request(b'EXEC', body=shell_command))[1]
output = output.decode('utf8')
if not output.startswith(path + ' '):
_logger.debug("Output: %r", output)
raise RuntimeError("Cannot get filesize for %s" % path)
size_bytes = output.lstrip(path + ' ').split()[0]
return int(size_bytes)
if not len(output):
raise RuntimeError("Cannot find file %s" % path)
# Example output: "-rwxr-x--- root root 496888 1970-01-01 00:00 lafd"
# Accommodate for varying ls output
fields = output.split()
if len(fields) >= 7:
for field in fields[3:]:
if field.isdigit():
return int(field)
_logger.debug("ls output: %s", output)
raise RuntimeError("Cannot find filesize for path %s" % path)
@contextmanager
def laf_open_ro(comm, filename):

View file

@ -23,9 +23,12 @@ parser.add_argument("--skip-hello", action="store_true",
help="Immediately send commands, skip HELO message")
def dump_partitions(comm, disk_fd, outdir, max_size):
parts = partitions.get_partitions(comm)
for part_label, part_name in parts.items():
part_offset, part_size = partitions.partition_info(comm, part_name)
diskinfo = partitions.get_partitions(comm, disk_fd)
for part in diskinfo.gpt.partitions:
part_offset = part.first_bla * partitions.BLOCK_SIZE
part_size = (part.last_lba - part.first_bla) * partitions.BLOCK_SIZE
part_name = part.name
part_label = "/dev/mmcblk0p%i" % part.index
if max_size and part_size > max_size:
_logger.info("Ignoring large partition %s (%s) of size %dK",
part_label, part_name, part_size / 1024)

458
gpt.py Normal file
View file

@ -0,0 +1,458 @@
#!/usr/bin/env python
# coding: utf-8
# vim:et:sta:sts=2:sw=2:ts=2:tw=0:
"""
Source: https://github.com/jrd/pyreadpartitions
Read MBR (msdos) and GPT partitions for a disk on UNIX.
Does not rely on any os tools or library.
Exemple:
import pyreadpartitions as pyrp
fp = open('/dev/sda', 'rb')
info = pyrp.get_disk_partitions_info(fp)
print(info.mbr)
print(info.gpt)
pyrp.show_disk_partitions_info(info)
fp.close()
with open('/dev/sda', 'rb') as fp:
pyrp.show_disk_partitions_info(fp)
"""
from __future__ import print_function, unicode_literals, division, absolute_import
__copyright__ = 'Copyright 2013-2014, Salix OS'
__author__ = 'Cyrille Pontvieux <jrd@salixos.org>'
__credits__ = ['Cyrille Pontvieux']
__email__ = 'jrd@salixos.org'
__license__ = 'MIT'
__version__ = '1.0.0'
from collections import namedtuple
import struct
import sys
import uuid
#from fcntl import ioctl
# http://en.wikipedia.org/wiki/Master_boot_record#Sector_layout
MBR_FORMAT = [
(b'446x', '_'), # boot code
(b'16s', 'partition1'),
(b'16s', 'partition2'),
(b'16s', 'partition3'),
(b'16s', 'partition4'),
(b'2s', 'signature'),
]
# http://en.wikipedia.org/wiki/Master_boot_record#Partition_table_entries
MBR_PARTITION_FORMAT = [
(b'B', 'status'), # > 0x80 => active
(b'3p', 'chs_first'), # 8*h + 2*c + 6*s + 8*c
(b'B', 'type'),
(b'3p', 'chs_last'), # 8*h + 2*c + 6*s + 8*c
(b'L', 'lba'),
(b'L', 'sectors'),
]
# http://en.wikipedia.org/wiki/Partition_type#List_of_partition_IDs
MBR_PARTITION_TYPE = {
0x00: 'Empty',
0x01: 'FAT12',
0x04: 'FAT16 16-32MB',
0x05: 'Extended, CHS',
0x06: 'FAT16 32MB-2GB',
0x07: 'NTFS',
0x0B: 'FAT32',
0x0C: 'FAT32X',
0x0E: 'FAT16X',
0x0F: 'Extended, LBA',
0x11: 'Hidden FAT12',
0x14: 'Hidden FAT16,16-32MB',
0x15: 'Hidden Extended, CHS',
0x16: 'Hidden FAT16,32MB-2GB',
0x17: 'Hidden NTFS',
0x1B: 'Hidden FAT32',
0x1C: 'Hidden FAT32X',
0x1E: 'Hidden FAT16X',
0x1F: 'Hidden Extended, LBA',
0x27: 'Windows recovery environment',
0x39: 'Plan 9',
0x3C: 'PartitionMagic recovery partition',
0x42: 'Windows dynamic extended partition marker',
0x44: 'GoBack partition',
0x63: 'Unix System V',
0x64: 'PC-ARMOUR protected partition',
0x81: 'Minix',
0x82: 'Linux Swap',
0x83: 'Linux',
0x84: 'Hibernation',
0x85: 'Linux Extended',
0x86: 'Fault-tolerant FAT16B volume set',
0x87: 'Fault-tolerant NTFS volume set',
0x88: 'Linux plaintext',
0x8E: 'Linux LVM',
0x93: 'Hidden Linux',
0x9F: 'BSD/OS',
0xA0: 'Hibernation',
0xA1: 'Hibernation',
0xA5: 'FreeBSD',
0xA6: 'OpenBSD',
0xA8: 'Mac OS X',
0xA9: 'NetBSD',
0xAB: 'Mac OS X Boot',
0xAF: 'Mac OS X HFS',
0xBE: 'Solaris 8 boot partition',
0xBF: 'Solaris x86',
0xE8: 'Linux Unified Key Setup',
0xEB: 'BFS',
0xEE: 'EFI GPT protective MBR',
0xEF: 'EFI system partition',
0xFA: 'Bochs x86 emulator',
0xFB: 'VMware File System',
0xFC: 'VMware Swap',
0xFD: 'Linux RAID',
}
MBR_EXTENDED_TYPE = [0x05, 0x0F, 0x15, 0x1F, 0x85]
# http://en.wikipedia.org/wiki/Extended_boot_record#Structures
EBR_FORMAT = [
(b'446x', '_'),
(b'16s', 'partition'), # lba = offset from ebr, sectors = size of partition
(b'16s', 'next_ebr'), # lba = offset from extended partition, sectors = next EBR + next Partition size
(b'16x', '_'),
(b'16x', '_'),
(b'2s', 'signature'),
]
# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
GPT_HEADER_FORMAT = [
(b'8s', 'signature'),
(b'H', 'revision_minor'),
(b'H', 'revision_major'),
(b'L', 'header_size'),
(b'L', 'crc32'),
(b'4x', '_'),
(b'Q', 'current_lba'),
(b'Q', 'backup_lba'),
(b'Q', 'first_usable_lba'),
(b'Q', 'last_usable_lba'),
(b'16s', 'disk_guid'),
(b'Q', 'part_entry_start_lba'),
(b'L', 'num_part_entries'),
(b'L', 'part_entry_size'),
(b'L', 'crc32_part_array'),
]
# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries_.28LBA_2.E2.80.9333.29
GPT_PARTITION_FORMAT = [
(b'16s', 'guid'),
(b'16s', 'uid'),
(b'Q', 'first_lba'),
(b'Q', 'last_lba'),
(b'Q', 'flags'),
(b'72s', 'name'),
]
# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs
GPT_GUID = {
'024DEE41-33E7-11D3-9D69-0008C781F39F': 'MBR partition scheme',
'C12A7328-F81F-11D2-BA4B-00A0C93EC93B': 'EFI System partition',
'21686148-6449-6E6F-744E-656564454649': 'BIOS Boot partition',
'D3BFE2DE-3DAF-11DF-BA40-E3A556D89593': 'Intel Fast Flash (iFFS) partition (for Intel Rapid Start technology)',
'F4019732-066E-4E12-8273-346C5641494F': 'Sony boot partition',
'BFBFAFE7-A34F-448A-9A5B-6213EB736C22': 'Lenovo boot partition',
'E3C9E316-0B5C-4DB8-817D-F92DF00215AE': 'Microsoft Reserved Partition (MSR)',
'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7': 'Basic data partition',
'5808C8AA-7E8F-42E0-85D2-E1E90434CFB3': 'Logical Disk Manager (LDM) metadata partition',
'AF9B60A0-1431-4F62-BC68-3311714A69AD': 'Logical Disk Manager data partition',
'DE94BBA4-06D1-4D40-A16A-BFD50179D6AC': 'Windows Recovery Environment',
'37AFFC90-EF7D-4E96-91C3-2D7AE055B174': 'IBM General Parallel File System (GPFS) partition',
'75894C1E-3AEB-11D3-B7C1-7B03A0000000': 'Data partition',
'E2A1E728-32E3-11D6-A682-7B03A0000000': 'Service Partition',
'0FC63DAF-8483-4772-8E79-3D69D8477DE4': 'Linux filesystem data',
'A19D880F-05FC-4D3B-A006-743F0F84911E': 'RAID partition',
'0657FD6D-A4AB-43C4-84E5-0933C84B4F4F': 'Swap partition',
'E6D6D379-F507-44C2-A23C-238F2A3DF928': 'Logical Volume Manager (LVM) partition',
'933AC7E1-2EB4-4F13-B844-0E14E2AEF915': '/home partition',
'3B8F8425-20E0-4F3B-907F-1A25A76F98E8': '/srv partition',
'7FFEC5C9-2D00-49B7-8941-3EA10A5586B7': 'Plain dm-crypt partition',
'CA7D7CCB-63ED-4C53-861C-1742536059CC': 'LUKS partition',
'8DA63339-0007-60C0-C436-083AC8230908': 'Reserved',
'83BD6B9D-7F41-11DC-BE0B-001560B84F0F': 'Boot partition',
'516E7CB4-6ECF-11D6-8FF8-00022D09712B': 'Data partition',
'516E7CB5-6ECF-11D6-8FF8-00022D09712B': 'Swap partition',
'516E7CB6-6ECF-11D6-8FF8-00022D09712B': 'Unix File System (UFS) partition',
'516E7CB8-6ECF-11D6-8FF8-00022D09712B': 'Vinum volume manager partition',
'516E7CBA-6ECF-11D6-8FF8-00022D09712B': 'ZFS partition',
'48465300-0000-11AA-AA11-00306543ECAC': 'Hierarchical File System Plus (HFS+) partition',
'55465300-0000-11AA-AA11-00306543ECAC': 'Apple UFS',
'6A898CC3-1DD2-11B2-99A6-080020736631': 'ZFS',
'52414944-0000-11AA-AA11-00306543ECAC': 'Apple RAID partition',
'52414944-5F4F-11AA-AA11-00306543ECAC': 'Apple RAID partition, offline',
'426F6F74-0000-11AA-AA11-00306543ECAC': 'Apple Boot partition',
'4C616265-6C00-11AA-AA11-00306543ECAC': 'Apple Label',
'5265636F-7665-11AA-AA11-00306543ECAC': 'Apple TV Recovery partition',
'53746F72-6167-11AA-AA11-00306543ECAC': 'Apple Core Storage (i.e. Lion FileVault) partition',
'6A82CB45-1DD2-11B2-99A6-080020736631': 'Boot partition',
'6A85CF4D-1DD2-11B2-99A6-080020736631': 'Root partition',
'6A87C46F-1DD2-11B2-99A6-080020736631': 'Swap partition',
'6A8B642B-1DD2-11B2-99A6-080020736631': 'Backup partition',
'6A898CC3-1DD2-11B2-99A6-080020736631': '/usr partition',
'6A8EF2E9-1DD2-11B2-99A6-080020736631': '/var partition',
'6A90BA39-1DD2-11B2-99A6-080020736631': '/home partition',
'6A9283A5-1DD2-11B2-99A6-080020736631': 'Alternate sector',
'6A945A3B-1DD2-11B2-99A6-080020736631': 'Reserved partition',
'6A9630D1-1DD2-11B2-99A6-080020736631': 'Reserved partition',
'6A980767-1DD2-11B2-99A6-080020736631': 'Reserved partition',
'6A96237F-1DD2-11B2-99A6-080020736631': 'Reserved partition',
'6A8D2AC7-1DD2-11B2-99A6-080020736631': 'Reserved partition',
'49F48D32-B10E-11DC-B99B-0019D1879648': 'Swap partition',
'49F48D5A-B10E-11DC-B99B-0019D1879648': 'FFS partition',
'49F48D82-B10E-11DC-B99B-0019D1879648': 'LFS partition',
'49F48DAA-B10E-11DC-B99B-0019D1879648': 'RAID partition',
'2DB519C4-B10F-11DC-B99B-0019D1879648': 'Concatenated partition',
'2DB519EC-B10F-11DC-B99B-0019D1879648': 'Encrypted partition',
'FE3A2A5D-4F32-41A7-B725-ACCC3285A309': 'ChromeOS kernel',
'3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC': 'ChromeOS rootfs',
'2E0A753D-9E48-43B0-8337-B15192CB1B5E': 'ChromeOS future use',
'42465331-3BA3-10F1-802A-4861696B7521': 'Haiku BFS',
'85D5E45E-237C-11E1-B4B3-E89A8F7FC3A7': 'Boot partition',
'85D5E45A-237C-11E1-B4B3-E89A8F7FC3A7': 'Data partition',
'85D5E45B-237C-11E1-B4B3-E89A8F7FC3A7': 'Swap partition',
'0394EF8B-237E-11E1-B4B3-E89A8F7FC3A7': 'Unix File System (UFS) partition',
'85D5E45C-237C-11E1-B4B3-E89A8F7FC3A7': 'Vinum volume manager partition',
'85D5E45D-237C-11E1-B4B3-E89A8F7FC3A7': 'ZFS partition',
'BFBFAFE7-A34F-448A-9A5B-6213EB736C22': 'Ceph Journal',
'45B0969E-9B03-4F30-B4C6-5EC00CEFF106': 'Ceph dm-crypt Encrypted Journal',
'4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D': 'Ceph OSD',
'4FBD7E29-9D25-41B8-AFD0-5EC00CEFF05D': 'Ceph dm-crypt OSD',
'89C57F98-2FE5-4DC0-89C1-F3AD0CEFF2BE': 'Ceph disk in creation',
'89C57F98-2FE5-4DC0-89C1-5EC00CEFF2BE': 'Ceph dm-crypt disk in creation',
}
def make_fmt(name, fmt, extras=[]):
packfmt = b'<' + b''.join(t for t, n in fmt)
tupletype = namedtuple(name, [n for t, n in fmt if n != '_'] + extras)
return (packfmt, tupletype)
class MBRError(Exception):
pass
class MBRMissing(MBRError):
pass
class GPTError(Exception):
pass
class GPTMissing(GPTError):
pass
def read_mbr_header(fp):
fmt, MBRHeader = make_fmt('MBRHeader', MBR_FORMAT)
data = fp.read(struct.calcsize(fmt))
header = MBRHeader._make(struct.unpack(fmt, data))
if header.signature != b'\x55\xAA':
raise MBRMissing('Bad MBR signature')
return header
def read_mbr_partitions(fp, header):
def read_mbr_partition(partstr, num):
fmt, MBRPartition = make_fmt('MBRPartition', MBR_PARTITION_FORMAT, extras=['index', 'active', 'type_str'])
part = MBRPartition._make(struct.unpack(fmt, partstr) + (num, False, ''))
if part.type:
ptype = 'Unknown'
if part.type in MBR_PARTITION_TYPE:
ptype = MBR_PARTITION_TYPE[part.type]
part = part._replace(active=part.status >= 0x80, type_str=ptype)
return part
def read_ebr_partition(fp, extended_lba, lba, num):
fp.seek((extended_lba + lba) * 512) # lba size is fixed to 512 for MBR
fmt, EBR = make_fmt('EBR', EBR_FORMAT)
data = fp.read(struct.calcsize(fmt))
ebr = EBR._make(struct.unpack(fmt, data))
if ebr.signature != b'\x55\xAA':
raise MBRError('Bad EBR signature')
parts = [read_mbr_partition(ebr.partition, num)]
if ebr.next_ebr != 16 * b'\x00':
part_next_ebr = read_mbr_partition(ebr.next_ebr, 0)
next_lba = part_next_ebr.lba
parts.extend(read_ebr_partition(fp, extended_lba, next_lba, num + 1))
return parts
parts = []
for i in range(1, 4):
part = read_mbr_partition(getattr(header, 'partition{0}'.format(i)), i)
if part:
parts.append(part)
extendpart = None
for part in parts:
if part.type in MBR_EXTENDED_TYPE:
extendpart = part
break
if extendpart:
parts.extend(read_ebr_partition(fp, extendpart.lba, 0, 5))
return parts
def read_gpt_header(fp, lba_size=512):
try:
# skip MBR (if any)
fp.seek(1 * lba_size)
except IOError as e:
raise GPTError(e)
fmt, GPTHeader = make_fmt('GPTHeader', GPT_HEADER_FORMAT)
data = fp.read(struct.calcsize(fmt))
header = GPTHeader._make(struct.unpack(fmt, data))
if header.signature != b'EFI PART':
raise GPTMissing('Bad GPT signature')
revision = header.revision_major + (header.revision_minor / 10)
if revision < 1.0:
raise GPTError('Bad GPT revision: {0}.{1}'.format(header.revision_major, header.revision_minor))
if header.header_size < 92:
raise GPTError('Bad GPT header size: {0}'.format(header.header_size))
header = header._replace(
disk_guid=str(uuid.UUID(bytes_le=header.disk_guid)).upper(),
)
return header
def read_gpt_partitions(fp, header, lba_size=512):
fp.seek(header.part_entry_start_lba * lba_size)
fmt, GPTPartition = make_fmt('GPTPartition', GPT_PARTITION_FORMAT, extras=['index', 'type'])
parts = []
for i in range(header.num_part_entries):
data = fp.read(header.part_entry_size)
if len(data) < struct.calcsize(fmt):
raise GPTError('Short GPT partition entry #{0}'.format(i + 1))
part = GPTPartition._make(struct.unpack(fmt, data) + (i + 1, ''))
if part.guid == 16 * b'\x00':
continue
guid = str(uuid.UUID(bytes_le=part.guid)).upper()
ptype = 'Unknown'
if guid in GPT_GUID:
ptype = GPT_GUID[guid]
part = part._replace(
guid=guid,
uid=str(uuid.UUID(bytes_le=part.uid)).upper(),
# cut on C-style string termination; otherwise you'll see a long row of NILs for most names
name=part.name.decode('utf-16').split(u'\0', 1)[0],
type=ptype,
)
parts.append(part)
return parts
class DiskException(Exception):
pass
def check_disk_file(disk):
try:
disk.tell()
except:
raise DiskException('Please provide a file pointer (sys.stding or result of open function) as first argument, pointing to an existing disk such as /dev/sda.')
def get_mbr_info(disk):
check_disk_file(disk)
disk.seek(0)
try:
mbrheader = read_mbr_header(disk)
partitions = read_mbr_partitions(disk, mbrheader)
return namedtuple('MBRInfo', 'lba_size, partitions')(512, partitions)
except MBRMissing:
return None
except MBRError:
return None
def get_gpt_info(disk):
check_disk_file(disk)
disk.seek(0)
info = {
'lba_size': None,
'revision_minor': None,
'revision_major': None,
'crc32': None,
'current_lba': None,
'backup_lba': None,
'first_usable_lba': None,
'last_usable_lba': None,
'disk_guid': None,
'part_entry_start_lba': None,
'num_part_entries': None,
'part_entry_size': None,
'crc32_part_array': None,
'partitions': [],
}
# EDIT by tuxuser: we are only working with datachunks, not real block devices
# Using hardcoded LBA size for usage by LGLAF
#try:
# blocksize = struct.unpack('i', ioctl(disk.fileno(), 4608 | 104, struct.pack('i', -1)))[0]
#except:
# blocksize = 512
blocksize = 512
try:
info['lba_size'] = blocksize
gptheader = read_gpt_header(disk, lba_size=blocksize)
for key in [k for k in info.keys() if k not in ('lba_size', 'partitions')]:
info[key] = getattr(gptheader, key)
info['partitions'] = read_gpt_partitions(disk, gptheader, lba_size=blocksize)
return namedtuple('GPTInfo', info.keys())(**info)
except GPTMissing:
return None
except GPTError:
return None
def get_disk_partitions_info(disk):
check_disk_file(disk)
return namedtuple('DiskInfo', 'mbr, gpt')(get_mbr_info(disk), get_gpt_info(disk))
def show_disk_partitions_info(diskOrInfo):
fileUsed = None
if hasattr(diskOrInfo, 'read'):
info = get_disk_partitions_info(diskOrInfo)
else:
info = diskOrInfo
if info.mbr:
mbr = info.mbr
print('MBR Header')
print('LBA size (sector size): {0}', mbr.lba_size)
print('Number of MBR partitions: {0}'.format(len(mbr.partitions)))
print('# Active From(#s) Size(#s) Code Type')
for part in mbr.partitions:
print('{n: <2} {boot: ^6} {from_s: <10} {size_s: <10} {code: ^4X} {type}'.format(n=part.index, boot='*' if part.active else '_', from_s=part.lba, size_s=part.sectors, code=part.type, type=part.type_str))
else:
print('No MBR')
print('---')
if info.gpt:
gpt = info.gpt
print('GPT Header')
print('Disk GUID: {0}'.format(gpt.disk_guid))
print('LBA size (sector size): {0}'.format(gpt.lba_size))
print('GPT First LBA: {0}'.format(gpt.current_lba))
print('GPT Last LBA: {0}'.format(gpt.backup_lba))
print('Number of GPT partitions: {0}'.format(len(gpt.partitions)))
print('# Flags From(#s) To(#s) GUID/UID Type/Name')
for part in gpt.partitions:
print(('{n: <3} {flags: ^5} {from_s: <10} {to_s: <10} {guid} {type}\n' + ' ' * 32 + '{uid} {name}').format(n=part.index, flags=part.flags, from_s=part.first_lba, to_s=part.last_lba, guid=part.guid, type=part.type, uid=part.uid, name=part.name))
else:
print('No GPT')
print('---')
if fileUsed:
fileUsed.close()
if __name__ == '__main__':
fp = sys.stdin
if len(sys.argv) > 1:
fp = open(sys.argv[1])
try:
show_disk_partitions_info(fp)
except DiskException as e:
print(e, file=sys.stderr)
if fp != sys.stdin:
fp.close()

33
laf_crypto.py Normal file
View file

@ -0,0 +1,33 @@
from Crypto.Cipher import AES
from lglaf import int_as_byte
def key_transform(old_key):
new_key = b''
for x in range(32, 0, -1):
new_key += int_as_byte(old_key[x-1] - (x % 0x0C))
return new_key
def xor_key(key, kilo_challenge):
# Reserve key
key_xor = b''
pos = 0
for i in range(8):
key_xor += int_as_byte(key[pos] ^ kilo_challenge[3])
key_xor += int_as_byte(key[pos + 1] ^ kilo_challenge[2])
key_xor += int_as_byte(key[pos + 2] ^ kilo_challenge[1])
key_xor += int_as_byte(key[pos + 3] ^ kilo_challenge[0])
pos += 4
return key_xor
def encrypt_kilo_challenge(encryption_key, kilo_challenge):
plaintext = b''
for k in range(0, 16):
# Assemble 0x00 0x01 0x02 ... 0x1F byte-array
plaintext += int_as_byte(k)
encryption_key = key_transform(encryption_key)
xored_key = xor_key(encryption_key, kilo_challenge)
obj = AES.new(xored_key, AES.MODE_ECB)
return obj.encrypt(plaintext)

View file

@ -7,7 +7,7 @@
from __future__ import print_function
from contextlib import closing
import argparse, logging, re, struct, sys
import argparse, logging, re, struct, sys, binascii
# Enhanced prompt with history
try: import readline
@ -29,6 +29,15 @@ except: pass
if '\0' == b'\0': int_as_byte = chr
else: int_as_byte = lambda x: bytes([x])
# laf crypto for KILO challenge/response
try:
import laf_crypto
except ImportError:
_logger.warning("LAF Crypto failed to import!")
pass
# Use Manufacturer key for KILO challenge/response
USE_MFG_KEY = False
laf_error_codes = {
0x80000000: "FAILED",
@ -329,6 +338,24 @@ class USBCommunication(Communication):
def close(self):
usb.util.dispose_resources(self.usbdev)
def challenge_response(comm, mode):
request_kilo = make_request(b'KILO', args=[b'CENT', b'\0\0\0\0', b'\0\0\0\0', b'\0\0\0\0'])
kilo_header, kilo_response = comm.call(request_kilo)
kilo_challenge = kilo_header[8:12]
_logger.debug("Challenge: %s" % binascii.hexlify(kilo_challenge))
if USE_MFG_KEY:
key = b'lgowvqnltpvtgogwswqn~n~mtjjjqxro'
else:
key = b'qndiakxxuiemdklseqid~a~niq,zjuxl'
kilo_response = laf_crypto.encrypt_kilo_challenge(key, kilo_challenge)
_logger.debug("Response: %s" % binascii.hexlify(kilo_response))
mode_bytes = struct.pack('<I', mode)
kilo_metr_request = make_request(b'KILO', args=[b'METR', b'\0\0\0\0', mode_bytes, b'\0\0\0\0'],
body=bytes(kilo_response))
metr_header, metr_response = comm.call(kilo_metr_request)
_logger.debug("KILO METR Response -> Header: %s, Body: %s" % (
binascii.hexlify(metr_header), binascii.hexlify(metr_response)))
def try_hello(comm):
"""
Tests whether the device speaks the expected protocol. If desynchronization
@ -426,6 +453,8 @@ def command_to_payload(command, rawshell):
parser = argparse.ArgumentParser(description='LG LAF Download Mode utility')
parser.add_argument("--skip-hello", action="store_true",
help="Immediately send commands, skip HELO message")
parser.add_argument("--cr", action="store_true",
help="Do initial challenge response (KILO CENT/METR)")
parser.add_argument('--rawshell', action="store_true",
help="Execute shell commands as-is, needed on recent devices. "
"CAUTION: stderr output is not redirected!")
@ -449,6 +478,11 @@ def main():
comm = autodetect_device()
with closing(comm):
if args.cr:
# Mode 2 seems standard, will maybe
# change in other protocol versions
_logger.debug("Doing KILO challenge response")
challenge_response(comm, mode=2)
if not args.skip_hello:
try_hello(comm)
_logger.debug("Hello done, proceeding with commands")

View file

@ -8,11 +8,14 @@
from __future__ import print_function
from collections import OrderedDict
from contextlib import closing, contextmanager
import argparse, logging, os, struct, sys
import argparse, logging, os, io, struct, sys
import lglaf
import gpt
_logger = logging.getLogger("partitions")
GPT_LBA_LEN = 34
def human_readable(sz):
suffixes = ('', 'Ki', 'Mi', 'Gi', 'Ti')
for i, suffix in enumerate(suffixes):
@ -23,48 +26,32 @@ def human_readable(sz):
def read_uint32(data, offset):
return struct.unpack_from('<I', data, offset)[0]
def cat_file(comm, path):
shell_command = b'cat ' + path.encode('ascii') + b'\0'
return comm.call(lglaf.make_request(b'EXEC', body=shell_command))[1]
def get_partitions(comm):
def get_partitions(comm, fd_num):
"""
Maps partition labels (such as "recovery") to block devices (such as
"mmcblk0p0"), sorted by the number in the block device.
"""
name_cmd = 'ls -l /dev/block/bootdevice/by-name'
output = comm.call(lglaf.make_exec_request(name_cmd))[1]
output = output.strip().decode('ascii')
names = []
for line in output.strip().split("\n"):
label, arrow, path = line.split()[-3:]
assert arrow == '->', "Expected arrow in ls output"
blockdev = path.split('/')[-1]
if not blockdev.startswith('mmcblk0p'):
continue
names.append((label, blockdev))
names.sort(key=lambda x: int(x[1].lstrip("mmcblk0p")))
return OrderedDict(names)
read_offset = 0
end_offset = GPT_LBA_LEN * BLOCK_SIZE
def find_partition(partitions, query):
try: query = "mmcblk0p%d" % int(query)
except ValueError: pass
for part_label, part_name in partitions.items():
if query in (part_label, part_name):
return part_label, part_name
table_data = b''
while read_offset < end_offset:
chunksize = min(end_offset - read_offset, BLOCK_SIZE * MAX_BLOCK_SIZE)
data = laf_read(comm, fd_num, read_offset // BLOCK_SIZE, chunksize)
table_data += data
read_offset += chunksize
with io.BytesIO(table_data) as table_fd:
info = gpt.get_disk_partitions_info(table_fd)
return info
def find_partition(diskinfo, query):
partno = int(query) if query.isdigit() else None
for part in diskinfo.gpt.partitions:
if part.index == partno or part.name == query:
return part
raise ValueError("Partition not found: %s" % query)
def partition_info(comm, part_name):
"""Retrieves the partition size and offset within the disk (in bytes)."""
disk_path = "/sys/class/block/%s" % part_name
try:
# Convert sector sizes to bytes.
output = cat_file(comm, '{0}/start {0}/size'.format(disk_path)).split()
start, size = (512 * int(x) for x in output)
except ValueError:
raise RuntimeError("Partition %s not found" % part_name)
return start, size
@contextmanager
def laf_open_disk(comm):
# Open whole disk in read/write mode
@ -120,19 +107,23 @@ def open_local_readable(path):
else:
return open(path, "rb")
def list_partitions(comm, part_filter=None):
partitions = get_partitions(comm)
def get_partition_info_string(part):
info = '# Flags From(#s) To(#s) GUID/UID Type/Name\n'
info += ('{n: <3} {flags: ^5} {from_s: <10} {to_s: <10} {guid} {type}\n' + ' ' * 32 + '{uid} {name}').format(
n=part.index, flags=part.flags, from_s=part.first_lba, to_s=part.last_lba, guid=part.guid,
type=part.type, uid=part.uid, name=part.name)
return info
def list_partitions(comm, fd_num, part_filter=None):
diskinfo = get_partitions(comm, fd_num)
if part_filter:
try: part_filter = find_partition(partitions, part_filter)[1]
except ValueError: pass # No results is OK.
print("Number StartSector Size Name")
for part_label, part_name in partitions.items():
if part_filter and part_filter != part_name:
continue
part_num = int(part_name.lstrip('mmcblk0p'))
part_offset, part_size = partition_info(comm, part_name)
print("%4d %10d %10s %s" % (part_num,
part_offset / BLOCK_SIZE, human_readable(part_size), part_label))
try:
part = find_partition(diskinfo, part_filter)
print(get_partition_info_string(part))
except ValueError as e:
print('Error: %s' % e)
else:
gpt.show_disk_partitions_info(diskinfo)
# On Linux, one bulk read returns at most 16 KiB. 32 bytes are part of the first
# header, so remove one block size (512 bytes) to stay within that margin.
@ -253,20 +244,23 @@ def main():
if not args.skip_hello:
lglaf.try_hello(comm)
if args.list:
list_partitions(comm, args.partition)
return
partitions = get_partitions(comm)
try:
part_label, part_name = find_partition(partitions, args.partition)
except ValueError as e:
parser.error(e)
part_offset, part_size = partition_info(comm, part_name)
_logger.debug("Partition %s (%s) at offset %d (%#x) size %d (%#x)",
part_label, part_name, part_offset, part_offset, part_size, part_size)
with laf_open_disk(comm) as disk_fd:
if args.list:
list_partitions(comm, disk_fd, args.partition)
return
diskinfo = get_partitions(comm, disk_fd)
try:
part = find_partition(diskinfo, args.partition)
except ValueError as e:
parser.error(e)
info = get_partition_info_string(part)
_logger.debug("%s", info)
part_offset = part.first_lba * BLOCK_SIZE
part_size = (part.last_lba - part.first_lba) * BLOCK_SIZE
_logger.debug("Opened fd %d for disk", disk_fd)
if args.dump:
dump_partition(comm, disk_fd, args.dump, part_offset, part_size)