#!/usr/bin/env python3
+from functools import reduce
+
import argparse
import atexit
import logging
import os
+import re
import stat
import subprocess
import sys
+import tarfile
__version__ = "1.0.0"
Format = False
-
Device = ""
File = ""
+class Partition:
+ def __init__(self, name, size, start=None, ptype="0FC63DAF-8483-4772-8E79-3D69D8477DE4", fstype="raw"):
+ self.name = name
+ self.size = size
+ self.start = start
+ self.ptype = ptype
+ def __str__(self):
+ output = f"start={self.start}MiB, " if self.start else ""
+ output += f"size={self.size}MiB"
+ output += f", name={self.name}"
+ output += f", type={self.ptype}"
+ return output + "\n"
+
+class Label:
+ def __init__(self, part_table, ltype):
+ self.ltype = ltype
+ self.part_table = []
+ for part in part_table:
+ name = part["name"]
+ size = part["size"]
+ ptype = part.get("type", None)
+ ftype = part["fstype"]
+ start = part.get("start", None)
+ self.part_table.append(Partition(**part)) #name, size, start, ptype, ftype))
+ def __str__(self):
+ output = f"label: {self.ltype}\n"
+ for part in self.part_table:
+ output += str(part)
+ return output
+
class Rpi3:
long_name = "Raspberry Pi 3"
pass
long_name = "Raspberry Pi 4"
pass
+class RV64:
+ long_name = "QEMU RISC-V 64-bit"
+ part_table = [
+ {"size": 2, "fstype": "raw", "name": "SPL", "start": 4,
+ "ptype": "2E54B353-1271-4842-806F-E436D6AF6985"},
+ {"size": 4, "fstype": "raw", "name": "u-boot",
+ "ptype": "5B193300-FC78-40CD-8002-E86C45580B47"},
+ {"size": 292, "fstype": "vfat", "name": "boot_a",
+ "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
+ {"size": 36, "fstype": "raw", "name": "none"},
+ {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
+ {"size": 1344, "fstype": "ext4", "name": "system-data"},
+ {"size": None, "fstype": "ext4", "name": "user"},
+ {"size": 32, "fstype": "ext4", "name": "module_a"},
+ {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
+ {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
+ {"size": 8, "fstype": "raw", "name": "inform"},
+ {"size": 256, "fstype": "ext4", "name": "hal_a"},
+ {"size": 4, "fstype": "raw", "name": "reserved0"},
+ {"size": 64, "fstype": "raw", "name": "reserved1"},
+ {"size": 125, "fstype": "raw", "name": "reserved2"},
+ ]
+ binaries = {
+ "u-boot-spl.bin.normal.out": 1,
+ "u-boot.img": 2,
+ "u-boot.itb": 2,
+ "boot.img": 3,
+ "rootfs.img": 5,
+ "system-data.img": 6,
+ "user.img": 7,
+ "modules.img": 8,
+ "ramdisk.img": 9,
+ "ramdisk-recovery.img": 10,
+ "hal.img": 12,
+ }
+ def __init__(self, device):
+ self.device = device
+ total_size = device_size(device)
+ user_size = total_size - reduce(lambda x, y: x + (y["size"] or 0), self.part_table, 0)
+ self.part_table[6]["size"] = user_size
+ self.label = Label(self.part_table, "gpt")
+
+ def get_partition_index(self, binary):
+ return self.binaries.get(binary, None)
+
class VF2:
long_name = "VisionFive2"
pass
-class RV64:
- long_name = "QEMU RISC-V 64-bit"
- pass
TARGETS = {
'rpi3': Rpi3,
'rpi4': Rpi4,
'vf2': VF2,
- 'rv64': RV64
+ 'rv64': RV64
}
+def device_size(device):
+ proc = subprocess.run(["sfdisk", "-s", device],
+ stdout=subprocess.PIPE)
+ size = int(proc.stdout.decode('utf-8').strip()) >> 10
+ logging.debug(f"{device} size {size}MiB")
+ return size
+
+def check_sfdisk():
+ proc = subprocess.run(['sfdisk', '-v'],
+ stdout=subprocess.PIPE)
+ version = proc.stdout.decode('utf-8').strip()
+ logging.debug(f"Found {version}")
+ major, minor, patch = (int(x) for x in re.findall('[0-9]+', version))
+ support_delete = False
+
+ if major < 2 or major == 2 and minor < 26:
+ log.error(f"Your sfdisk {major}.{minor}.{patch} is too old.")
+ return False,False
+ elif major == 2 and minor >= 28:
+ support_delete = True
+
+ return True, support_delete
+
+def mkpart(args):
+ global Device
+ new, support_delete = check_sfdisk()
+
+ if not new:
+ sys.exit(1)
+ target = TARGETS[args.target](Device)
+
+ #TODO: unmount target devices
+ if support_delete:
+ logging.debug("Removing old partitions")
+ subprocess.run(['sfdisk', '--delete', Device])
+ else:
+ logging.debug("Removing old partition table")
+ subprocess.run(['dd', 'if=/dev/zero', 'of=' + Device,
+ 'bs=512', 'count=32', 'conv=notrunc'])
+
+ logging.debug("New partition table:\n" + str(target.label))
+ proc = subprocess.run(['sfdisk', Device],
+ stdout=None,
+ stderr=None,
+ input=str(target.label).encode())
+
def check_args(args):
+ global Format
+
logging.info(f"Device: {args.device}")
if args.binaries and len(args.binaries) > 0:
logging.info("Fusing binar{}: {}".format("y" if len(args.binaries) == 1 else "ies",
", ".join(args.binaries)))
- if args.format:
- response = input(f"{args.device} will be formatted. Is it OK? [y/N]")
+ if args.create:
+ Format = True
+
+ if args.format or Format:
+ response = input(f"{args.device} will be formatted. Is it OK? [y/N] ")
if response.lower() in ('y', 'yes'):
Format = True
def check_device(args):
+ global Format
+ global Device
Device = args.device
+
if not os.path.exists(Device) and args.create:
logging.debug(f"dd if=/dev/zero of={Device} conv=sparse bs=1M count={args.size}")
rc = subprocess.run(["dd", "if=/dev/zero", f"of={Device}",
if rc.returncode != 0:
logging.error("Failed to create the backing file")
sys.exit(1)
-
+
if os.path.isfile(Device):
+ global File
File = Device
logging.debug(f"losetup --show --partscan --find {File}")
- rc = subprocess
- proc = subprocess.Popen(["losetup", "--show", "--partscan",
+
+ proc = subprocess.run(["losetup", "--show", "--partscan",
"--find", f"{File}"],
stdout=subprocess.PIPE)
- Device = proc.communicate()[0].decode('utf-8').strip()
+ Device = proc.stdout.decode('utf-8').strip()
+ if proc.returncode != 0:
+ logging.error(f"Failed to attach {File} to a loopback device")
+ sys.exit(1)
logging.debug(f"Loop device found: {Device}")
atexit.register(lambda: subprocess.run(["losetup", "-d", Device]))
except TypeError:
logging.error(f"{Device} is not a block device")
sys.exit(1)
-
-def check_partition_format():
- pass
+
+def check_partition_format(args):
+ global Format
+ global Device
+
+ if not Format:
+ logging.info(f"Skip formatting of {Device}".format(Device))
+ return
+ logging.info(f"Start formatting of {Device}")
+ mkpart(args)
+ logging.info(f"{Device} formatted")
def check_ddversion():
- pass
+ proc = subprocess.run(["dd", "--version"],
+ stdout=subprocess.PIPE)
+ version = proc.stdout.decode('utf-8').split('\n')[0].strip()
+ logging.debug(f"Found {version}")
+ major, minor = (int(x) for x in re.findall('[0-9]+', version))
-def fuse_image():
- pass
+ if major < 8 or major == 8 and minor < 24:
+ return False
+
+ return True
+
+def get_partition_device(device, idx):
+ proc = subprocess.run(['lsblk', device, '-o', 'TYPE,KNAME'],
+ stdout=subprocess.PIPE)
+ for l in proc.stdout.decode('utf-8').splitlines():
+ match = re.search(f"^part\s+(.*[^0-9]{idx})", l)
+ if match:
+ return match[1]
+ return None
+
+def do_fuse_file(f, name, target):
+ idx = target.get_partition_index(name)
+ if idx is None:
+ logging.info(f"No partition defined for {name}, skipping.")
+ return
+ pdevice = "/dev/" + get_partition_device(Device, idx)
+ proc_dd = subprocess.Popen(['dd', 'bs=4M',
+ 'oflag=direct',
+ 'iflag=fullblock',
+ 'conv=nocreat',
+ f"of={pdevice}"],
+ bufsize=(4 << 20),
+ stdin=subprocess.PIPE,
+ stdout=None, stderr=None)
+ logging.info(f"Writing {name} to {pdevice}")
+ buf = f.read(4 << 20)
+ while len(buf) > 0:
+ #TODO: progress
+ proc_dd.stdin.write(buf)
+ buf = f.read(4 << 20)
+ proc_dd.communicate()
+ logging.info("Done")
+ #TODO: verification
+
+def do_fuse_image_tarball(tarball, target):
+ with tarfile.open(tarball) as tf:
+ for entry in tf:
+ f = tf.extractfile(entry)
+ do_fuse_file(f, entry.name, target)
+
+def do_fuse_image(img, target):
+ with open(img, 'rb') as f:
+ do_fuse_file(f, img, target)
+
+def fuse_image(args):
+ if len(args.binaries) == 0:
+ return
+
+ target = TARGETS[args.target](Device)
+ for b in args.binaries:
+ if re.search('\.(tar|tar\.gz|tgz)$', b):
+ do_fuse_image_tarball(b, target)
+ else:
+ do_fuse_image(b, target)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="For {}, version {}".format(
__version__
))
parser.add_argument("-b", "--binary", action="append", dest="binaries",
- help="binary to flash")
+ help="binary to flash, may be used multiple times")
parser.add_argument("--create", action="store_true",
help="create the backing file and format the loopback device")
parser.add_argument("-d", "--device", required=True,
"error, critical (default: warning)")
parser.add_argument("-s", "--size", type=int, default=8192,
help="size of the backing file to create")
- parser.add_argument("-t", "--target",
- help="target device model")
+ parser.add_argument("-t", "--target", required=True,
+ help="Target device model. Use `--target list`"
+ " to show supported devices.")
parser.add_argument("--version", action="version",
version=f"%(prog)s {__version__}")
args = parser.parse_args()
- print(repr(args))
-
- logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s:%(message)s',
+ if args.target == 'list':
+ print("\nSupported devices:\n")
+ for k,v in TARGETS.items():
+ print(f" {k:6} {v.long_name}")
+ sys.exit(0)
+ logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s',
level=args.log_level.upper())
check_args(args)
check_device(args)
- check_partition_format()
- check_ddversion()
- fuse_image()
+ check_partition_format(args)
+ fuse_image(args)