#!/usr/bin/env python
#
# Manage a single partition (info, read, write).
#
# Copyright (C) 2015 Peter Wu
# Licensed under the MIT license.

from __future__ import print_function
from collections import OrderedDict
from contextlib import closing, contextmanager
import argparse
import errno
import io
import logging
import os
import struct
import sys
import lglaf
import gpt

_logger = logging.getLogger("partitions")

# Number of sectors at the start of the disk that write_partition() and
# wipe_partition() refuse to touch.
# NOTE(review): presumably protective MBR + GPT header + partition entries.
GPT_LBA_LEN = 34


def human_readable(sz):
    """Format a byte count using binary (1024-based) unit suffixes.

    Returns a string with one decimal place such as '1.5 KiB'. Values larger
    than the biggest known suffix are still expressed in TiB.
    """
    suffixes = ('', 'Ki', 'Mi', 'Gi', 'Ti')
    for i, suffix in enumerate(suffixes):
        # Strict comparison: an exact power such as 1024 moves on to the next
        # unit instead of being printed as "1024.0 B".
        if sz < 1024**(i+1):
            break
    # float() keeps the fractional part on Python 2, where "/" between two
    # ints truncates (this file does not import division from __future__).
    return '%.1f %sB' % (float(sz) / 1024**i, suffix)


def read_uint32(data, offset):
    """Read one unsigned 32-bit integer from data at the given byte offset."""
    # NOTE(review): the format string and the tail of this call were lost in
    # the copy of the source this was restored from. '<I' (little-endian) and
    # the [0] unwrapping are assumptions - confirm against the original.
    return struct.unpack_from('<I', data, offset)[0]


# NOTE(review): the copy of the source this file was restored from is missing
# everything between read_uint32() and the middle of write_partition().
# The code below uses the following names whose definitions are not available
# here and must be restored from the original file:
#   BLOCK_SIZE, MAX_BLOCK_SIZE, laf_open_disk, laf_write, laf_erase,
#   open_local_readable, get_partitions, find_partition,
#   get_partition_info_string, list_partitions, dump_partition


def write_partition(comm, disk_fd, local_path, part_offset, part_size):
    """Write the contents of a local file to a partition on the device.

    Data is read from local_path in chunks of at most
    BLOCK_SIZE * MAX_BLOCK_SIZE bytes and handed to laf_write() until the
    file ends or the end of the partition is reached.

    Raises RuntimeError when the target overlaps the protected GPT area, when
    part_offset is not sector-aligned, or when the file is known to be larger
    than the partition.
    """
    # NOTE(review): the first statements of this function were lost (see the
    # note above). The signature follows the call in main(); the two
    # assignments below are reconstructed from how the loop uses the
    # variables - confirm against the original.
    write_offset = part_offset
    end_offset = part_offset + part_size

    # laf_write() is given a sector number (write_offset // BLOCK_SIZE), so an
    # unaligned offset would silently be rounded down to the sector before.
    if part_offset % BLOCK_SIZE:
        raise RuntimeError("Unaligned partition writes are not supported")

    # Sanity check. Raised explicitly instead of using "assert" because
    # assert statements are removed when Python runs with -O.
    if part_offset < GPT_LBA_LEN * 512:
        raise RuntimeError("Will not allow overwriting GPT scheme")

    with open_local_readable(local_path) as f:
        try:
            # seek() returns None for Python 2 file objects, tell() reports
            # the position on both Python 2 and 3.
            f.seek(0, os.SEEK_END)
            length = f.tell()
        except (OSError, IOError):
            # IOError is what Python 2 raises; it is an alias of OSError on
            # Python 3. Will try to write up to the end of the file.
            _logger.debug("File %s is not seekable, length is unknown",
                          local_path)
        else:
            # Restore position and check if file is small enough
            f.seek(0)
            if length > part_size:
                raise RuntimeError("File size %d is larger than partition "
                                   "size %d" % (length, part_size))
            # Some special files report 0 (such as /dev/zero)
            if length > 0:
                _logger.debug("Will write %d bytes", length)

        written = 0
        while write_offset < end_offset:
            chunksize = min(end_offset - write_offset,
                            BLOCK_SIZE * MAX_BLOCK_SIZE)
            data = f.read(chunksize)
            if not data:
                break  # End of file
            laf_write(comm, disk_fd, write_offset // BLOCK_SIZE, data)
            written += len(data)
            write_offset += chunksize
            if len(data) != chunksize:
                break  # Short read, end of file
        _logger.info("Done after writing %d bytes from %s", written,
                     local_path)


def wipe_partition(comm, disk_fd, part_offset, part_size):
    """Discard (TRIM) all sectors of a partition via laf_erase().

    Raises RuntimeError when the range overlaps the protected GPT area or
    when the sector count is zero or implausibly large.
    """
    sector_start = part_offset // BLOCK_SIZE
    sector_count = part_size // BLOCK_SIZE

    # Sanity checks. Raised explicitly instead of using "assert" because
    # assert statements are removed when Python runs with -O.
    if sector_start < GPT_LBA_LEN:
        raise RuntimeError("Will not allow overwriting GPT scheme")
    # Discarding no sectors or more than 512 GiB is a bit stupid.
    if not 0 < sector_count < 1024**3:
        raise RuntimeError("Invalid sector count %d" % sector_count)

    laf_erase(comm, disk_fd, sector_start, sector_count)
    _logger.info("Done with TRIM from sector %d, count %d (%s)",
                 sector_start, sector_count, human_readable(part_size))


parser = argparse.ArgumentParser()
parser.add_argument("--debug", action='store_true',
                    help="Enable debug messages")
parser.add_argument("--list", action='store_true',
                    help='List available partitions')
parser.add_argument("--dump", metavar="LOCAL_PATH",
                    help="Dump partition to file ('-' for stdout)")
parser.add_argument("--restore", metavar="LOCAL_PATH",
                    help="Write file to partition on device ('-' for stdin)")
parser.add_argument("--wipe", action='store_true',
                    help="TRIMs a partition")
parser.add_argument("partition", nargs='?',
                    help="Partition number (e.g. 1 for block device mmcblk0p1)"
                    " or partition name (e.g. 'recovery')")
parser.add_argument("--skip-hello", action="store_true",
                    help="Immediately send commands, skip HELO message")


def main():
    """Parse the command line and run exactly one partition action.

    Exactly one of --list / --dump / --restore / --wipe must be given; every
    action except --list also requires the partition argument.
    """
    args = parser.parse_args()
    logging.basicConfig(
        format='%(asctime)s %(name)s: %(levelname)s: %(message)s',
        level=logging.DEBUG if args.debug else logging.INFO)

    actions = (args.list, args.dump, args.restore, args.wipe)
    if sum(bool(x) for x in actions) != 1:
        parser.error("Please specify one action from"
                     " --list / --dump / --restore / --wipe")
    if not args.partition and (args.dump or args.restore or args.wipe):
        parser.error("Please specify a partition")

    comm = lglaf.autodetect_device()
    with closing(comm):
        if not args.skip_hello:
            lglaf.try_hello(comm)

        with laf_open_disk(comm) as disk_fd:
            if args.list:
                list_partitions(comm, disk_fd, args.partition)
                return

            diskinfo = get_partitions(comm, disk_fd)
            try:
                part = find_partition(diskinfo, args.partition)
            except ValueError as e:
                # parser.error() prints the usage text and exits.
                parser.error(str(e))
            info = get_partition_info_string(part)
            _logger.debug("%s", info)

            part_offset = part.first_lba * BLOCK_SIZE
            # NOTE(review): if last_lba is inclusive (as in the on-disk GPT
            # entry) this is one sector short - verify against the gpt module.
            part_size = (part.last_lba - part.first_lba) * BLOCK_SIZE

            _logger.debug("Opened fd %d for disk", disk_fd)
            if args.dump:
                dump_partition(comm, disk_fd, args.dump,
                               part_offset, part_size)
            elif args.restore:
                write_partition(comm, disk_fd, args.restore,
                                part_offset, part_size)
            elif args.wipe:
                wipe_partition(comm, disk_fd, part_offset, part_size)


if __name__ == '__main__':
    try:
        main()
    except (OSError, IOError) as e:
        # Ignore when stdout is closed in a pipe. IOError is what Python 2
        # raises for this; it is an alias of OSError on Python 3.
        if e.errno != errno.EPIPE:
            raise