3 from functools import reduce
24 class DebugFormatter(logging.Formatter):
25 def format(self, record):
26 if record.levelno == logging.DEBUG:
27 record.debuginfo = "[{}:{}] ".format(os.path.basename(record.pathname), record.lineno)
30 return logging.Formatter.format(self, record)
32 class ColorFormatter(DebugFormatter):
34 logging.CRITICAL: "\x1b[35;1m",
35 logging.ERROR: "\x1b[33;1m",
36 logging.WARNING: "\x1b[33;1m",
37 logging.INFO: "\x1b[0m",
38 logging.DEBUG: "\x1b[30;1m",
39 logging.NOTSET: "\x1b[30;1m"
def format(self, record):
    """Attach an ANSI colour prefix to *record*, then format it.

    Sets ``record.levelcolor`` from ``self._levelToColor`` so a format
    string can reference ``%(levelcolor)s``, then delegates to the parent
    class's ``format``.

    Level numbers that are not in the table (for example custom numeric
    levels) fall back to the attribute-reset sequence instead of raising
    ``KeyError``.
    """
    # "\x1b[0m" is the SGR reset sequence, i.e. "no colour".
    record.levelcolor = self._levelToColor.get(record.levelno, "\x1b[0m")
    return super().format(record)
46 class ColorStreamHandler(logging.StreamHandler):
47 def __init__(self, stream=None, format=None, datefmt=None, style='%', cformat=None):
48 logging.StreamHandler.__init__(self, stream)
49 if os.isatty(self.stream.fileno()):
50 self.formatter = ColorFormatter(cformat, datefmt, style)
51 self.terminator = "\x1b[0m\n"
53 self.formatter = DebugFormatter(format, datefmt, style)
56 def __init__(self, name, size, start=None, ptype=None, fstype="raw", bootable=False):
61 self.bootable = bootable
65 output.append(f"start={self.start}MiB")
66 if type(self.size) == int and self.size >= 0:
67 output.append(f"size={self.size}MiB")
69 output.append(f"name={self.name}")
70 output.append(f"type={self.ptype}")
72 output.append("bootable")
73 return ", ".join(output) + "\n"
76 def __init__(self, part_table, ltype):
79 ptype = "0FC63DAF-8483-4772-8E79-3D69D8477DE4"
83 for part in part_table:
84 part["ptype"] = part.get("ptype", ptype)
85 self.part_table.append(Partition(**part))
87 output = f"label: {self.ltype}\n"
88 for part in self.part_table:
93 def __init__(self, device, ltype):
# TODO: make a copy of a subclass part_table
95 self.with_super = False
97 total_size = device_size(device)
98 self.user_size = total_size - self.reserved_space - \
99 reduce(lambda x, y: x + (y["size"] or 0), self.part_table, 0)
100 if self.user_size < 100:
101 logging.error(f"Not enough space for user data ({self.user_size}). Use larger storage.")
102 raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC), device)
103 self.part_table[self.user_partition]["size"] = self.user_size
104 self.label = Label(self.part_table, ltype)
def get_partition_index(self, binary):
    """Look up the partition index assigned to the image name *binary*.

    Returns the entry of ``self.binaries`` for *binary*, or ``None`` when
    the name has no entry.
    """
    partition_map = self.binaries
    return partition_map.get(binary)
110 def initialize_parameters(self):
class SdFusingTargetAB(SdFusingTarget):
    """Target that keeps a second image-to-partition table for slot "b"."""

    def get_partition_index(self, binary):
        """Return the partition index for *binary* in the selected slot.

        ``self.binaries_b`` is consulted when ``self.update`` equals ``'b'``;
        any other value uses ``self.binaries``.  ``None`` is returned when
        the image name is not in the chosen table.
        """
        table = self.binaries_b if self.update == 'b' else self.binaries
        return table.get(binary)
120 def initialize_parameters(self):
121 logging.debug("Initializing parameterss")
123 for i, p in enumerate(self.part_table):
124 if p['name'] == 'inform':
126 d = "/dev/" + get_partition_device(self.device, n)
128 argv = ['tune2fs', '-O', '^metadata_csum', d]
129 logging.debug(" ".join(argv))
131 stdin=subprocess.DEVNULL,
132 stdout=None, stderr=None)
134 with tempfile.TemporaryDirectory() as mnt:
135 argv = ['mount', '-t', 'ext4', d, mnt]
136 logging.debug(" ".join(argv))
137 proc = subprocess.run(argv,
138 stdin=subprocess.DEVNULL,
139 stdout=None, stderr=None)
140 if proc.returncode != 0:
141 logging.error("Failed to mount {d} in {mnt}")
143 for param, value in self.params:
144 with open(os.path.join(mnt, param), 'w') as f:
145 f.write(value + '\n')
147 logging.debug(" ".join(argv))
149 stdin=subprocess.DEVNULL,
150 stdout=None, stderr=None)
152 class Rpi3(SdFusingTarget,RpiInitParams):
153 long_name = "Raspberry Pi 3"
155 {"size": 64, "fstype": "vfat", "name": "boot", "start": 4, "ptype": "0xe", "bootable": True},
156 {"size": 3072, "fstype": "ext4", "name": "rootfs"},
157 {"size": 1344, "fstype": "ext4", "name": "system-data"},
158 {"size": None, "ptype": "5", "name": "extended", "start": 4484},
159 {"size": None, "fstype": "ext4", "name": "user"},
160 {"size": 32, "fstype": "ext4", "name": "modules"},
161 {"size": 32, "fstype": "ext4", "name": "ramdisk"},
162 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery"},
163 {"size": 8, "fstype": "ext4", "name": "inform"},
164 {"size": 256, "fstype": "ext4", "name": "hal"},
165 {"size": 125, "fstype": "ext4", "name": "reserved2"},
170 "system-data.img": 3,
174 "ramdisk-recovery.img": 8,
177 params = (('reboot-param.bin', ''),)
def __init__(self, device, args):
    """Set the Raspberry Pi 3 layout constants and build a "dos" label.

    *args* is accepted because every target is constructed with
    ``(device, args)``; it is not read here.
    """
    # Both attributes are assigned before the parent initializer is called,
    # as in the original ordering (presumably the parent reads them).
    self.user_partition = 4
    self.reserved_space = 12
    super().__init__(device, "dos")
184 class Rpi4Super(SdFusingTargetAB, RpiInitParams):
185 long_name = "Raspberry Pi 4 w/ super partition"
187 {"size": 64, "fstype": "vfat", "name": "boot_a","start": 4,
188 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
189 {"size": 6656, "fstype": "ext4", "name": "super"},
190 {"size": 1344, "fstype": "ext4", "name": "system-data"},
191 {"size": 36, "fstype": "raw", "name": "none"},
192 {"size": None, "fstype": "ext4", "name": "user"},
193 {"size": 32, "fstype": "ext4", "name": "module_a"},
194 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
195 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
196 {"size": 8, "fstype": "ext4", "name": "inform"},
197 {"size": 0, "fstype": "raw", "name": "empty"},
198 {"size": 64, "fstype": "vfat", "name": "boot_b",
199 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
200 {"size": 0, "fstype": "raw", "name": "empty"},
201 {"size": 32, "fstype": "ext4", "name": "module_b"},
202 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
203 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
204 {"size": 0, "fstype": "raw", "name": "empty"},
205 {"size": 4, "fstype": "ext4", "name": "reserved0"},
206 {"size": 64, "fstype": "ext4", "name": "reserved1"},
207 {"size": 125, "fstype": "ext4", "name": "reserved2"}
212 "system-data.img": 3,
216 "ramdisk-recovery.img": 8,
222 "ramdisk-recovery.img": 15,
224 params = (('reboot-param.bin', 'norm'),
225 ('reboot-param.info', 'norm'),
226 ('partition-ab.info', 'a'),
227 ('partition-ab-cloned.info', '1'),
228 ('upgrade-status.info', '0'),
229 ('partition-a-status.info', 'ok'),
230 ('partition-b-status.info', 'ok'))
def __init__(self, device, args):
    """Configure the Raspberry Pi 4 super-partition layout on *device*.

    ``args.update`` is stored as ``self.update``; the partition label is
    requested as "gpt" from the parent initializer.
    """
    self.update = args.update
    self.user_partition = 4
    self.reserved_space = 8
    super().__init__(device, "gpt")
    # Kept after the parent call, as in the original ordering; presumably
    # because the parent initializer assigns its own with_super value.
    self.super_alignment = 1 << 20  # 1048576
    self.with_super = True
240 class Rpi4(SdFusingTargetAB, RpiInitParams):
241 long_name = "Raspberry Pi 4"
243 {"size": 64, "fstype": "vfat", "name": "boot_a", "start": 4,
244 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
245 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
246 {"size": 1344, "fstype": "ext4", "name": "system-data"},
247 {"size": 36, "fstype": "raw", "name": "none"},
248 {"size": None, "fstype": "ext4", "name": "user"},
249 {"size": 32, "fstype": "ext4", "name": "module_a"},
250 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
251 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
252 {"size": 8, "fstype": "ext4", "name": "inform"},
253 {"size": 256, "fstype": "ext4", "name": "hal_a"},
254 {"size": 64, "fstype": "vfat", "name": "boot_b",
255 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
256 {"size": 3072, "fstype": "ext4", "name": "rootfs_b"},
257 {"size": 32, "fstype": "ext4", "name": "module_b"},
258 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
259 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
260 {"size": 256, "fstype": "ext4", "name": "hal_b"},
261 {"size": 4, "fstype": "ext4", "name": "param"},
262 {"size": 64, "fstype": "ext4", "name": "reserved1"},
263 {"size": 125, "fstype": "ext4", "name": "reserved2"},
268 "system-data.img": 3,
272 "ramdisk-recovery.img": 8,
280 "ramdisk-recovery.img": 15,
283 params = (('reboot-param.bin', 'norm'),
284 ('reboot-param.info', 'norm'),
285 ('partition-ab.info', 'a'),
286 ('partition-ab-cloned.info', '1'),
287 ('upgrade-status.info', '0'),
288 ('partition-a-status.info', 'ok'),
289 ('partition-b-status.info', 'ok'))
def __init__(self, device, args):
    """Configure the Raspberry Pi 4 layout on *device*.

    ``args.update`` is stored as ``self.update``; the partition label is
    requested as "gpt" from the parent initializer.
    """
    self.update = args.update
    # Assigned before the parent initializer is called, as in the original
    # ordering (presumably the parent reads them).
    self.user_partition = 4
    self.reserved_space = 5
    super().__init__(device, "gpt")
297 class RV64(SdFusingTarget):
298 long_name = "QEMU RISC-V 64-bit"
300 {"size": 2, "fstype": "raw", "name": "SPL", "start": 4,
301 "ptype": "2E54B353-1271-4842-806F-E436D6AF6985"},
302 {"size": 4, "fstype": "raw", "name": "u-boot",
303 "ptype": "5B193300-FC78-40CD-8002-E86C45580B47"},
304 {"size": 292, "fstype": "vfat", "name": "boot_a",
305 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
306 {"size": 36, "fstype": "raw", "name": "none"},
307 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
308 {"size": 1344, "fstype": "ext4", "name": "system-data"},
309 {"size": None, "fstype": "ext4", "name": "user"},
310 {"size": 32, "fstype": "ext4", "name": "module_a"},
311 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
312 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
313 {"size": 8, "fstype": "raw", "name": "inform"},
314 {"size": 256, "fstype": "ext4", "name": "hal_a"},
315 {"size": 4, "fstype": "raw", "name": "reserved0"},
316 {"size": 64, "fstype": "raw", "name": "reserved1"},
317 {"size": 125, "fstype": "raw", "name": "reserved2"},
320 "u-boot-spl.bin.normal.out": 1,
325 "system-data.img": 6,
329 "ramdisk-recovery.img": 10,
def __init__(self, device, args):
    """Set the RISC-V 64 layout constants and build a "gpt" label.

    *args* is accepted because every target is constructed with
    ``(device, args)``; it is not read here.
    """
    # Assigned before the parent initializer is called, as in the original
    # ordering (presumably the parent reads them).
    self.reserved_space = 5
    self.user_partition = 6
    super().__init__(device, 'gpt')
339 long_name = "VisionFive2"
351 def device_size(device):
352 argv = ["sfdisk", "-s", device]
353 logging.debug(" ".join(argv))
354 proc = subprocess.run(argv,
355 stdout=subprocess.PIPE)
356 size = int(proc.stdout.decode('utf-8').strip()) >> 10
357 logging.debug(f"{device} size {size}MiB")
361 proc = subprocess.run(['sfdisk', '-v'],
362 stdout=subprocess.PIPE)
363 version = proc.stdout.decode('utf-8').strip()
364 logging.debug(f"Found {version}")
365 major, minor, patch = (int(x) for x in re.findall('[0-9]+', version))
366 support_delete = False
368 if major < 2 or major == 2 and minor < 26:
369 log.error(f"Your sfdisk {major}.{minor}.{patch} is too old.")
371 elif major == 2 and minor >= 28:
372 support_delete = True
374 return True, support_delete
376 def mkpart(args, target):
378 new, support_delete = check_sfdisk()
381 logging.error('sfdisk too old')
384 #TODO: unmount target devices
386 logging.info("Removing old partitions")
387 argv = ['sfdisk', '--delete', Device]
388 logging.debug(" ".join(argv))
391 logging.info("Removing old partition table")
392 argv = ['dd', 'if=/dev/zero', 'of=' + Device,
393 'bs=512', 'count=32', 'conv=notrunc']
394 logging.debug(" ".join(argv))
397 logging.debug("New partition table:\n" + str(target.label))
398 argv = ['sfdisk', Device]
399 logging.debug(" ".join(argv))
400 proc = subprocess.run(argv,
403 input=str(target.label).encode())
404 if proc.returncode != 0:
405 logging.error(f"Failed to create partition a new table on {Device}")
406 logging.error(f"New partition table:\n" + str(target.label))
409 for i, part in enumerate(target.part_table):
410 d = "/dev/" + get_partition_device(target.device, i+1)
411 if not 'fstype' in part:
412 logging.debug(f"Filesystem not defined for {d}, skipping")
414 logging.debug(f"Formatting {d} as {part['fstype']}")
415 if part['fstype'] == 'vfat':
416 argv = ['mkfs.vfat', '-F', '16', '-n', part['name'], d]
417 logging.debug(" ".join(argv))
418 proc = subprocess.run(argv,
419 stdin=subprocess.DEVNULL,
420 stdout=None, stderr=None)
421 if proc.returncode != 0:
422 log.error(f"Failed to create FAT filesystem on {d}")
424 elif part['fstype'] == 'ext4':
425 argv = ['mkfs.ext4', '-q', '-L', part['name'], d]
426 logging.debug(" ".join(argv))
427 proc = subprocess.run(argv,
428 stdin=subprocess.DEVNULL,
429 stdout=None, stderr=None)
430 if proc.returncode != 0:
431 log.error(f"Failed to create ext4 filesystem on {d}")
433 elif part['fstype'] == 'raw':
435 target.initialize_parameters()
437 def check_args(args):
440 logging.info(f"Device: {args.device}")
442 if args.binaries and len(args.binaries) > 0:
443 logging.info("Fusing binar{}: {}".format("y" if len(args.binaries) == 1 else "ies",
444 ", ".join(args.binaries)))
449 if args.format or Format:
450 response = input(f"{args.device} will be formatted. Is it OK? [y/N] ")
451 if response.lower() in ('y', 'yes'):
454 def check_device(args):
460 if os.path.exists(Device):
461 logging.error(f"Failed to create '{Device}', the file alread exists")
464 argv = ["dd", "if=/dev/zero", f"of={Device}",
465 "conv=sparse", "bs=1M", f"count={args.size}"]
466 logging.debug(" ".join(argv))
467 rc = subprocess.run(argv)
468 if rc.returncode != 0:
469 logging.error("Failed to create the backing file")
472 if os.path.isfile(Device):
476 argv = ["losetup", "--show", "--partscan", "--find", f"{File}"]
477 logging.debug(" ".join(argv))
478 proc = subprocess.run(argv,
479 stdout=subprocess.PIPE)
480 Device = proc.stdout.decode('utf-8').strip()
481 if proc.returncode != 0:
482 logging.error(f"Failed to attach {File} to a loopback device")
484 logging.debug(f"Loop device found: {Device}")
485 atexit.register(lambda: subprocess.run(["losetup", "-d", Device]))
489 if not stat.S_ISBLK(s.st_mode):
491 except FileNotFoundError:
492 logging.error(f"No such device: {Device}")
495 logging.error(f"{Device} is not a block device")
498 def check_partition_format(args, target):
503 logging.info(f"Skip formatting of {Device}".format(Device))
505 logging.info(f"Start formatting of {Device}")
507 logging.info(f"{Device} formatted")
509 def check_ddversion():
510 proc = subprocess.run(["dd", "--version"],
511 stdout=subprocess.PIPE)
512 version = proc.stdout.decode('utf-8').split('\n')[0].strip()
513 logging.debug(f"Found {version}")
514 major, minor = (int(x) for x in re.findall('[0-9]+', version))
516 if major < 8 or major == 8 and minor < 24:
521 def get_partition_device(device, idx):
522 argv = ['lsblk', device, '-o', 'TYPE,KNAME']
523 logging.debug(" ".join(argv))
524 proc = subprocess.run(argv,
525 stdout=subprocess.PIPE)
526 for l in proc.stdout.decode('utf-8').splitlines():
527 match = re.search(f"^part\s+(.*[^0-9]{idx})", l)
532 def do_fuse_file(f, name, target):
533 idx = target.get_partition_index(name)
535 logging.info(f"No partition defined for {name}, skipping.")
537 pdevice = "/dev/" + get_partition_device(Device, idx)
538 argv = ['dd', 'bs=4M',
543 logging.debug(" ".join(argv))
544 proc_dd = subprocess.Popen(argv,
546 stdin=subprocess.PIPE,
547 stdout=None, stderr=None)
548 logging.info(f"Writing {name} to {pdevice}")
549 buf = f.read(4 << 20)
552 proc_dd.stdin.write(buf)
553 buf = f.read(4 << 20)
554 proc_dd.communicate()
558 #TODO: functions with the target argument should probably
559 # be part of some class
def get_aligned_size(size, target):
    """Round *size* up to the next multiple of ``target.super_alignment``.

    Only integer arithmetic is used, so the result is exact for arbitrarily
    large sizes (a float-based ceiling loses precision once the quotient no
    longer fits in a double's mantissa).

    A *size* that is already a multiple is returned unchanged; 0 stays 0.
    """
    alignment = target.super_alignment
    # Ceiling division without floats: add (alignment - 1), then floor-divide.
    return (size + alignment - 1) // alignment * alignment
564 def do_fuse_image_super(tmpd, target):
566 metadata_size = 65536
567 metadata_aligned_size = get_aligned_size(metadata_size, target)
569 hal_path = os.path.join(tmpd, 'hal.img')
570 rootfs_path = os.path.join(tmpd, 'rootfs.img')
571 super_path = os.path.join(tmpd, 'super.img')
574 hal_size = os.stat(hal_path).st_size
575 rootfs_size = os.stat(rootfs_path).st_size
576 except FileNotFoundError as e:
577 fn = os.path.split(e.filename)[-1]
578 logging.warning(f"{fn} is missing, skipping super partition image")
581 hal_aligned_size = get_aligned_size(hal_size, target)
582 rootfs_aligned_size = get_aligned_size(rootfs_size, target)
583 group_size = hal_aligned_size + rootfs_aligned_size
584 super_size = metadata_aligned_size + 2 * group_size
586 argv = ["lpmake", "-F",
588 f"--device-size={super_size}",
589 f"--metadata-size={metadata_size}",
590 f"--metadata-slots={metadata_slots}",
591 "-g", f"tizen_a:{group_size}",
592 "-p", f"rootfs_a:none:{rootfs_aligned_size}:tizen_a",
593 "-p", f"hal_a:none:{hal_aligned_size}:tizen_a",
594 "-g", f"tizen_b:{group_size}",
595 "-p", f"rootfs_b:none:{rootfs_aligned_size}:tizen_b",
596 "-p", f"hal_b:none:{hal_aligned_size}:tizen_b",
597 "-i", f"rootfs_a={rootfs_path}",
598 "-i", f"rootfs_b={rootfs_path}",
599 "-i", f"hal_a={hal_path}",
600 "-i", f"hal_b={hal_path}"]
601 logging.debug(" ".join(argv))
602 proc = subprocess.run(argv,
603 stdin=subprocess.DEVNULL,
604 stdout=None, stderr=None)
606 if proc.returncode != 0:
607 logging.error("Failed to create super.img")
608 do_fuse_image(super_path, target)
610 def do_fuse_image_tarball(tarball, tmpd, target):
611 with tarfile.open(tarball) as tf:
613 if target.with_super:
614 if entry.name in('hal.img', 'rootfs.img'):
615 tf.extract(entry, path=tmpd)
617 f = tf.extractfile(entry)
618 do_fuse_file(f, entry.name, target)
def do_fuse_image(img, target):
    """Write the image file at path *img* to *target*.

    The file is opened in binary mode and passed to ``do_fuse_file``
    together with the final component of its path.
    """
    image_name = os.path.split(img)[-1]
    with open(img, 'rb') as image:
        do_fuse_file(image, image_name, target)
624 def fuse_image(args, target):
625 if args.binaries is None or len(args.binaries) == 0:
627 with tempfile.TemporaryDirectory() as tmpd:
628 for b in args.binaries:
629 if re.search('\.(tar|tar\.gz|tgz)$', b):
630 do_fuse_image_tarball(b, tmpd, target)
632 fn = os.path.split(b)[-1]
633 if target.with_super and fn in ('rootfs.img', 'hal.img'):
634 shutil.copy(b, os.path.join(tmpd, fn))
636 do_fuse_image(b, target)
638 if target.with_super:
639 do_fuse_image_super(tmpd, target)
641 if __name__ == '__main__':
642 parser = argparse.ArgumentParser(description="For {}, version {}".format(
643 ", ".join([v.long_name for k,v in TARGETS.items()]),
646 parser.add_argument("-b", "--binary", action="append", dest="binaries",
647 help="binary to flash, may be used multiple times")
648 parser.add_argument("--create", action="store_true",
649 help="create the backing file and format the loopback device")
650 parser.add_argument("-d", "--device", required=True,
651 help="device node or loopback backing file")
652 parser.add_argument("--format", action="store_true",
653 help="create new partition table on the target device")
654 parser.add_argument("--log-level", dest="log_level", default="warning",
655 help="Verbosity, possible values: debug, info, warning, "
656 "error, critical (default: warning)")
657 parser.add_argument("--size", type=int, default=8192,
658 help="size of the backing file to create (in MiB)")
659 parser.add_argument("-t", "--target", required=True,
660 help="Target device model. Use `--target list`"
661 " to show supported devices.")
662 parser.add_argument("--update", choices=['a', 'b'], default=None,
663 help="Choose partition set to update: a or b.")
664 parser.add_argument("--version", action="version",
665 version=f"%(prog)s {__version__}")
666 args = parser.parse_args()
668 if args.target == 'list':
669 print("\nSupported devices:\n")
670 for k,v in TARGETS.items():
671 print(f" {k:6} {v.long_name}")
# Console handler.  `format` is the plain layout and `cformat` the coloured
# one; ColorStreamHandler picks between them depending on whether its stream
# is a TTY.
# Fixes relative to the previous strings:
#   * "%(debuginfo)" lacked its "s" conversion, which turned the following
#     "%(levelname)-8s" into literal text in the plain layout;
#   * "%(msecs)s" printed a float after the seconds; "%(msecs)03d" gives the
#     usual zero-padded three-digit milliseconds.
conh = ColorStreamHandler(format='%(asctime)s.%(msecs)03d %(debuginfo)s%(levelname)-8s %(message)s',
                          cformat='%(asctime)s.%(msecs)03d %(debuginfo)s%(levelcolor)s%(message)s',
                          datefmt='%Y-%m-%dT%H:%M:%S')
log_handlers = [conh]
# The level name comes from --log-level and is upper-cased for logging.
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
                    handlers=log_handlers,
                    level=args.log_level.upper())
682 logging.debug(" ".join(sys.argv))
686 target = TARGETS[args.target](Device, args)
688 check_partition_format(args, target)
689 fuse_image(args, target)
690 subprocess.run(['sync'],
691 stdin=subprocess.DEVNULL,
692 stdout=None, stderr=None )