WIP: sd_fusing_py
authorŁukasz Stelmach <l.stelmach@samsung.com>
Mon, 18 Sep 2023 19:31:30 +0000 (21:31 +0200)
committerŁukasz Stelmach <l.stelmach@samsung.com>
Wed, 20 Sep 2023 12:15:00 +0000 (14:15 +0200)
Change-Id: I7de043e5da6e40aaa99c646b89be3f1ffd43cb7f
Signed-off-by: Łukasz Stelmach <l.stelmach@samsung.com>
scripts/tizen/sd_fusing.py

index 9101790..687dd24 100755 (executable)
@@ -1,20 +1,53 @@
 #!/usr/bin/env python3
 
+from functools import reduce
+
 import argparse
 import atexit
 import logging
 import os
+import re
 import stat
 import subprocess
 import sys
+import tarfile
 
 __version__ = "1.0.0"
 
 Format = False
-
 Device = ""
 File = ""
 
+class Partition:
+    def __init__(self, name, size, start=None, ptype="0FC63DAF-8483-4772-8E79-3D69D8477DE4", fstype="raw"):
+        self.name = name
+        self.size = size
+        self.start = start
+        self.ptype = ptype
+    def __str__(self):
+        output = f"start={self.start}MiB, " if self.start else ""
+        output += f"size={self.size}MiB"
+        output += f", name={self.name}"
+        output += f", type={self.ptype}"
+        return output + "\n"
+
+class Label:
+    def __init__(self, part_table, ltype):
+        self.ltype = ltype
+        self.part_table = []
+        for part in part_table:
+            name = part["name"]
+            size = part["size"]
+            ptype = part.get("type", None)
+            ftype = part["fstype"]
+            start = part.get("start", None)
+            self.part_table.append(Partition(**part)) #name, size, start, ptype, ftype))
+    def __str__(self):
+        output = f"label: {self.ltype}\n"
+        for part in self.part_table:
+            output += str(part)
+        return output
+
 class Rpi3:
     long_name = "Raspberry Pi 3"
     pass
@@ -23,35 +56,131 @@ class Rpi4:
     long_name = "Raspberry Pi 4"
     pass
 
+class RV64:
+    long_name = "QEMU RISC-V 64-bit"
+    part_table = [
+        {"size": 2,    "fstype": "raw",  "name": "SPL", "start": 4,
+         "ptype": "2E54B353-1271-4842-806F-E436D6AF6985"},
+        {"size": 4,    "fstype": "raw",  "name": "u-boot",
+         "ptype": "5B193300-FC78-40CD-8002-E86C45580B47"},
+        {"size": 292,  "fstype": "vfat", "name": "boot_a",
+         "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
+        {"size": 36,   "fstype": "raw",  "name": "none"},
+        {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
+        {"size": 1344, "fstype": "ext4", "name": "system-data"},
+        {"size": None, "fstype": "ext4", "name": "user"},
+        {"size": 32,   "fstype": "ext4", "name": "module_a"},
+        {"size": 32,   "fstype": "ext4", "name": "ramdisk_a"},
+        {"size": 32,   "fstype": "ext4", "name": "ramdisk-recovery_a"},
+        {"size": 8,    "fstype": "raw",  "name": "inform"},
+        {"size": 256,  "fstype": "ext4", "name": "hal_a"},
+        {"size": 4,    "fstype": "raw",  "name": "reserved0"},
+        {"size": 64,   "fstype": "raw",  "name": "reserved1"},
+        {"size": 125,  "fstype": "raw",  "name": "reserved2"},
+    ]
+    binaries = {
+        "u-boot-spl.bin.normal.out":     1,
+        "u-boot.img":                    2,
+        "u-boot.itb":                    2,
+        "boot.img":                      3,
+        "rootfs.img":                    5,
+        "system-data.img":               6,
+        "user.img":                      7,
+        "modules.img":                   8,
+        "ramdisk.img":                   9,
+        "ramdisk-recovery.img":          10,
+        "hal.img":                       12,
+    }
+    def __init__(self, device):
+        self.device = device
+        total_size = device_size(device)
+        user_size = total_size - reduce(lambda x, y: x + (y["size"] or 0), self.part_table, 0)
+        self.part_table[6]["size"] = user_size
+        self.label = Label(self.part_table, "gpt")
+
+    def get_partition_index(self, binary):
+        return self.binaries.get(binary, None)
+
 class VF2:
     long_name = "VisionFive2"
     pass
 
-class RV64:
-    long_name = "QEMU RISC-V 64-bit"
-    pass
 
 TARGETS = {
     'rpi3': Rpi3,
     'rpi4': Rpi4,
     'vf2': VF2,
-    'rv64': RV64    
+    'rv64': RV64
 }
 
+def device_size(device):
+    proc = subprocess.run(["sfdisk", "-s", device],
+                            stdout=subprocess.PIPE)
+    size = int(proc.stdout.decode('utf-8').strip()) >> 10
+    logging.debug(f"{device} size {size}MiB")
+    return size
+
+def check_sfdisk():
+    proc = subprocess.run(['sfdisk', '-v'],
+                          stdout=subprocess.PIPE)
+    version = proc.stdout.decode('utf-8').strip()
+    logging.debug(f"Found {version}")
+    major, minor, patch = (int(x) for x in re.findall('[0-9]+', version))
+    support_delete = False
+
+    if major < 2 or major == 2 and minor < 26:
+        log.error(f"Your sfdisk {major}.{minor}.{patch} is too old.")
+        return False,False
+    elif major == 2 and minor >= 28:
+        support_delete = True
+
+    return True, support_delete
+
+def mkpart(args):
+    global Device
+    new, support_delete = check_sfdisk()
+
+    if not new:
+         sys.exit(1)
+    target = TARGETS[args.target](Device)
+
+    #TODO: unmount target devices
+    if support_delete:
+        logging.debug("Removing old partitions")
+        subprocess.run(['sfdisk', '--delete', Device])
+    else:
+        logging.debug("Removing old partition table")
+        subprocess.run(['dd', 'if=/dev/zero', 'of=' + Device,
+                        'bs=512', 'count=32', 'conv=notrunc'])
+
+    logging.debug("New partition table:\n" + str(target.label))
+    proc = subprocess.run(['sfdisk', Device],
+                          stdout=None,
+                          stderr=None,
+                          input=str(target.label).encode())
+
 def check_args(args):
+    global Format
+
     logging.info(f"Device: {args.device}")
 
     if args.binaries and len(args.binaries) > 0:
         logging.info("Fusing binar{}: {}".format("y" if len(args.binaries) == 1 else "ies",
                      ", ".join(args.binaries)))
 
-    if args.format:
-        response = input(f"{args.device} will be formatted. Is it OK? [y/N]")
+    if args.create:
+        Format = True
+
+    if args.format or Format:
+        response = input(f"{args.device} will be formatted. Is it OK? [y/N] ")
         if response.lower() in ('y', 'yes'):
             Format = True
 
 def check_device(args):
+    global Format
+    global Device
     Device = args.device
+
     if not os.path.exists(Device) and args.create:
         logging.debug(f"dd if=/dev/zero of={Device} conv=sparse bs=1M count={args.size}")
         rc = subprocess.run(["dd", "if=/dev/zero", f"of={Device}",
@@ -59,16 +188,20 @@ def check_device(args):
         if rc.returncode != 0:
             logging.error("Failed to create the backing file")
             sys.exit(1)
-        
+
     if os.path.isfile(Device):
+        global File
         File = Device
         logging.debug(f"losetup --show --partscan --find {File}")
-        rc = subprocess
 
-        proc = subprocess.Popen(["losetup", "--show", "--partscan",
+
+        proc = subprocess.run(["losetup", "--show", "--partscan",
                                  "--find", f"{File}"],
                                 stdout=subprocess.PIPE)
-        Device = proc.communicate()[0].decode('utf-8').strip()
+        Device = proc.stdout.decode('utf-8').strip()
+        if proc.returncode != 0:
+            logging.error(f"Failed to attach {File} to a loopback device")
+            sys.exit(1)
         logging.debug(f"Loop device found: {Device}")
         atexit.register(lambda: subprocess.run(["losetup", "-d", Device]))
 
@@ -82,15 +215,83 @@ def check_device(args):
     except TypeError:
         logging.error(f"{Device} is not a block device")
         sys.exit(1)
-    
-def check_partition_format():
-    pass
+
+def check_partition_format(args):
+    global Format
+    global Device
+
+    if not Format:
+        logging.info(f"Skip formatting of {Device}".format(Device))
+        return
+    logging.info(f"Start formatting of {Device}")
+    mkpart(args)
+    logging.info(f"{Device} formatted")
 
 def check_ddversion():
-    pass
+    proc = subprocess.run(["dd", "--version"],
+                            stdout=subprocess.PIPE)
+    version = proc.stdout.decode('utf-8').split('\n')[0].strip()
+    logging.debug(f"Found {version}")
+    major, minor = (int(x) for x in re.findall('[0-9]+', version))
 
-def fuse_image():
-    pass
+    if major < 8 or major == 8 and minor < 24:
+        return False
+
+    return True
+
+def get_partition_device(device, idx):
+    proc = subprocess.run(['lsblk', device, '-o', 'TYPE,KNAME'],
+                          stdout=subprocess.PIPE)
+    for l in proc.stdout.decode('utf-8').splitlines():
+        match = re.search(f"^part\s+(.*[^0-9]{idx})", l)
+        if match:
+            return match[1]
+    return None
+
+def do_fuse_file(f, name, target):
+    idx = target.get_partition_index(name)
+    if idx is None:
+        logging.info(f"No partition defined for {name}, skipping.")
+        return
+    pdevice = "/dev/" + get_partition_device(Device, idx)
+    proc_dd = subprocess.Popen(['dd', 'bs=4M',
+                                'oflag=direct',
+                                'iflag=fullblock',
+                                'conv=nocreat',
+                                f"of={pdevice}"],
+                               bufsize=(4 << 20),
+                               stdin=subprocess.PIPE,
+                               stdout=None, stderr=None)
+    logging.info(f"Writing {name} to {pdevice}")
+    buf = f.read(4 << 20)
+    while len(buf) > 0:
+        #TODO: progress
+        proc_dd.stdin.write(buf)
+        buf = f.read(4 << 20)
+    proc_dd.communicate()
+    logging.info("Done")
+    #TODO: verification
+
+def do_fuse_image_tarball(tarball, target):
+    with tarfile.open(tarball) as tf:
+        for entry in tf:
+            f = tf.extractfile(entry)
+            do_fuse_file(f, entry.name, target)
+
+def do_fuse_image(img, target):
+    with open(img, 'rb') as f:
+        do_fuse_file(f, img, target)
+
+def fuse_image(args):
+    if len(args.binaries) == 0:
+        return
+
+    target = TARGETS[args.target](Device)
+    for b in args.binaries:
+        if re.search('\.(tar|tar\.gz|tgz)$', b):
+            do_fuse_image_tarball(b, target)
+        else:
+            do_fuse_image(b, target)
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description="For {}, version {}".format(
@@ -98,7 +299,7 @@ if __name__ == '__main__':
         __version__
     ))
     parser.add_argument("-b", "--binary", action="append", dest="binaries",
-                        help="binary to flash")
+                        help="binary to flash, may be used multiple times")
     parser.add_argument("--create", action="store_true",
                         help="create the backing file and format the loopback device")
     parser.add_argument("-d", "--device", required=True,
@@ -110,19 +311,22 @@ if __name__ == '__main__':
                         "error, critical (default: warning)")
     parser.add_argument("-s", "--size", type=int, default=8192,
                         help="size of the backing file to create")
-    parser.add_argument("-t", "--target",
-                        help="target device model")
+    parser.add_argument("-t", "--target", required=True,
+                        help="Target device model. Use `--target list`"
+                        " to show supported devices.")
     parser.add_argument("--version", action="version",
                         version=f"%(prog)s {__version__}")
     args = parser.parse_args()
 
-    print(repr(args))
-
-    logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s:%(message)s',
+    if args.target == 'list':
+        print("\nSupported devices:\n")
+        for k,v in TARGETS.items():
+            print(f"  {k:6}  {v.long_name}")
+        sys.exit(0)
+    logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s',
                         level=args.log_level.upper())
 
     check_args(args)
     check_device(args)
-    check_partition_format()
-    check_ddversion()
-    fuse_image()
+    check_partition_format(args)
+    fuse_image(args)