3 from functools import reduce
class ColorFormatter(logging.Formatter):
    """Formatter that exposes an ANSI colour escape for the record's level.

    ``format`` stores the escape sequence on the record as ``levelcolor`` so
    that a format string can reference it as ``%(levelcolor)s``.
    """

    # ANSI SGR escape sequence used for each standard logging level.
    _levelToColor = {
        logging.CRITICAL: "\x1b[35;1m",
        logging.ERROR: "\x1b[33;1m",
        logging.WARNING: "\x1b[33;1m",
        logging.INFO: "\x1b[0m",
        logging.DEBUG: "\x1b[30;1m",
        logging.NOTSET: "\x1b[30;1m"
    }

    def format(self, record):
        """Attach ``levelcolor`` to *record* and format it with the base class.

        Levels that are not in the table (custom numeric levels) use the
        ``NOTSET`` colour instead of raising ``KeyError``.
        """
        record.levelcolor = self._levelToColor.get(
            record.levelno, self._levelToColor[logging.NOTSET])
        return super().format(record)
class ColorStreamHandler(logging.StreamHandler):
    """Stream handler that colours its output only when writing to a terminal."""

    def __init__(self, stream=None, format=None, datefmt=None, style='%', cformat=None):
        """Create the handler and pick a formatter based on the stream type.

        Args:
            stream: Output stream; ``None`` selects the StreamHandler default.
            format: Format string used when the stream is not a terminal.
            datefmt: Date format passed to the formatter.
            style: Format style passed to the formatter.
            cformat: Format string used when the stream is a terminal; it may
                reference ``%(levelcolor)s``.
        """
        logging.StreamHandler.__init__(self, stream)
        # Ask the stream itself: file-like objects without a descriptor
        # (e.g. io.StringIO) raise from fileno() but still answer isatty().
        isatty = getattr(self.stream, "isatty", None)
        if callable(isatty) and isatty():
            self.formatter = ColorFormatter(cformat, datefmt, style)
            # Reset the colour at the end of every record.
            self.terminator = "\x1b[0m\n"
        else:
            self.formatter = logging.Formatter(format, datefmt, style)
class Partition:
    """One partition line of an sfdisk script.

    ``size`` and ``start`` are rendered with a ``MiB`` suffix by ``__str__``.
    """

    def __init__(self, name, size, start=None, ptype=None, fstype="raw", bootable=False):
        """Store the partition description.

        Args:
            name: Partition name.
            size: Size as an int, or ``None`` to omit the ``size=`` field.
            start: Start offset, or ``None`` to omit the ``start=`` field.
            ptype: Partition type code/GUID emitted as ``type=``.
            fstype: Filesystem kind; accepted so a table entry can be passed
                as keyword arguments, not part of the sfdisk line.
            bootable: Whether to append the ``bootable`` flag.
        """
        # NOTE(review): these assignments are reconstructed from the
        # attributes __str__ reads; confirm against the complete file.
        self.name = name
        self.size = size
        self.start = start
        self.ptype = ptype
        self.fstype = fstype
        self.bootable = bootable

    def __str__(self):
        """Return the sfdisk script line for this partition, newline-terminated."""
        output = []
        if self.start:
            output.append(f"start={self.start}MiB")
        # bool is excluded so only genuine integers produce a size field.
        if (isinstance(self.size, int) and not isinstance(self.size, bool)
                and self.size >= 0):
            output.append(f"size={self.size}MiB")
        if self.name:
            output.append(f"name={self.name}")
        output.append(f"type={self.ptype}")
        if self.bootable:
            output.append("bootable")
        return ", ".join(output) + "\n"
class Label:
    """A complete sfdisk script: a ``label:`` header followed by partitions."""

    # Partition type used for entries that do not name one themselves.
    # NOTE(review): the "dos" value (Linux native) is reconstructed; only the
    # GPT GUID is visible in the reviewed listing. Confirm before relying on it.
    _DEFAULT_PTYPE = {
        "gpt": "0FC63DAF-8483-4772-8E79-3D69D8477DE4",
        "dos": "0x83",
    }

    def __init__(self, part_table, ltype):
        """Build Partition objects from *part_table*.

        Args:
            part_table: Iterable of dicts holding Partition keyword arguments.
            ltype: Label type written to the script header ("gpt" or "dos").

        Raises:
            ValueError: If *ltype* has no known default partition type.
        """
        if ltype not in self._DEFAULT_PTYPE:
            raise ValueError(f"Unsupported label type: {ltype}")
        self.ltype = ltype
        default_ptype = self._DEFAULT_PTYPE[ltype]
        self.part_table = []
        for part in part_table:
            # Copy so the caller's (possibly class-level, shared) dicts are
            # not modified when the default type is filled in.
            entry = dict(part)
            entry.setdefault("ptype", default_ptype)
            self.part_table.append(Partition(**entry))

    def __str__(self):
        """Return the sfdisk script text."""
        header = f"label: {self.ltype}\n"
        return header + "".join(str(part) for part in self.part_table)
class SdFusingTarget:
    """Base class for fusing targets.

    Subclasses provide ``part_table``, ``binaries``, ``reserved_space`` and
    ``user_partition`` before calling ``__init__``.
    """

    def __init__(self, device, ltype):
        """Size the user partition to fill *device* and build the label.

        Args:
            device: Device path passed to ``device_size``.
            ltype: Partition label type handed to ``Label``.

        Raises:
            OSError: ``ENOSPC`` when fewer than 100 MiB remain for user data.
        """
        self.with_super = False
        self.device = device
        # Work on a private copy so sizing this instance never edits the
        # table shared by the class.
        self.part_table = [dict(part) for part in self.part_table]
        total_size = device_size(device)
        fixed_size = sum(part["size"] or 0 for part in self.part_table)
        self.user_size = total_size - self.reserved_space - fixed_size
        if self.user_size < 100:
            logging.error(f"Not enough space for user data ({self.user_size}). Use larger storage.")
            raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC), device)
        self.part_table[self.user_partition]["size"] = self.user_size
        self.label = Label(self.part_table, ltype)

    def get_partition_index(self, binary):
        """Return the partition number for image *binary*, or ``None``."""
        return self.binaries.get(binary, None)

    def initialize_parameters(self):
        """Run post-format parameter initialisation, if a mixin provides it.

        The method previously took no ``self`` and so could not be called on
        an instance. Because this class precedes parameter mixins in the
        bases of the concrete targets, it would also hide their
        implementation, so it defers to the next class in the MRO when one
        defines the hook and is a no-op otherwise.
        """
        hook = getattr(super(), "initialize_parameters", None)
        if callable(hook):
            hook()
class SdFusingTargetAB(SdFusingTarget):
    """Target with A/B partition sets."""

    def get_partition_index(self, binary):
        """Return the partition number for *binary* in the selected set.

        ``self.update == 'b'`` selects ``binaries_b``; any other value uses
        ``binaries``. Returns ``None`` when the image is not listed.
        """
        table = self.binaries_b if self.update == 'b' else self.binaries
        return table.get(binary)
class RpiInitParams:
    """Mixin that writes the ``params`` files into the ``inform`` partition."""

    def initialize_parameters(self):
        """Mount the ``inform`` partition and write one file per parameter.

        Reads ``self.part_table``, ``self.device`` and ``self.params`` (an
        iterable of ``(file name, value)`` pairs).
        """
        logging.debug("Initializing parameters")
        # Partition numbers are 1-based, table indices 0-based.
        inform_no = next((index + 1
                          for index, part in enumerate(self.part_table)
                          if part['name'] == 'inform'), None)
        if inform_no is None:
            # NOTE(review): the original handling of a missing "inform"
            # entry is not visible; skipping is the conservative choice.
            logging.debug("No inform partition defined, skipping")
            return
        d = "/dev/" + get_partition_device(self.device, inform_no)

        # NOTE(review): presumably metadata_csum is disabled for the benefit
        # of whatever reads this partition at boot; confirm.
        subprocess.run(['tune2fs', '-O', '^metadata_csum', d],
                       stdin=subprocess.DEVNULL,
                       stdout=None, stderr=None)

        with tempfile.TemporaryDirectory() as mnt:
            proc = subprocess.run(['mount', '-t', 'ext4', d, mnt],
                                  stdin=subprocess.DEVNULL,
                                  stdout=None, stderr=None)
            if proc.returncode != 0:
                logging.error(f"Failed to mount {d} in {mnt}")
                return
            try:
                for param, value in self.params:
                    with open(os.path.join(mnt, param), 'w') as f:
                        f.write(value + '\n')
            finally:
                # Always unmount, otherwise the temporary directory cannot
                # be removed and the partition stays busy.
                subprocess.run(['umount', d],
                               stdin=subprocess.DEVNULL,
                               stdout=None, stderr=None)
# Target description for the Raspberry Pi 3; __init__ selects a "dos" label.
138 class Rpi3(SdFusingTarget,RpiInitParams):
139 long_name = "Raspberry Pi 3"
# Partition entries in on-disk order.  "size" and "start" are rendered with a
# MiB suffix by Partition.__str__.  The "user" entry's size of None is filled
# in by SdFusingTarget.__init__.
# NOTE(review): the line that opens this table is not visible here.
141 {"size": 64, "fstype": "vfat", "name": "boot", "start": 4, "ptype": "0xe", "bootable": True},
142 {"size": 3072, "fstype": "ext4", "name": "rootfs"},
143 {"size": 1344, "fstype": "ext4", "name": "system-data"},
144 {"size": None, "ptype": "5", "name": "extended", "start": 4484},
145 {"size": None, "fstype": "ext4", "name": "user"},
146 {"size": 32, "fstype": "ext4", "name": "modules"},
147 {"size": 32, "fstype": "ext4", "name": "ramdisk"},
148 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery"},
149 {"size": 8, "fstype": "ext4", "name": "inform"},
150 {"size": 256, "fstype": "ext4", "name": "hal"},
151 {"size": 125, "fstype": "ext4", "name": "reserved2"},
# Image file name -> 1-based partition number (matches the order above).
# NOTE(review): only part of this mapping is visible here.
156 "system-data.img": 3,
160 "ramdisk-recovery.img": 8,
# (file name, content) pairs written by RpiInitParams.initialize_parameters.
163 params = (('reboot-param.bin', ''),)
# `args` is accepted but not used in the visible body.
165 def __init__(self, device, args):
# Amount subtracted from the device size when sizing the user partition.
166 self.reserved_space = 12
# 0-based index of the "user" entry in the table above.
167 self.user_partition = 4
168 super().__init__(device, "dos")
# Target description for the Raspberry Pi 4 using a "super" partition;
# __init__ selects a "gpt" label.
170 class Rpi4Super(SdFusingTargetAB, RpiInitParams):
171 long_name = "Raspberry Pi 4 w/ super partition"
# Partition entries in on-disk order.  "size" and "start" are rendered with a
# MiB suffix by Partition.__str__.  The "user" entry's size of None is filled
# in by SdFusingTarget.__init__.
# NOTE(review): the line that opens this table is not visible here.
173 {"size": 64, "fstype": "vfat", "name": "boot_a","start": 4,
174 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
175 {"size": 6656, "fstype": "ext4", "name": "super"},
176 {"size": 1344, "fstype": "ext4", "name": "system-data"},
177 {"size": 36, "fstype": "raw", "name": "none"},
178 {"size": None, "fstype": "ext4", "name": "user"},
179 {"size": 32, "fstype": "ext4", "name": "module_a"},
180 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
181 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
182 {"size": 8, "fstype": "ext4", "name": "inform"},
183 {"size": 0, "fstype": "raw", "name": "empty"},
184 {"size": 64, "fstype": "vfat", "name": "boot_b",
185 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
186 {"size": 0, "fstype": "raw", "name": "empty"},
187 {"size": 32, "fstype": "ext4", "name": "module_b"},
188 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
189 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
190 {"size": 0, "fstype": "raw", "name": "empty"},
191 {"size": 4, "fstype": "ext4", "name": "reserved0"},
192 {"size": 64, "fstype": "ext4", "name": "reserved1"},
193 {"size": 125, "fstype": "ext4", "name": "reserved2"}
# Image file name -> 1-based partition number.
# NOTE(review): only part of the mapping is visible; the entries numbered 3
# and 8 point at the "a" set and 15 at "ramdisk-recovery_b", so presumably
# these belong to `binaries` and `binaries_b` respectively; confirm.
198 "system-data.img": 3,
202 "ramdisk-recovery.img": 8,
208 "ramdisk-recovery.img": 15,
# (file name, content) pairs written by RpiInitParams.initialize_parameters.
210 params = (('reboot-param.bin', 'norm'),
211 ('reboot-param.info', 'norm'),
212 ('partition-ab.info', 'a'),
213 ('partition-ab-cloned.info', '1'),
214 ('upgrade-status.info', '0'),
215 ('partition-a-status.info', 'ok'),
216 ('partition-b-status.info', 'ok'))
218 def __init__(self, device, args):
# Amount subtracted from the device size when sizing the user partition.
219 self.reserved_space = 8
# 0-based index of the "user" entry in the table above.
220 self.user_partition = 4
# Partition set selector read by SdFusingTargetAB.get_partition_index.
221 self.update = args.update
222 super().__init__(device, "gpt")
# Set after super().__init__, which resets with_super to False.
223 self.with_super = True
# Alignment unit read by get_aligned_size (1048576 bytes = 1 MiB).
224 self.super_alignment = 1048576
# Target description for the Raspberry Pi 4 with A/B partition sets;
# __init__ selects a "gpt" label.
226 class Rpi4(SdFusingTargetAB, RpiInitParams):
227 long_name = "Raspberry Pi 4"
# Partition entries in on-disk order.  "size" and "start" are rendered with a
# MiB suffix by Partition.__str__.  The "user" entry's size of None is filled
# in by SdFusingTarget.__init__.
# NOTE(review): the line that opens this table is not visible here.
229 {"size": 64, "fstype": "vfat", "name": "boot_a", "start": 4,
230 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
231 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
232 {"size": 1344, "fstype": "ext4", "name": "system-data"},
233 {"size": 36, "fstype": "raw", "name": "none"},
234 {"size": None, "fstype": "ext4", "name": "user"},
235 {"size": 32, "fstype": "ext4", "name": "module_a"},
236 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
237 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
238 {"size": 8, "fstype": "ext4", "name": "inform"},
239 {"size": 256, "fstype": "ext4", "name": "hal_a"},
240 {"size": 64, "fstype": "vfat", "name": "boot_b",
241 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
242 {"size": 3072, "fstype": "ext4", "name": "rootfs_b"},
243 {"size": 32, "fstype": "ext4", "name": "module_b"},
244 {"size": 32, "fstype": "ext4", "name": "ramdisk_b"},
245 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_b"},
246 {"size": 256, "fstype": "ext4", "name": "hal_b"},
247 {"size": 4, "fstype": "ext4", "name": "param"},
248 {"size": 64, "fstype": "ext4", "name": "reserved1"},
249 {"size": 125, "fstype": "ext4", "name": "reserved2"},
# Image file name -> 1-based partition number.
# NOTE(review): only part of the mapping is visible; the entries numbered 3
# and 8 point at the "a" set and 15 at "ramdisk-recovery_b", so presumably
# these belong to `binaries` and `binaries_b` respectively; confirm.
254 "system-data.img": 3,
258 "ramdisk-recovery.img": 8,
266 "ramdisk-recovery.img": 15,
# (file name, content) pairs written by RpiInitParams.initialize_parameters.
269 params = (('reboot-param.bin', 'norm'),
270 ('reboot-param.info', 'norm'),
271 ('partition-ab.info', 'a'),
272 ('partition-ab-cloned.info', '1'),
273 ('upgrade-status.info', '0'),
274 ('partition-a-status.info', 'ok'),
275 ('partition-b-status.info', 'ok'))
277 def __init__(self, device, args):
# Amount subtracted from the device size when sizing the user partition.
278 self.reserved_space = 5
# 0-based index of the "user" entry in the table above.
279 self.user_partition = 4
# Partition set selector read by SdFusingTargetAB.get_partition_index.
280 self.update = args.update
281 super().__init__(device, "gpt")
# Target description for the QEMU RISC-V 64-bit machine; __init__ selects a
# "gpt" label.
283 class RV64(SdFusingTarget):
284 long_name = "QEMU RISC-V 64-bit"
# Partition entries in on-disk order.  "size" and "start" are rendered with a
# MiB suffix by Partition.__str__.  The "user" entry's size of None is filled
# in by SdFusingTarget.__init__.
# NOTE(review): the line that opens this table is not visible here.
286 {"size": 2, "fstype": "raw", "name": "SPL", "start": 4,
287 "ptype": "2E54B353-1271-4842-806F-E436D6AF6985"},
288 {"size": 4, "fstype": "raw", "name": "u-boot",
289 "ptype": "5B193300-FC78-40CD-8002-E86C45580B47"},
290 {"size": 292, "fstype": "vfat", "name": "boot_a",
291 "ptype": "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"},
292 {"size": 36, "fstype": "raw", "name": "none"},
293 {"size": 3072, "fstype": "ext4", "name": "rootfs_a"},
294 {"size": 1344, "fstype": "ext4", "name": "system-data"},
295 {"size": None, "fstype": "ext4", "name": "user"},
296 {"size": 32, "fstype": "ext4", "name": "module_a"},
297 {"size": 32, "fstype": "ext4", "name": "ramdisk_a"},
298 {"size": 32, "fstype": "ext4", "name": "ramdisk-recovery_a"},
299 {"size": 8, "fstype": "raw", "name": "inform"},
300 {"size": 256, "fstype": "ext4", "name": "hal_a"},
301 {"size": 4, "fstype": "raw", "name": "reserved0"},
302 {"size": 64, "fstype": "raw", "name": "reserved1"},
303 {"size": 125, "fstype": "raw", "name": "reserved2"},
# Image file name -> 1-based partition number (matches the order above).
# NOTE(review): only part of this mapping is visible here.
306 "u-boot-spl.bin.normal.out": 1,
311 "system-data.img": 6,
315 "ramdisk-recovery.img": 10,
# `args` is accepted but not used in the visible body.
319 def __init__(self, device, args):
# 0-based index of the "user" entry in the table above.
320 self.user_partition = 6
# Amount subtracted from the device size when sizing the user partition.
321 self.reserved_space = 5
322 super().__init__(device, 'gpt')
# NOTE(review): display name of a further target class whose header and
# remaining body are not visible in this listing.
325 long_name = "VisionFive2"
def device_size(device):
    """Return the size of *device* in MiB as reported by ``sfdisk -s``.

    ``sfdisk -s`` prints the size in 1 KiB blocks; shifting by 10 converts
    that to MiB (rounded down).

    Raises:
        subprocess.CalledProcessError: If sfdisk exits with an error, rather
            than failing later while parsing empty output.
    """
    proc = subprocess.run(["sfdisk", "-s", device],
                          stdout=subprocess.PIPE, check=True)
    size = int(proc.stdout.decode('utf-8').strip()) >> 10
    logging.debug(f"{device} size {size}MiB")
    return size
def check_sfdisk():
    """Check the installed sfdisk version.

    Returns:
        A ``(usable, support_delete)`` tuple. ``usable`` is False when sfdisk
        is older than 2.26 or its version cannot be parsed.
        ``support_delete`` is True from 2.28 on, where ``sfdisk --delete``
        can be used.
    """
    proc = subprocess.run(['sfdisk', '-v'],
                          stdout=subprocess.PIPE)
    version = proc.stdout.decode('utf-8').strip()
    logging.debug(f"Found {version}")
    # Releases may report two ("2.38") or three ("2.38.1") components, so
    # do not unpack a fixed number of fields.
    numbers = [int(x) for x in re.findall('[0-9]+', version)]
    if len(numbers) < 2:
        logging.error(f"Cannot determine sfdisk version from '{version}'")
        return False, False
    major, minor = numbers[:2]
    patch = numbers[2] if len(numbers) > 2 else 0

    if (major, minor) < (2, 26):
        logging.error(f"Your sfdisk {major}.{minor}.{patch} is too old.")
        return False, False

    # Tuple comparison also covers major versions above 2.
    support_delete = (major, minor) >= (2, 28)
    return True, support_delete
# Recreate the partition table on the global `Device` from `target.label`,
# create a filesystem on each partition according to its "fstype", then call
# target.initialize_parameters().
# NOTE(review): several control-flow lines of this function are not visible
# in this listing (the conditions guarding the branches below and whatever
# follows each error message).
360 def mkpart(args, target):
362 new, support_delete = check_sfdisk()
365 logging.error('sfdisk too old')
368 #TODO: unmount target devices
# Two ways of discarding the old layout are visible: `sfdisk --delete`, or
# zeroing the first 32 sectors of 512 bytes (16 KiB) with dd.
370 logging.debug("Removing old partitions")
371 subprocess.run(['sfdisk', '--delete', Device])
373 logging.debug("Removing old partition table")
374 subprocess.run(['dd', 'if=/dev/zero', 'of=' + Device,
375 'bs=512', 'count=32', 'conv=notrunc'])
377 logging.debug("New partition table:\n" + str(target.label))
# The sfdisk script is fed to sfdisk on standard input.
378 proc = subprocess.run(['sfdisk', Device],
381 input=str(target.label).encode())
382 if proc.returncode != 0:
383 logging.error(f"Failed to create partition a new table on {Device}")
384 logging.error(f"New partition table:\n" + str(target.label))
# Partition numbers are 1-based, hence i+1.
387 for i, part in enumerate(target.part_table):
388 d = "/dev/" + get_partition_device(target.device, i+1)
389 if not 'fstype' in part:
390 logging.debug(f"Filesystem not defined for {d}, skipping")
392 logging.debug(f"Formatting {d} as {part['fstype']}")
393 if part['fstype'] == 'vfat':
394 proc = subprocess.run(['mkfs.vfat', '-F', '16', '-n', part['name'], d],
395 stdin=subprocess.DEVNULL,
396 stdout=None, stderr=None)
397 if proc.returncode != 0:
# NOTE(review): `log` is not defined anywhere in the visible code; every
# other call site uses `logging.error`, so this presumably raises NameError.
398 log.error(f"Failed to create FAT filesystem on {d}")
400 elif part['fstype'] == 'ext4':
401 proc = subprocess.run(['mkfs.ext4', '-q', '-L', part['name'], d],
402 stdin=subprocess.DEVNULL,
403 stdout=None, stderr=None)
404 if proc.returncode != 0:
# NOTE(review): same undefined `log` name as above.
405 log.error(f"Failed to create ext4 filesystem on {d}")
407 elif part['fstype'] == 'raw':
409 target.initialize_parameters()
# Report the selected device and binaries and ask for confirmation before
# the device is formatted.
# NOTE(review): parts of this function are not visible in this listing,
# including what happens after the answer is read.
411 def check_args(args):
414 logging.info(f"Device: {args.device}")
416 if args.binaries and len(args.binaries) > 0:
# Picks the singular or plural form of "binary" for the message.
417 logging.info("Fusing binar{}: {}".format("y" if len(args.binaries) == 1 else "ies",
418 ", ".join(args.binaries)))
# `Format` is a name defined outside this function (not visible here).
423 if args.format or Format:
424 response = input(f"{args.device} will be formatted. Is it OK? [y/N] ")
425 if response.lower() in ('y', 'yes'):
# Prepare the target device: the visible steps create a backing file with dd,
# attach a regular file to a loop device with losetup, and verify that the
# result is a block device.
# NOTE(review): the conditions selecting these steps and the code following
# each error message are not visible in this listing.
428 def check_device(args):
434 if os.path.exists(Device):
# NOTE(review): "alread" in the message below is a typo for "already".
435 logging.error(f"Failed to create '{Device}', the file alread exists")
# Create a sparse file of args.size blocks of 1 MiB.
438 logging.debug(f"dd if=/dev/zero of={Device} conv=sparse bs=1M count={args.size}")
439 rc = subprocess.run(["dd", "if=/dev/zero", f"of={Device}",
440 "conv=sparse", "bs=1M", f"count={args.size}"])
441 if rc.returncode != 0:
442 logging.error("Failed to create the backing file")
445 if os.path.isfile(Device):
448 logging.debug(f"losetup --show --partscan --find {File}")
# losetup --show prints the loop device it picked; that becomes `Device`.
451 proc = subprocess.run(["losetup", "--show", "--partscan",
452 "--find", f"{File}"],
453 stdout=subprocess.PIPE)
454 Device = proc.stdout.decode('utf-8').strip()
455 if proc.returncode != 0:
456 logging.error(f"Failed to attach {File} to a loopback device")
458 logging.debug(f"Loop device found: {Device}")
# Detach the loop device at interpreter exit; the lambda reads `Device` when
# it runs, not when it is registered.
459 atexit.register(lambda: subprocess.run(["losetup", "-d", Device]))
463 if not stat.S_ISBLK(s.st_mode):
465 except FileNotFoundError:
466 logging.error(f"No such device: {Device}")
469 logging.error(f"{Device} is not a block device")
# Log whether `Device` is being formatted or skipped.
# NOTE(review): the condition choosing between the messages and the call
# that performs the formatting (presumably mkpart) are not visible here.
472 def check_partition_format(args, target):
# NOTE(review): .format(Device) on an f-string is redundant; the f-string
# has already interpolated Device.
477 logging.info(f"Skip formatting of {Device}".format(Device))
479 logging.info(f"Start formatting of {Device}")
481 logging.info(f"{Device} formatted")
def check_ddversion():
    """Return True when the installed ``dd`` is version 8.24 or newer.

    NOTE(review): the return statements are reconstructed (they are not
    visible in the reviewed listing); confirm against the callers.
    """
    proc = subprocess.run(["dd", "--version"],
                          stdout=subprocess.PIPE)
    version = proc.stdout.decode('utf-8').split('\n')[0].strip()
    logging.debug(f"Found {version}")
    # Accept any number of version components instead of exactly two.
    numbers = [int(x) for x in re.findall('[0-9]+', version)]
    if len(numbers) < 2:
        return False
    major, minor = numbers[:2]

    return (major, minor) >= (8, 24)
def get_partition_device(device, idx):
    """Return the kernel name of partition number *idx* on *device*.

    Parses ``lsblk -o TYPE,KNAME`` and returns the name (without ``/dev/``)
    of the ``part`` row whose name ends in a non-digit followed by *idx*.

    Returns:
        The kernel name, or ``None`` when no such partition is listed.
    """
    proc = subprocess.run(['lsblk', device, '-o', 'TYPE,KNAME'],
                          stdout=subprocess.PIPE)
    # Anchored at the end so that e.g. idx 1 cannot match "...p10".
    pattern = re.compile(rf"^part\s+(.*[^0-9]{idx})\s*$")
    for line in proc.stdout.decode('utf-8').splitlines():
        match = pattern.search(line)
        if match:
            return match[1]
    return None
# Write the contents of the open binary file `f` to the partition that
# `target` assigns to the image called `name`, by piping it into dd.
# NOTE(review): several lines are not visible in this listing (the check on
# `idx`, the remaining dd arguments, the loop header and the end of the
# function).
504 def do_fuse_file(f, name, target):
505 idx = target.get_partition_index(name)
507 logging.info(f"No partition defined for {name}, skipping.")
509 pdevice = "/dev/" + get_partition_device(Device, idx)
# dd reads the image data from its standard input.
510 proc_dd = subprocess.Popen(['dd', 'bs=4M',
516 stdin=subprocess.PIPE,
517 stdout=None, stderr=None)
518 logging.info(f"Writing {name} to {pdevice}")
# Data is copied in chunks of 4 << 20 bytes (4 MiB).
519 buf = f.read(4 << 20)
522 proc_dd.stdin.write(buf)
523 buf = f.read(4 << 20)
524 proc_dd.communicate()
528 #TODO: functions with the target argument should probably
529 # be part of some class
def get_aligned_size(size, target):
    """Round *size* up to the next multiple of ``target.super_alignment``.

    Uses integer ceiling division so the result stays exact for sizes that
    a float cannot represent precisely.
    """
    alignment = target.super_alignment
    return alignment * -(-size // alignment)
def do_fuse_image_super(tmpd, target):
    """Build ``super.img`` from the images in *tmpd* and fuse it.

    Expects ``hal.img`` and ``rootfs.img`` in *tmpd*. When either is
    missing, a warning is logged and nothing is written. Both slot groups
    (``tizen_a``, ``tizen_b``) are populated from the same two images.
    """
    # NOTE(review): the slot count and the lpmake output argument are
    # reconstructed; they are not visible in the reviewed listing. Confirm.
    metadata_slots = 2
    metadata_size = 65536
    metadata_aligned_size = get_aligned_size(metadata_size, target)

    hal_path = os.path.join(tmpd, 'hal.img')
    rootfs_path = os.path.join(tmpd, 'rootfs.img')
    super_path = os.path.join(tmpd, 'super.img')

    try:
        hal_size = os.stat(hal_path).st_size
        rootfs_size = os.stat(rootfs_path).st_size
    except FileNotFoundError as e:
        fn = os.path.split(e.filename)[-1]
        logging.warning(f"{fn} is missing, skipping super partition image")
        return

    hal_aligned_size = get_aligned_size(hal_size, target)
    rootfs_aligned_size = get_aligned_size(rootfs_size, target)
    group_size = hal_aligned_size + rootfs_aligned_size
    super_size = metadata_aligned_size + 2 * group_size

    # The "-i" arguments must be f-strings naming rootfs_path/hal_path;
    # previously the literal text "{root_path}" was handed to lpmake.
    proc = subprocess.run(["lpmake", "-F",
                           "-o", super_path,
                           f"--device-size={super_size}",
                           f"--metadata-size={metadata_size}",
                           f"--metadata-slots={metadata_slots}",
                           "-g", f"tizen_a:{group_size}",
                           "-p", f"rootfs_a:none:{rootfs_aligned_size}:tizen_a",
                           "-p", f"hal_a:none:{hal_aligned_size}:tizen_a",
                           "-g", f"tizen_b:{group_size}",
                           "-p", f"rootfs_b:none:{rootfs_aligned_size}:tizen_b",
                           "-p", f"hal_b:none:{hal_aligned_size}:tizen_b",
                           "-i", f"rootfs_a={rootfs_path}",
                           "-i", f"rootfs_b={rootfs_path}",
                           "-i", f"hal_a={hal_path}",
                           "-i", f"hal_b={hal_path}"],
                          stdin=subprocess.DEVNULL,
                          stdout=None, stderr=None)

    if proc.returncode != 0:
        logging.error("Failed to create super.img")
        # Nothing usable was produced, so there is nothing to fuse.
        return
    do_fuse_image(super_path, target)
def do_fuse_image_tarball(tarball, tmpd, target):
    """Fuse every image contained in *tarball*.

    When ``target.with_super`` is set, ``hal.img`` and ``rootfs.img`` are
    extracted into *tmpd* instead of being written directly. All other
    members are streamed to ``do_fuse_file`` under their archive name.
    """
    with tarfile.open(tarball) as tf:
        for entry in tf:
            if target.with_super and entry.name in ('hal.img', 'rootfs.img'):
                tf.extract(entry, path=tmpd)
                continue
            f = tf.extractfile(entry)
            if f is None:
                # Directories and other non-file members carry no data.
                continue
            with f:
                do_fuse_file(f, entry.name, target)
def do_fuse_image(img, target):
    """Fuse the image file at path *img*.

    The partition is looked up by file name, so only the base name of *img*
    is passed on. Passing the full path meant images given with a directory
    component (including the generated ``super.img``) never matched.
    """
    with open(img, 'rb') as f:
        do_fuse_file(f, os.path.basename(img), target)
def fuse_image(args, target):
    """Fuse every binary listed in ``args.binaries`` onto *target*.

    Tarballs (``.tar``, ``.tar.gz``, ``.tgz``) are unpacked member by member
    and other files are written directly. For targets with a super
    partition, ``rootfs.img`` and ``hal.img`` are collected in a temporary
    directory and combined into ``super.img`` at the end.
    """
    if not args.binaries:
        return
    with tempfile.TemporaryDirectory() as tmpd:
        for b in args.binaries:
            # Raw string: "\." is not a valid escape in a normal literal.
            if re.search(r'\.(tar|tar\.gz|tgz)$', b):
                do_fuse_image_tarball(b, tmpd, target)
            else:
                fn = os.path.split(b)[-1]
                if target.with_super and fn in ('rootfs.img', 'hal.img'):
                    shutil.copy(b, os.path.join(tmpd, fn))
                else:
                    do_fuse_image(b, target)

        if target.with_super:
            do_fuse_image_super(tmpd, target)
# Script entry point: parse the command line, configure logging, build the
# selected target and run formatting and fusing.
# NOTE(review): some lines of this section are not visible in this listing
# (for example what follows the device list and how `Device` is set).
609 if __name__ == '__main__':
# The description lists the long names of all entries in TARGETS.
610 parser = argparse.ArgumentParser(description="For {}, version {}".format(
611 ", ".join([v.long_name for k,v in TARGETS.items()]),
614 parser.add_argument("-b", "--binary", action="append", dest="binaries",
615 help="binary to flash, may be used multiple times")
616 parser.add_argument("--create", action="store_true",
617 help="create the backing file and format the loopback device")
618 parser.add_argument("-d", "--device", required=True,
619 help="device node or loopback backing file")
620 parser.add_argument("--format", action="store_true",
621 help="create new partition table on the target device")
622 parser.add_argument("--log-level", dest="log_level", default="warning",
623 help="Verbosity, possible values: debug, info, warning, "
624 "error, critical (default: warning)")
625 parser.add_argument("--size", type=int, default=8192,
626 help="size of the backing file to create (in MiB)")
627 parser.add_argument("-t", "--target", required=True,
628 help="Target device model. Use `--target list`"
629 " to show supported devices.")
630 parser.add_argument("--update", choices=['a', 'b'], default=None,
631 help="Choose partition set to update: a or b.")
632 parser.add_argument("--version", action="version",
633 version=f"%(prog)s {__version__}")
634 args = parser.parse_args()
# `--target list` prints each TARGETS key with its long name.
636 if args.target == 'list':
637 print("\nSupported devices:\n")
638 for k,v in TARGETS.items():
639 print(f" {k:6} {v.long_name}")
# `format` is used for non-terminal output and `cformat` (with the level
# colour) for terminals; see ColorStreamHandler.
642 conh = ColorStreamHandler(format='%(asctime)s %(levelname)s:%(message)s',
643 cformat='%(asctime)s: %(levelcolor)s%(message)s',
644 datefmt='%Y-%m-%dT%H:%M:%S')
645 log_handlers = [conh]
# The level is passed as an upper-cased level name string.
646 logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s',
647 handlers=log_handlers,
648 level=args.log_level.upper())
# Instantiate the target class registered under the requested key.
653 target = TARGETS[args.target](Device, args)
655 check_partition_format(args, target)
656 fuse_image(args, target)
# Run `sync` once fusing has finished.
657 subprocess.run(['sync'],
658 stdin=subprocess.DEVNULL,
659 stdout=None, stderr=None )