import logging
import os
import re
+import shutil
import stat
import subprocess
import sys
class SdFusingTarget:
def __init__(self, device, ltype):
+ # TODO: make a copy of a sublcass part_table
+ self.with_super = False
self.device = device
total_size = device_size(device)
self.user_size = total_size - self.reserved_space - \
reduce(lambda x, y: x + (y["size"] or 0), self.part_table, 0)
if self.user_size < 100:
- logging.error("Not enough space for user data. Use larger storage.")
+ logging.error(f"Not enough space for user data ({self.user_size}). Use larger storage.")
raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC), device)
self.part_table[self.user_partition]["size"] = self.user_size
self.label = Label(self.part_table, ltype)
self.update = args.update
super().__init__(device, "gpt")
self.with_super = True
-
+ self.super_alignment = 1048576
class Rpi4(SdFusingTargetAB, RpiInitParams):
long_name = "Raspberry Pi 4"
"ramdisk-recovery.img": 15,
"hal.img": 16,
}
+ params = (('reboot-param.bin', 'norm'),
+ ('reboot-param.info', 'norm'),
+ ('partition-ab.info', 'a'),
+ ('partition-ab-cloned.info', '1'),
+ ('upgrade-status.info', '0'),
+ ('partition-a-status.info', 'ok'),
+ ('partition-b-status.info', 'ok'))
def __init__(self, device, args):
self.reserved_space = 5
return True, support_delete
-def mkpart(args):
+def mkpart(args, target):
global Device
new, support_delete = check_sfdisk()
if not new:
- sys.exit(1)
- target = TARGETS[args.target](Device, args)
+ logging.error('sfdisk too old')
+ sys.exit(1)
#TODO: unmount target devices
if support_delete:
logging.error(f"{Device} is not a block device")
sys.exit(1)
-def check_partition_format(args):
+def check_partition_format(args, target):
global Format
global Device
logging.info(f"Skip formatting of {Device}".format(Device))
return
logging.info(f"Start formatting of {Device}")
- mkpart(args)
+ mkpart(args, target)
logging.info(f"{Device} formatted")
def check_ddversion():
logging.info("Done")
#TODO: verification
-def do_fuse_image_tarball(tarball, target):
+#TODO: functions with the target argument should probably
+# be part of some class
+
+def get_aligned_size(size, target):
+ return target.super_alignment*int(1+(size-1)/target.super_alignment)
+
+def do_fuse_image_super(tmpd, target):
+ metadata_slots = 2
+ metadata_size = 65536
+ metadata_aligned_size = get_aligned_size(metadata_size, target)
+
+ hal_path = os.path.join(tmpd, 'hal.img')
+ rootfs_path = os.path.join(tmpd, 'rootfs.img')
+ super_path = os.path.join(tmpd, 'super.img')
+
+ try:
+ hal_size = os.stat(hal_path).st_size
+ rootfs_size = os.stat(rootfs_path).st_size
+ except FileNotFoundError as e:
+ fn = os.path.split(e.filename)[-1]
+ logging.warning(f"{fn} is missing, skipping super partition image")
+ return
+
+ hal_aligned_size = get_aligned_size(hal_size, target)
+ rootfs_aligned_size = get_aligned_size(rootfs_size, target)
+ group_size = hal_aligned_size + rootfs_aligned_size
+ super_size = metadata_aligned_size + 2 * group_size
+
+ proc = subprocess.run(["lpmake", "-F",
+ f"-o={super_path}",
+ f"--device-size={super_size}",
+ f"--metadata-size={metadata_size}",
+ f"--metadata-slots={metadata_slots}",
+ "-g", f"tizen_a:{group_size}",
+ "-p", f"rootfs_a:none:{rootfs_aligned_size}:tizen_a",
+ "-p", f"hal_a:none:{hal_aligned_size}:tizen_a",
+ "-g", f"tizen_b:{group_size}",
+ "-p", f"rootfs_b:none:{rootfs_aligned_size}:tizen_b",
+ "-p", f"hal_b:none:{hal_aligned_size}:tizen_b",
+ "-i", "rootfs_a={root_path}",
+ "-i", "rootfs_b={root_path}",
+ "-i", "hal_a={hal_path}",
+ "-i", "hal_b={hal_path}"],
+ stdin=subprocess.DEVNULL,
+ stdout=None, stderr=None)
+
+ if proc.returncode != 0:
+ logging.error("Failed to create super.img")
+ do_fuse_image(super_path, target)
+
+def do_fuse_image_tarball(tarball, tmpd, target):
with tarfile.open(tarball) as tf:
for entry in tf:
+ if target.with_super:
+ if entry.name in('hal.img', 'rootfs.img'):
+ tf.extract(entry, path=tmpd)
+ continue
f = tf.extractfile(entry)
do_fuse_file(f, entry.name, target)
with open(img, 'rb') as f:
do_fuse_file(f, img, target)
-def fuse_image(args):
+def fuse_image(args, target):
if args.binaries is None or len(args.binaries) == 0:
return
-
- target = TARGETS[args.target](Device, args)
- for b in args.binaries:
- if re.search('\.(tar|tar\.gz|tgz)$', b):
- do_fuse_image_tarball(b, target)
- else:
- do_fuse_image(b, target)
+ with tempfile.TemporaryDirectory() as tmpd:
+ for b in args.binaries:
+ if re.search('\.(tar|tar\.gz|tgz)$', b):
+ do_fuse_image_tarball(b, tmpd, target)
+ else:
+ fn = os.path.split(b)[-1]
+ if target.with_super and fn in ('rootfs.img', 'hal.img'):
+ shutil.copy(b, os.path.join(tmpd, fn))
+ else:
+ do_fuse_image(b, target)
+
+ if target.with_super:
+ do_fuse_image_super(tmpd, target)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="For {}, version {}".format(
check_args(args)
check_device(args)
- check_partition_format(args)
- fuse_image(args)
+
+ target = TARGETS[args.target](Device, args)
+
+ check_partition_format(args, target)
+ fuse_image(args, target)
subprocess.run(['sync'],
stdin=subprocess.DEVNULL,
stdout=None, stderr=None )