This commit is contained in:
bongbui321 2024-03-26 00:34:10 -04:00
parent f968e0956a
commit 569a21a5f7
2 changed files with 72 additions and 115 deletions

View file

@ -753,7 +753,6 @@ class firehose(metaclass=LogBase):
resp = rsp["value"] == "ACK"
return response(resp=resp, data=resData, error=rsp[2]) # Do not remove, needed for oneplus
# TODO: this should be able to get backup gpt as well
def get_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba, start_sector=1):
try:
resp = self.cmd_read_buffer(lun, 0, 1, False)
@ -805,23 +804,6 @@ class firehose(metaclass=LogBase):
try:
sectorsize = self.cfg.SECTOR_SIZE_IN_BYTES
header = guid_gpt.parseheader(data, sectorsize)
#if not primary:
# print(f"signature: {header.signature}")
# print(f"revision: {header.revision}")
# print(f"header size: {header.header_size}")
# print(f"crc32: {header.crc32}")
# print(f"reserved: {header.reserved}")
# print(f"current_lba: {header.current_lba}")
# print(f"backup_lba: {header.backup_lba}")
# print(f"first_usable_lba: {header.first_usable_lba}")
# print(f"last_usable_lba: {header.last_usable_lba}")
# print(f"disk_guid: {header.disk_guid}")
# print(f"part_entry_start_lba: {header.part_entry_start_lba}")
# print(f"num_part_entries: {header.num_part_entries}")
# print(f"part_entry_size: {header.part_entry_size}")
# print(f"crc32_part_entries: {header.crc32_part_entries}")
# return None, None
if header.signature == b"EFI PART":
part_table_size = header.num_part_entries * header.part_entry_size
sectors = part_table_size // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1328,16 +1310,6 @@ class firehose(metaclass=LogBase):
self.warning("GetStorageInfo command isn't supported.")
return None
# 1. check for integrity of the primary header and integrity of backup header
# Yes?: update primary header + update backup header
# No? : which one is corrupted? --> update the corrupted one with the other one
# 2. how to fix corruption
# -> fix header (patch)
# -> fix partition table (write)
def check_gpt_integrity(self, gptData, guid_gpt):
primary_header = guid_gpt.header
def cmd_setactiveslot(self, slot: str):
# flags: 0x3a for inactive and 0x6f for active boot partition
def set_flags(flags, active, is_boot):
@ -1358,14 +1330,16 @@ class firehose(metaclass=LogBase):
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET*8))
return new_flags
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt, partition_a, partition_b, slot_a_status, slot_b_status, is_boot):
part_entry_size = guid_gpt.header.part_entry_size
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status, slot_b_status, is_boot):
part_entry_size = guid_gpt_a.header.part_entry_size
rf_a = BytesIO(gpt_data_a)
rf_b = BytesIO(gpt_data_b)
rf_a.seek(partition_a.entryoffset)
rf_b.seek(partition_b.entryoffset)
entryoffset_a = partition_a.entryoffset - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
rf_a.seek(entryoffset_a)
rf_b.seek(entryoffset_b)
sdata_a = rf_a.read(part_entry_size)
sdata_b = rf_b.read(part_entry_size)
@ -1386,35 +1360,34 @@ class firehose(metaclass=LogBase):
write_size = len(patch_data)
for i in range(0, write_size, size_each_patch):
pdata_subset = int(unpack("<I", patch_data[offset:offset+size_each_patch])[0])
self.cmd_patch( lun, start_sector, \
byte_offset + offset, \
pdata_subset, \
size_each_patch, True)
self.cmd_patch( lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, True)
offset += size_each_patch
return True
def update_gpt_info(guid_gpt_a, guid_gpt_b, partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b, slot_a_status, slot_b_status, lun_a, lun_b
):
part_a = guid_gpt_a.partentries[partitionname_a]
part_b = guid_gpt_b.partentries[partitionname_b]
is_boot = False
if partitionname_a == "boot_a":
is_boot = True
pdata_a, poffset_a, pdata_b, poffset_b = patch_helper(
gpt_data_a, gpt_data_b,
guid_gpt_a, part_a, part_b,
guid_gpt_a, guid_gpt_b,
part_a, part_b,
slot_a_status, slot_b_status,
is_boot
)
if gpt_data_a and gpt_data_b:
entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
gpt_data_a[entryoffset_a: entryoffset_a+len(pdata_a)] = pdata_a
gpt_data_a[entryoffset_a : entryoffset_a + len(pdata_a)] = pdata_a
new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
gpt_data_b[entryoffset_b:entryoffset_b + len(pdata_b)] = pdata_b
gpt_data_b[entryoffset_b : entryoffset_b + len(pdata_b)] = pdata_b
new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
@ -1427,7 +1400,6 @@ class firehose(metaclass=LogBase):
new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size]
cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a)
start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES
cmd_patch_multiple(lun_b, start_sector_patch_b, byte_offset_patch_b, pdata_b)
@ -1437,8 +1409,39 @@ class firehose(metaclass=LogBase):
new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size]
cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b)
return True
return True
return False
def fix_gpt_header(lun, fix_guid_gpt, good_hdr_crc, good_part_table_crc, good_part_table):
    """Patch a corrupted GPT on *lun* using values taken from its healthy twin.

    Writes the known-good header CRC (byte offset 0x10) and the
    partition-table CRC (byte offset 0x58) into the on-disk header sector,
    then rewrites the partition table itself starting at its first LBA.
    """
    header_lba = fix_guid_gpt.header.current_lba
    patches = (
        (header_lba, 0x10, good_hdr_crc),
        (header_lba, 0x58, good_part_table_crc),
        (fix_guid_gpt.header.part_entry_start_lba, 0x0, good_part_table),
    )
    for sector, byte_offset, payload in patches:
        cmd_patch_multiple(lun, sector, byte_offset, payload)
def check_fix_gpt_hdr(lun, guid_gpt, backup_guid_gpt, gpt_data, backup_gpt_data):
    """Verify the CRC fields of the primary and backup GPT headers and
    repair a corrupted one from its healthy counterpart.

    gpt_data / backup_gpt_data are the raw blobs the headers were parsed
    from; the header is assumed to sit one sector into each blob and the
    partition table two sectors in (matching fix_gpt_crc's layout).

    Returns True when both headers are (now) good, False when both are
    corrupted and nothing can be recovered.
    """
    headeroffset = guid_gpt.sectorsize  # header occupies the 2nd sector of the blob

    # Primary header: compare stored CRC fields against freshly recomputed ones.
    prim_hdr = gpt_data[headeroffset : headeroffset + guid_gpt.header.header_size]
    test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset : headeroffset + guid_gpt.header.header_size]
    prim_hdr_crc, test_hdr_crc = prim_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
    prim_part_table_crc, test_part_table_crc = prim_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
    prim_corrupted = prim_hdr_crc != test_hdr_crc or prim_part_table_crc != test_part_table_crc

    # Backup header: same check on the backup blob, using the backup's own header size.
    backup_hdr = backup_gpt_data[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
    test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[headeroffset : headeroffset + backup_guid_gpt.header.header_size]
    backup_hdr_crc, test_hdr_crc = backup_hdr[0x10 : 0x10 + 4], test_hdr[0x10 : 0x10 + 4]
    backup_part_table_crc, test_part_table_crc = backup_hdr[0x58 : 0x58 + 4], test_hdr[0x58 : 0x58 + 4]
    backup_corrupted = backup_hdr_crc != test_hdr_crc or backup_part_table_crc != test_part_table_crc

    if prim_corrupted:
        if backup_corrupted:
            self.error("Both the gpt headers are corrupted, cannot recover")
            return False
        # Rebuild the primary from the (healthy) backup's partition table.
        table_size = backup_guid_gpt.header.num_part_entries * backup_guid_gpt.header.part_entry_size
        table_offset = 2 * backup_guid_gpt.sectorsize
        good_part_table = backup_gpt_data[table_offset : table_offset + table_size]
        # NOTE(review): this copies the backup header's CRC fields into the
        # primary header; primary and backup headers differ (current_lba and
        # backup_lba are swapped), so their header CRCs differ too -- confirm
        # the intended repair semantics.
        fix_gpt_header(lun, guid_gpt, backup_hdr_crc, backup_part_table_crc, good_part_table)
    elif backup_corrupted:
        # Rebuild the backup from the (healthy) primary's partition table.
        table_size = guid_gpt.header.num_part_entries * guid_gpt.header.part_entry_size
        table_offset = 2 * guid_gpt.sectorsize
        good_part_table = gpt_data[table_offset : table_offset + table_size]
        fix_gpt_header(lun, backup_guid_gpt, prim_hdr_crc, prim_part_table_crc, good_part_table)
    return True
if slot.lower() not in ["a", "b"]:
self.error("Only slots a or b are accepted. Aborting.")
@ -1454,8 +1457,9 @@ class firehose(metaclass=LogBase):
for lun_a in self.luns:
lunname = "Lun" + str(lun_a)
fpartitions[lunname] = []
check_gpt_hdr = False
gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0))
#back_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
if guid_gpt_a is None:
break
else:
@ -1467,6 +1471,8 @@ class firehose(metaclass=LogBase):
lun_b = lun_a
gpt_data_b = gpt_data_a
guid_gpt_b = guid_gpt_a
backup_gpt_data_b = backup_gpt_data_a
backup_guid_gpt_b = backup_guid_gpt_a
else:
resp = self.detect_partition(arguments=None,
partitionname=partitionname_b,
@ -1475,82 +1481,33 @@ class firehose(metaclass=LogBase):
self.error(f"Cannot find partition {partitionname_b}")
return False
_, lun_b, gpt_data_b, guid_gpt_b = resp
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0 , 0, guid_gpt_b.header.backup_lba)
if not check_gpt_hdr:
good_headers_a = check_fix_gpt_hdr(lun_a, guid_gpt_a, backup_guid_gpt_a, gpt_data_a, backup_gpt_data_a)
if not good_headers_a:
return False
else:
self.warning(f"good gpt header a: {partitionname_a[:-2]}")
if lun_a != lun_b:
good_headers_b = check_fix_gpt_hdr(lun_b, guid_gpt_b, backup_guid_gpt_b, gpt_data_b, backup_gpt_data_b)
if not good_headers_b:
return False
else:
self.warning(f"good gpt header b: {partitionname_a[:-2]}")
check_gpt_hdr = True
update_gpt_info(guid_gpt_a, guid_gpt_b,
partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b,
slot_a_status, slot_b_status,
lun_a, lun_b
)
#part_a = guid_gpt_a.partentries[partitionname_a]
#part_b = guid_gpt_b.partentries[partitionname_b]
#is_boot = False
#if partitionname_a == "boot_a":
# is_boot = True
#pdata_a, poffset_a, pdata_b, poffset_b = patch_helper(
# gpt_data_a, gpt_data_b,
# guid_gpt_a, part_a, part_b,
# slot_a_status, slot_b_status,
# is_boot
#)
#if gpt_data_a and gpt_data_b:
# gpt_data_a[poffset_a : poffset_a+len(pdata_a)] = pdata_a
# new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
# gpt_data_b[poffset_b:poffset_b + len(pdata_b)] = pdata_b
# new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
# prim_start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
# prim_byte_offset_patch_a = poffset_a % self.cfg.SECTOR_SIZE_IN_BYTES
# prim_start_sector_hdr_a = guid_gpt_a.header.current_lba
# prim_start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
# prim_byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES
# prim_start_sector_hdr_b = guid_gpt_b.header.current_lba
# headeroffset_a = prim_start_sector_hdr_a * guid_gpt_a.sectorsize
# prim_new_hdr_a = new_gpt_data_a[headeroffset_a : headeroffset_a+guid_gpt_a.header.header_size]
# headeroffset_b = prim_start_sector_hdr_b * guid_gpt_b.sectorsize
# prim_new_hdr_b = new_gpt_data_b[headeroffset_b : headeroffset_b+guid_gpt_b.header.header_size]
#self.update_gpt_info(prim_new_hdr_a, prim_new_hdr_b,
# pdata_a, pdata_b,
# lun_a, lun_b,
# prim_start_sector_hdr_a, prim_start_sector_hdr_b,
# prim_start_sector_patch_a, prim_start_sector_patch_b,
# prim_byte_offset_patch_a, prim_byte_offset_patch_b)
#self.update_gpt_info(new_hdr_a, new_hdr_b,
# pdata_a, pdata_b,
# lun_a, lun_b,
# prim_start_sector_hdr_a, prim_start_sector_hdr_b,
# prim_start_sector_patch_a, prim_start_sector_patch_b,
# prim_byte_offset_patch_a, prim_byte_offset_patch_b)
#back_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0 , 0, guid_gpt_a.header.backup_lba)
#print(f"signature: {backup_guid_gpt_a.header.signature}")
#print(f"revision: {backup_guid_gpt_a.header.revision}")
#print(f"header size: {backup_guid_gpt_a.header.header_size}")
#print(f"crc32: {backup_guid_gpt_a.header.crc32}")
#print(f"reserved: {backup_guid_gpt_a.header.reserved}")
#print(f"current_lba: {backup_guid_gpt_a.header.current_lba}")
#print(f"backup_lba: {backup_guid_gpt_a.header.backup_lba}")
#print(f"first_usable_lba: {backup_guid_gpt_a.header.first_usable_lba}")
#print(f"last_usable_lba: {backup_guid_gpt_a.header.last_usable_lba}")
#print(f"disk_guid: {backup_guid_gpt_a.header.disk_guid}")
#print(f"part_entry_start_lba: {backup_guid_gpt_a.header.part_entry_start_lba}")
#print(f"num_part_entries: {backup_guid_gpt_a.header.num_part_entries}")
#print(f"part_entry_size: {backup_guid_gpt_a.header.part_entry_size}")
#print(f"crc32_part_entries: {backup_guid_gpt_a.header.crc32_part_entries}")
#return True
#if (backup_guid_gpt_a is None):
# self.error("error in backup")
# return False
lun_a, lun_b)
update_gpt_info(backup_guid_gpt_a, backup_guid_gpt_b,
partitionname_a, partitionname_b,
backup_gpt_data_a, backup_gpt_data_b,
slot_a_status, slot_b_status,
lun_a, lun_b)
except Exception as err:
self.error(str(err))

View file

@ -498,9 +498,9 @@ class gpt(metaclass=LogBase):
def fix_gpt_crc(self, data):
partentry_size = self.header.num_part_entries * self.header.part_entry_size
partentry_offset = self.header.part_entry_start_lba * self.sectorsize
partentry_offset = 2 * self.sectorsize
partdata = data[partentry_offset:partentry_offset + partentry_size]
headeroffset = self.header.current_lba * self.sectorsize
headeroffset = self.sectorsize
headerdata = bytearray(data[headeroffset:headeroffset + self.header.header_size])
headerdata[0x58:0x58 + 4] = pack("<I", crc32(partdata))
headerdata[0x10:0x10 + 4] = pack("<I", 0)