Allow partition lookup by label

This commit is contained in:
Peter Wu 2015-12-27 11:21:32 +01:00
parent a27775129d
commit c8e27d445d
2 changed files with 78 additions and 34 deletions

View file

@ -5,6 +5,7 @@
# Copyright (C) 2015 Peter Wu <peter@lekensteyn.nl>
# Licensed under the MIT license <http://opensource.org/licenses/MIT>.
from __future__ import print_function
from collections import OrderedDict
from contextlib import closing, contextmanager
import argparse, logging, os, struct, sys
@ -12,6 +13,13 @@ import lglaf
_logger = logging.getLogger("partitions")
def human_readable(sz):
suffixes = ('', 'Ki', 'Mi', 'Gi', 'Ti')
for i, suffix in enumerate(suffixes):
if sz <= 1024**(i+1):
break
return '%.1f %sB' % (sz / 1024**i, suffix)
def read_uint32(data, offset):
return struct.unpack_from('<I', data, 4)[0]
@ -19,15 +27,34 @@ def cat_file(comm, path):
shell_command = b'cat ' + path.encode('ascii') + b'\0'
return comm.call(lglaf.make_request(b'EXEC', body=shell_command))[1]
def partition_info(comm, part_num):
def get_partitions(comm):
"""
Maps partition labels (such as "recovery") to block devices (such as
"mmcblk0p0"), sorted by the number in the block device.
"""
name_cmd = 'ls -l /dev/block/platform/*/by-name'
output = comm.call(lglaf.make_exec_request(name_cmd))[1]
output = output.strip().decode('ascii')
names = []
for line in output.strip().split("\n"):
label, arrow, path = line.split()[-3:]
assert arrow == '->', "Expected arrow in ls output"
blockdev = path.split('/')[-1]
if not blockdev.startswith('mmcblk0p'):
continue
names.append((label, blockdev))
names.sort(key=lambda x: int(x[1].lstrip("mmcblk0p")))
return OrderedDict(names)
def partition_info(comm, part_name):
"""Retrieves the partition size and offset within the disk (in bytes)."""
disk_path = "/sys/block/mmcblk0/mmcblk0p%d" % part_num
disk_path = "/sys/class/block/%s" % part_name
try:
# Convert sector sizes to bytes.
start = 512 * int(cat_file(comm, "%s/start" % disk_path))
size = 512 * int(cat_file(comm, "%s/size" % disk_path))
except ValueError:
raise RuntimeError("Partition %d not found" % part_num)
raise RuntimeError("Partition %s not found" % part_name)
return start, size
@contextmanager
@ -77,6 +104,15 @@ def open_local_readable(path):
else:
return open(path, "rb")
def list_partitions(comm):
parts = get_partitions(comm)
print("Number StartSector Size Name")
for part_label, part_name in parts.items():
part_num = int(part_name.lstrip('mmcblk0p'))
part_offset, part_size = partition_info(comm, part_name)
print("%4d %10d %10s %s" % (part_num,
part_offset / BLOCK_SIZE, human_readable(part_size), part_label))
# On Linux, one bulk read returns at most 16 KiB. 32 bytes are part of the first
# header, so remove one block size (512 bytes) to stay within that margin.
# This ensures that whenever the USB communication gets out of sync, it will
@ -150,28 +186,49 @@ def write_partition(comm, disk_fd, local_path, part_offset, part_size):
parser = argparse.ArgumentParser()
parser.add_argument("--debug", action='store_true', help="Enable debug messages")
parser.add_argument("--list", action='store_true',
help='List available partitions')
parser.add_argument("--dump", metavar="LOCAL_PATH",
help="Dump partition to file ('-' for stdout)")
parser.add_argument("--load", metavar="LOCAL_PATH",
help="Write file to partition on device ('-' for stdin)")
parser.add_argument("partition_number", type=int,
help="Partition number (e.g. 1 for block device mmcblk0p1)")
parser.add_argument("partition", nargs='?',
help="Partition number (e.g. 1 for block device mmcblk0p1)"
" or partition name (e.g. 'recovery')")
def main():
args = parser.parse_args()
logging.basicConfig(format='%(asctime)s %(name)s: %(levelname)s: %(message)s',
level=logging.DEBUG if args.debug else logging.INFO)
part_num = args.partition_number
if args.dump and args.load:
parser.error("Please specify one action from --dump / --load")
actions = (args.dump, args.load, args.list)
if sum(1 if x else 0 for x in actions) != 1:
parser.error("Please specify one action from --dump / --load / --list")
if not args.partition and (args.dump or args.load):
parser.error("Please specify a partition")
comm = lglaf.autodetect_device()
with closing(comm):
lglaf.try_hello(comm)
part_offset, part_size = partition_info(comm, part_num)
_logger.debug("Partition %d at offset %d (%#x) size %d (%#x)",
part_num, part_offset, part_offset, part_size, part_size)
if args.list:
list_partitions(comm)
return
try:
selected_partition = "mmcblk0p%d" % int(args.partition)
except ValueError:
selected_partition = args.partition
part_names = get_partitions(comm)
for part_label, part_name in part_names.items():
if selected_partition in (part_label, part_name):
break
else:
parser.error("Partition not found: %s" % selected_partition)
part_offset, part_size = partition_info(comm, part_name)
_logger.debug("Partition %s (%s) at offset %d (%#x) size %d (%#x)",
part_label, part_name, part_offset, part_offset, part_size, part_size)
with laf_open_disk(comm) as disk_fd:
_logger.debug("Opened fd %d for disk", disk_fd)
if args.dump:

View file

@ -11,19 +11,6 @@ import lglaf, partitions
_logger = logging.getLogger("extract-partitions")
def read_partition_numbers(comm):
output = comm.call(lglaf.make_exec_request('cat /proc/partitions'))[1]
partitions = []
for line in output.decode('ascii').split('\n'):
if not line:
continue
name = line.split()[-1]
if not name.startswith('mmcblk0p'):
continue
part_num = int(name[len('mmcblk0p'):])
partitions.append(part_num)
return partitions
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--outdir", default=".",
help="Output directory for disk images.")
@ -34,14 +21,14 @@ parser.add_argument("--max-size", metavar="kbytes", type=int, default=65536,
parser.add_argument("--debug", action='store_true', help="Enable debug messages")
def dump_partitions(comm, disk_fd, outdir, max_size):
part_nums = read_partition_numbers(comm)
for part_num in part_nums:
part_offset, part_size = partitions.partition_info(comm, part_num)
parts = partitions.get_partitions(comm)
for part_label, part_name in parts.items():
part_offset, part_size = partitions.partition_info(comm, part_name)
if part_size > max_size:
_logger.info("Ignoring large partition %s of size %dK" % (part_num,
part_size / 1024))
_logger.info("Ignoring large partition %s (%s) of size %dK",
part_label, part_name, part_size / 1024)
continue
out_path = os.path.join(outdir, "mmcblk0p%d.bin" % part_num)
out_path = os.path.join(outdir, "%s.bin" % part_name)
try:
current_size = os.path.getsize(out_path)
if current_size > part_size:
@ -49,12 +36,12 @@ def dump_partitions(comm, disk_fd, outdir, max_size):
out_path, current_size / 1024, part_size / 1024)
continue
elif current_size == part_size:
_logger.info("%s: already retrieved %dK",
out_path, part_size / 1024)
_logger.info("Skipping partition %s (%s), already found at %s",
part_label, part_name, out_path)
continue
except OSError: pass
_logger.info("Dumping partition %d to %s (%d bytes)",
part_num, out_path, part_size)
_logger.info("Dumping partition %s (%s) to %s (%d bytes)",
part_label, part_name, out_path, part_size)
partitions.dump_partition(comm, disk_fd, out_path, part_offset, part_size)
def main():