3 from functools import reduce
24 class DebugFormatter(logging.Formatter):
25 def format(self, record):
26 if record.levelno == logging.DEBUG:
27 record.debuginfo = "[{}:{}] ".format(os.path.basename(record.pathname), record.lineno)
30 return logging.Formatter.format(self, record)
32 class ColorFormatter(DebugFormatter):
34 logging.CRITICAL: "\x1b[35;1m",
35 logging.ERROR: "\x1b[33;1m",
36 logging.WARNING: "\x1b[33;1m",
37 logging.INFO: "\x1b[0m",
38 logging.DEBUG: "\x1b[30;1m",
39 logging.NOTSET: "\x1b[30;1m"
41 def format(self, record):
42 record.levelcolor = self._levelToColor[record.levelno]
43 record.msg = record.msg
44 return super().format(record)
46 class ColorStreamHandler(logging.StreamHandler):
47 def __init__(self, stream=None, format=None, datefmt=None, style='%', cformat=None):
48 logging.StreamHandler.__init__(self, stream)
49 if os.isatty(self.stream.fileno()):
50 self.formatter = ColorFormatter(cformat, datefmt, style)
51 self.terminator = "\x1b[0m\n"
53 self.formatter = DebugFormatter(format, datefmt, style)
56 def __init__(self, name, size, start=None, ptype=None, fstype="raw", bootable=False):
61 self.bootable = bootable
65 output.append(f"start={self.start}MiB")
66 if type(self.size) == int and self.size >= 0:
67 output.append(f"size={self.size}MiB")
69 output.append(f"name={self.name}")
70 output.append(f"type={self.ptype}")
72 output.append("bootable")
73 return ", ".join(output) + "\n"
76 def __init__(self, part_table, ltype):
79 ptype = "0FC63DAF-8483-4772-8E79-3D69D8477DE4"
83 for part in part_table:
84 part["ptype"] = part.get("ptype", ptype)
85 self.part_table.append(Partition(**part))
87 output = f"label: {self.ltype}\n"
88 for part in self.part_table:
93 def __init__(self, device, ltype):
94 # TODO: make a copy of a sublcass part_table
95 self.with_super = False
97 total_size = device_size(device)
98 self.user_size = total_size - self.reserved_space - \
99 reduce(lambda x, y: x + (y["size"] or 0), self.part_table, 0)
100 if self.user_size < 100:
101 logging.error(f"Not enough space for user data ({self.user_size}). Use larger storage.")
102 raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC), device)
103 self.part_table[self.user_partition]["size"] = self.user_size
104 self.label = Label(self.part_table, ltype)
def get_partition_index(self, binary):
    """Return the entry stored for ``binary`` in ``self.binaries``.

    Args:
        binary: Key to look up (callers in this file pass an image file name).

    Returns:
        The value mapped to ``binary``, or ``None`` when there is no entry.
    """
    # dict.get already yields None for a missing key.
    return self.binaries.get(binary)
110 def initialize_parameters(self):
113 class SdFusingTargetAB(SdFusingTarget):
def get_partition_index(self, binary):
    """Return the entry for ``binary`` from the table of the active slot.

    When ``self.update`` equals ``'b'`` the lookup uses ``self.binaries_b``;
    for any other value it uses ``self.binaries``.

    Args:
        binary: Key to look up (callers in this file pass an image file name).

    Returns:
        The value mapped to ``binary`` in the selected table, or ``None``
        when that table has no such entry.
    """
    # binaries_b is only touched when slot 'b' was requested.
    table = self.binaries_b if self.update == 'b' else self.binaries
    return table.get(binary)
120 def initialize_parameters(self):
121 logging.debug("Initializing parameterss")
123 for i, p in enumerate(self.part_table):
124 if p['name'] == 'inform':
126 d = "/dev/" + get_partition_device(self.device, n)
128 argv = ['tune2fs', '-O', '^metadata_csum', d]
129 logging.debug(" ".join(argv))
131 stdin=subprocess.DEVNULL,
132 stdout=None, stderr=None)
134 with tempfile.TemporaryDirectory() as mnt:
135 argv = ['mount', '-t', 'ext4', d, mnt]
136 logging.debug(" ".join(argv))
137 proc = subprocess.run(argv,
138 stdin=subprocess.DEVNULL,
139 stdout=None, stderr=None)
140 if proc.returncode != 0:
141 logging.error("Failed to mount {d} in {mnt}")
143 for param, value in self.params:
144 with open(os.path.join(mnt, param), 'w') as f:
145 f.write(value + '\n')
147 logging.debug(" ".join(argv))
149 stdin=subprocess.DEVNULL,
150 stdout=None, stderr=None)
152 class Rpi3(SdFusingTarget,RpiInitParams):
153 long_name = "Raspberry Pi 3"
155 {"size": 64, "fstype": "vfat", "name": "boot", "start": 4, "ptype": "0xe", "bootable": True},
156 {"size": 3072, "fstype": "ext4", "name": "rootfs"},
157 {"size": 1344, "fstype": "ext4", "name": "system-data"},
158 {"size": None, "ptype": "5", "name": "extended", "start": 4484},
159 {"size": None, "fstype": "ext4", "name": "user"},
160 {"size": 32, "fstype": "ext4", "name": "modules"},
161 {"size": 32, "fstype": "ext4", "name": "ramdisk"},
162 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery"},
163 {"size": 8, "fstype": "ext4", "name": "inform"},
164 {"size": 256, "fstype": "ext4", "name": "hal"},
165 {"size": 125, "fstype": "ext4", "name": "reserved2"},
170 "system-data.img": 3,
174 "ramdisk-recovery.img": 8,
177 params = (('reboot-param.bin', ''),)
def __init__(self, device, args):
    """Configure the layout parameters and initialize the parent on ``device``.

    Args:
        device: Passed through unchanged to the parent initializer.
        args: Not read by this initializer.
    """
    # Both attributes are set before delegating because they must exist
    # by the time the parent initializer runs.
    self.user_partition, self.reserved_space = 4, 12
    super().__init__(device, "dos")
184 class Rpi4Super(SdFusingTargetAB, RpiInitParams):
185 long_name = "Raspberry Pi 4 w/ super partition"
187 {"size": 64, "fstype": "vfat", "name": "boot_a","start": 4,
188 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
189 {"size": 6656, "fstype": "ext4", "name": "super"},
190 {"size": 1344, "fstype": "ext4", "name": "system-data"},
191 {"size": 36, "fstype": "raw", "name": "none"},
192 {"size": None, "fstype": "ext4", "name": "user"},
193 {"size": 32, "fstype": "ext4", "name": "module_a"},
194 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
195 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
196 {"size": 8, "fstype": "ext4", "name": "inform"},
197 {"size": 0, "fstype": "raw", "name": "empty"},
198 {"size": 64, "fstype": "vfat", "name": "boot_b",
199 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
200 {"size": 0, "fstype": "raw", "name": "empty"},
201 {"size": 32, "fstype": "ext4", "name": "module_b"},
202 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
203 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
204 {"size": 0, "fstype": "raw", "name": "empty"},
205 {"size": 4, "fstype": "ext4", "name": "reserved0"},
206 {"size": 64, "fstype": "ext4", "name": "reserved1"},
207 {"size": 125, "fstype": "ext4", "name": "reserved2"}
212 "system-data.img": 3,
216 "ramdisk-recovery.img": 8,
222 "ramdisk-recovery.img": 15,
224 params = (('reboot-param.bin', 'norm'),
225 ('reboot-param.info', 'norm'),
226 ('partition-ab.info', 'a'),
227 ('partition-ab-cloned.info', '1'),
228 ('upgrade-status.info', '0'),
229 ('partition-a-status.info', 'ok'),
230 ('partition-b-status.info', 'ok'))
def __init__(self, device, args):
    """Configure the layout parameters and initialize the parent on ``device``.

    Args:
        device: Passed through unchanged to the parent initializer.
        args: Object whose ``update`` attribute is stored as ``self.update``.
    """
    self.update = args.update
    self.user_partition, self.reserved_space = 4, 8
    super().__init__(device, "gpt")
    # Assigned only after the parent initializer has run, so these values
    # are the ones left on the instance.
    # NOTE(review): the base initializer appears to reset with_super to
    # False, which would make this ordering mandatory -- confirm.
    self.super_alignment = 1 << 20  # 1048576
    self.with_super = True
240 class Rpi4(SdFusingTargetAB, RpiInitParams):
241 long_name = "Raspberry Pi 4"
243 {"size": 64, "fstype": "vfat", "name": "boot_a", "start": 4,
244 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
245 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
246 {"size": 1344, "fstype": "ext4", "name": "system-data"},
247 {"size": 36, "fstype": "raw", "name": "none"},
248 {"size": None, "fstype": "ext4", "name": "user"},
249 {"size": 32, "fstype": "ext4", "name": "module_a"},
250 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
251 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
252 {"size": 8, "fstype": "ext4", "name": "inform"},
253 {"size": 256, "fstype": "ext4", "name": "hal_a"},
254 {"size": 64, "fstype": "vfat", "name": "boot_b",
255 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
256 {"size": 3072, "fstype": "ext4", "name": "rootfs_b"},
257 {"size": 32, "fstype": "ext4", "name": "module_b"},
258 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
259 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
260 {"size": 256, "fstype": "ext4", "name": "hal_b"},
261 {"size": 4, "fstype": "ext4", "name": "param"},
262 {"size": 64, "fstype": "ext4", "name": "reserved1"},
263 {"size": 125, "fstype": "ext4", "name": "reserved2"},
268 "system-data.img": 3,
272 "ramdisk-recovery.img": 8,
280 "ramdisk-recovery.img": 15,
283 params = (('reboot-param.bin', 'norm'),
284 ('reboot-param.info', 'norm'),
285 ('partition-ab.info', 'a'),
286 ('partition-ab-cloned.info', '1'),
287 ('upgrade-status.info', '0'),
288 ('partition-a-status.info', 'ok'),
289 ('partition-b-status.info', 'ok'))
def __init__(self, device, args):
    """Configure the layout parameters and initialize the parent on ``device``.

    Args:
        device: Passed through unchanged to the parent initializer.
        args: Object whose ``update`` attribute is stored as ``self.update``.
    """
    self.update = args.update
    # Both attributes are set before delegating because they must exist
    # by the time the parent initializer runs.
    self.user_partition, self.reserved_space = 4, 5
    super().__init__(device, "gpt")
297 class RV64(SdFusingTarget):
298 long_name = "QEMU RISC-V 64-bit"
300 {"size": 2, "fstype": "raw", "name": "SPL", "start": 4,
301 "ptype": "2E54B353-1271-4842-806F-E436D6AF6985"},
302 {"size": 4, "fstype": "raw", "name": "u-boot",
303 "ptype": "5B193300-FC78-40CD-8002-E86C45580B47"},
304 {"size": 292, "fstype": "vfat", "name": "boot_a",
305 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
306 {"size": 36, "fstype": "raw", "name": "none"},
307 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
308 {"size": 1344, "fstype": "ext4", "name": "system-data"},
309 {"size": None, "fstype": "ext4", "name": "user"},
310 {"size": 32, "fstype": "ext4", "name": "module_a"},
311 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
312 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
313 {"size": 8, "fstype": "raw", "name": "inform"},
314 {"size": 256, "fstype": "ext4", "name": "hal_a"},
315 {"size": 4, "fstype": "raw", "name": "reserved0"},
316 {"size": 64, "fstype": "raw", "name": "reserved1"},
317 {"size": 125, "fstype": "raw", "name": "reserved2"},
320 "u-boot-spl.bin.normal.out": 1,
325 "system-data.img": 6,
329 "ramdisk-recovery.img": 10,
def __init__(self, device, args):
    """Configure the layout parameters and initialize the parent on ``device``.

    Args:
        device: Passed through unchanged to the parent initializer.
        args: Not read by this initializer.
    """
    # Both attributes are set before delegating because they must exist
    # by the time the parent initializer runs.
    self.reserved_space, self.user_partition = 5, 6
    super().__init__(device, 'gpt')
339 long_name = "VisionFive2"
351 def device_size(device):
352 argv = ["sfdisk", "-s", device]
353 logging.debug(" ".join(argv))
354 proc = subprocess.run(argv,
355 stdout=subprocess.PIPE)
356 size = int(proc.stdout.decode('utf-8').strip()) >> 10
357 logging.debug(f"{device} size {size}MiB")
361 proc = subprocess.run(['sfdisk', '-v'],
362 stdout=subprocess.PIPE)
363 version = proc.stdout.decode('utf-8').strip()
364 logging.debug(f"Found {version}")
365 major, minor = [int(x) for x in re.findall('[0-9]+', version)][0:2]
366 support_delete = False
368 if major < 2 or major == 2 and minor < 26:
369 log.error(f"Your sfdisk {major}.{minor}.{patch} is too old.")
371 elif major == 2 and minor >= 28:
372 support_delete = True
374 return True, support_delete
376 def mkpart(args, target):
378 new, support_delete = check_sfdisk()
381 logging.error('sfdisk too old')
384 with open('/proc/self/mounts') as mounts:
385 device_kname = '/dev/' + get_device_kname(Device)
386 device_re = re.compile(device_kname + '[^ ]*')
387 logging.debug("Checking for mounted partitions on {device_kname}")
389 match = device_re.match(m)
391 logging.warning('Found mounted device: ' + match[0])
392 argv = ['umount', match[0]]
393 logging.debug(" ".join(argv))
394 proc = subprocess.run(argv)
395 if proc.returncode != 0:
396 logging.error(f"Failed to unmount {match[0]}")
400 logging.info("Removing old partitions")
401 argv = ['sfdisk', '--delete', Device]
402 logging.debug(" ".join(argv))
403 proc = subprocess.run(argv)
404 if proc.returncode != 0:
405 logging.error(f"Failed to remove the old partitions from {Device}")
408 logging.info("Removing old partition table")
409 argv = ['dd', 'if=/dev/zero', 'of=' + Device,
410 'bs=512', 'count=32', 'conv=notrunc']
411 logging.debug(" ".join(argv))
412 proc = subprocess.run(argv)
413 if proc.returncode != 0:
414 logging.error(f"Failed to clear the old partition table on {Device}")
417 logging.debug("New partition table:\n" + str(target.label))
418 argv = ['sfdisk', Device]
419 logging.debug(" ".join(argv))
420 proc = subprocess.run(argv,
423 input=str(target.label).encode())
424 if proc.returncode != 0:
425 logging.error(f"Failed to create partition a new table on {Device}")
426 logging.error(f"New partition table:\n" + str(target.label))
429 for i, part in enumerate(target.part_table):
430 d = "/dev/" + get_partition_device(target.device, i+1)
431 if not 'fstype' in part:
432 logging.debug(f"Filesystem not defined for {d}, skipping")
434 logging.debug(f"Formatting {d} as {part['fstype']}")
435 if part['fstype'] == 'vfat':
436 argv = ['mkfs.vfat', '-F', '16', '-n', part['name'], d]
437 logging.debug(" ".join(argv))
438 proc = subprocess.run(argv,
439 stdin=subprocess.DEVNULL,
440 stdout=None, stderr=None)
441 if proc.returncode != 0:
442 log.error(f"Failed to create FAT filesystem on {d}")
444 elif part['fstype'] == 'ext4':
445 argv = ['mkfs.ext4', '-q', '-L', part['name'], d]
446 logging.debug(" ".join(argv))
447 proc = subprocess.run(argv,
448 stdin=subprocess.DEVNULL,
449 stdout=None, stderr=None)
450 if proc.returncode != 0:
451 log.error(f"Failed to create ext4 filesystem on {d}")
453 elif part['fstype'] == 'raw':
455 target.initialize_parameters()
457 def check_args(args):
460 logging.info(f"Device: {args.device}")
462 if args.binaries and len(args.binaries) > 0:
463 logging.info("Fusing binar{}: {}".format("y" if len(args.binaries) == 1 else "ies",
464 ", ".join(args.binaries)))
469 if args.format or Format:
470 response = input(f"{args.device} will be formatted. Is it OK? [y/N] ")
471 if response.lower() in ('y', 'yes'):
474 def check_device(args):
480 if os.path.exists(Device):
481 logging.error(f"Failed to create '{Device}', the file alread exists")
484 argv = ["dd", "if=/dev/zero", f"of={Device}",
485 "conv=sparse", "bs=1M", f"count={args.size}"]
486 logging.debug(" ".join(argv))
487 rc = subprocess.run(argv)
488 if rc.returncode != 0:
489 logging.error("Failed to create the backing file")
492 if os.path.isfile(Device):
496 argv = ["losetup", "--show", "--partscan", "--find", f"{File}"]
497 logging.debug(" ".join(argv))
498 proc = subprocess.run(argv,
499 stdout=subprocess.PIPE)
500 Device = proc.stdout.decode('utf-8').strip()
501 if proc.returncode != 0:
502 logging.error(f"Failed to attach {File} to a loopback device")
504 logging.debug(f"Loop device found: {Device}")
505 atexit.register(lambda: subprocess.run(["losetup", "-d", Device]))
509 if not stat.S_ISBLK(s.st_mode):
511 except FileNotFoundError:
512 logging.error(f"No such device: {Device}")
515 logging.error(f"{Device} is not a block device")
518 def check_partition_format(args, target):
523 logging.info(f"Skip formatting of {Device}".format(Device))
525 logging.info(f"Start formatting of {Device}")
527 logging.info(f"{Device} formatted")
529 def check_ddversion():
530 proc = subprocess.run(["dd", "--version"],
531 stdout=subprocess.PIPE)
532 version = proc.stdout.decode('utf-8').split('\n')[0].strip()
533 logging.debug(f"Found {version}")
534 major, minor = (int(x) for x in re.findall('[0-9]+', version))
536 if major < 8 or major == 8 and minor < 24:
541 def get_partition_device(device, idx):
542 argv = ['lsblk', device, '-o', 'TYPE,KNAME']
543 logging.debug(" ".join(argv))
544 proc = subprocess.run(argv,
545 stdout=subprocess.PIPE)
546 for l in proc.stdout.decode('utf-8').splitlines():
547 match = re.search(f"^part\s+(.*[^0-9]{idx})", l)
552 def get_device_kname(device):
553 argv = ['lsblk', device, '-o', 'TYPE,KNAME']
554 logging.debug(" ".join(argv))
555 proc = subprocess.run(argv,
556 stdout=subprocess.PIPE)
557 for l in proc.stdout.decode('utf-8').splitlines():
558 match = re.search(f"^(disk|loop)\s+(.*)", l)
563 def do_fuse_file(f, name, target):
564 idx = target.get_partition_index(name)
566 logging.info(f"No partition defined for {name}, skipping.")
568 pdevice = "/dev/" + get_partition_device(Device, idx)
569 argv = ['dd', 'bs=4M',
574 logging.debug(" ".join(argv))
575 proc_dd = subprocess.Popen(argv,
577 stdin=subprocess.PIPE,
578 stdout=None, stderr=None)
579 logging.info(f"Writing {name} to {pdevice}")
580 buf = f.read(4 << 20)
583 proc_dd.stdin.write(buf)
584 buf = f.read(4 << 20)
585 proc_dd.communicate()
589 #TODO: functions with the target argument should probably
590 # be part of some class
def get_aligned_size(size, target):
    """Round ``size`` up to the next multiple of ``target.super_alignment``.

    Args:
        size: Size to align; a non-negative integer is expected.
        target: Object exposing an integer ``super_alignment`` attribute.

    Returns:
        The smallest multiple of ``target.super_alignment`` that is not
        less than ``size``; ``0`` stays ``0``.
    """
    alignment = target.super_alignment
    # Ceiling division carried out entirely on integers. Going through
    # float division silently loses precision once the operands exceed
    # 2**53 and can then over-align an exact multiple.
    return -(-size // alignment) * alignment
595 def do_fuse_image_super(tmpd, target):
597 metadata_size = 65536
598 metadata_aligned_size = get_aligned_size(metadata_size, target)
600 hal_path = os.path.join(tmpd, 'hal.img')
601 rootfs_path = os.path.join(tmpd, 'rootfs.img')
602 super_path = os.path.join(tmpd, 'super.img')
605 hal_size = os.stat(hal_path).st_size
606 rootfs_size = os.stat(rootfs_path).st_size
607 except FileNotFoundError as e:
608 fn = os.path.split(e.filename)[-1]
609 logging.warning(f"{fn} is missing, skipping super partition image")
612 hal_aligned_size = get_aligned_size(hal_size, target)
613 rootfs_aligned_size = get_aligned_size(rootfs_size, target)
614 group_size = hal_aligned_size + rootfs_aligned_size
615 super_size = metadata_aligned_size + 2 * group_size
617 argv = ["lpmake", "-F",
619 f"--device-size={super_size}",
620 f"--metadata-size={metadata_size}",
621 f"--metadata-slots={metadata_slots}",
622 "-g", f"tizen_a:{group_size}",
623 "-p", f"rootfs_a:none:{rootfs_aligned_size}:tizen_a",
624 "-p", f"hal_a:none:{hal_aligned_size}:tizen_a",
625 "-g", f"tizen_b:{group_size}",
626 "-p", f"rootfs_b:none:{rootfs_aligned_size}:tizen_b",
627 "-p", f"hal_b:none:{hal_aligned_size}:tizen_b",
628 "-i", f"rootfs_a={rootfs_path}",
629 "-i", f"rootfs_b={rootfs_path}",
630 "-i", f"hal_a={hal_path}",
631 "-i", f"hal_b={hal_path}"]
632 logging.debug(" ".join(argv))
633 proc = subprocess.run(argv,
634 stdin=subprocess.DEVNULL,
635 stdout=None, stderr=None)
637 if proc.returncode != 0:
638 logging.error("Failed to create super.img")
639 do_fuse_image(super_path, target)
641 def do_fuse_image_tarball(tarball, tmpd, target):
642 with tarfile.open(tarball) as tf:
644 if target.with_super:
645 if entry.name in('hal.img', 'rootfs.img'):
646 tf.extract(entry, path=tmpd)
648 f = tf.extractfile(entry)
649 do_fuse_file(f, entry.name, target)
def do_fuse_image(img, target):
    """Open the image file ``img`` and hand it to ``do_fuse_file``.

    The file is opened in binary read mode and is closed again when the
    call returns or raises.

    Args:
        img: Path of the image file to read.
        target: Forwarded unchanged to ``do_fuse_file``.
    """
    with open(img, 'rb') as image_file:
        # Only the file name, without its directory, is passed on.
        image_name = os.path.basename(img)
        do_fuse_file(image_file, image_name, target)
655 def fuse_image(args, target):
656 if args.binaries is None or len(args.binaries) == 0:
658 with tempfile.TemporaryDirectory() as tmpd:
659 for b in args.binaries:
660 if re.search('\.(tar|tar\.gz|tgz)$', b):
661 do_fuse_image_tarball(b, tmpd, target)
663 fn = os.path.split(b)[-1]
664 if target.with_super and fn in ('rootfs.img', 'hal.img'):
665 shutil.copy(b, os.path.join(tmpd, fn))
667 do_fuse_image(b, target)
669 if target.with_super:
670 do_fuse_image_super(tmpd, target)
672 if __name__ == '__main__':
673 parser = argparse.ArgumentParser(description="For {}, version {}".format(
674 ", ".join([v.long_name for k,v in TARGETS.items()]),
677 parser.add_argument("-b", "--binary", action="extend", dest="binaries",
679 help="binary to flash, may be used multiple times")
680 parser.add_argument("--create", action="store_true",
681 help="create the backing file and format the loopback device")
682 parser.add_argument("--debug", action='store_const', const='DEBUG',
684 help="set log level to DEBUG")
685 parser.add_argument("-d", "--device", required=True,
686 help="device node or loopback backing file")
687 parser.add_argument("--format", action="store_true",
688 help="create new partition table on the target device")
689 parser.add_argument("--log-level", dest="log_level", default="warning",
690 help="Verbosity, possible values: debug, info, warning, "
691 "error, critical (default: warning)")
692 parser.add_argument("--size", type=int, default=8192,
693 help="size of the backing file to create (in MiB)")
694 parser.add_argument("-t", "--target", required=True,
695 help="Target device model. Use `--target list`"
696 " to show supported devices.")
697 parser.add_argument("--update", choices=['a', 'b'], default=None,
698 help="Choose partition set to update: a or b.")
699 parser.add_argument("--version", action="version",
700 version=f"%(prog)s {__version__}")
701 args = parser.parse_args()
703 if args.target == 'list':
704 print("\nSupported devices:\n")
705 for k,v in TARGETS.items():
706 print(f" {k:6} {v.long_name}")
709 conh = ColorStreamHandler(format='%(asctime)s.%(msecs)d %(debuginfo)s%(levelname)-8s %(message)s',
710 cformat='%(asctime)s.%(msecs)d %(debuginfo)s%(levelcolor)s%(message)s',
711 datefmt='%Y-%m-%dT%H:%M:%S')
712 log_handlers = [conh]
713 logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
714 handlers=log_handlers,
715 level=args.log_level.upper())
717 logging.debug(" ".join(sys.argv))
721 target = TARGETS[args.target](Device, args)
723 check_partition_format(args, target)
724 fuse_image(args, target)
725 subprocess.run(['sync'],
726 stdin=subprocess.DEVNULL,
727 stdout=None, stderr=None )