--- /dev/null
+#!/usr/bin/env python2
+
+import argparse
+import glob
+# from lxml import etree
+import xml.etree.ElementTree as ET
+import logging
+import os
+import sys
+import os.path
+import shutil
+import subprocess
+import tarfile
+import traceback
+from hashlib import sha256
+import tempfile
+import base64
+import urllib
+
+"""
+ MIT License
+
+ Copyright (c) 2023 Samsung Electronics Co., Ltd
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE.
+"""
+
+"""
+This program is used to generate ISU Packages from system images.
+The program process *.tar.gz archives with images from <src_dir> and writes to
+<out_dir> ISU Packages as tpk archives, based on the configuration files found
+in (rootfs.img)/etc/isu/<pkg_name>/isu.cfg
+"""
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+log_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+c_handler = logging.StreamHandler()
+c_handler.setFormatter(log_formatter)
+logger.addHandler(c_handler)
+
+class InvalidConfigException(Exception):
+ pass
+
+class ISUNoServiceException(Exception):
+ """No service file"""
+ pass
+
+class MountErrorException(Exception):
+ pass
+
+class UncleanExitException(Exception):
+ pass
+
+class ISUPkgCreateErrorException(Exception):
+ pass
+
+class NoISUConfigDirException(Exception):
+ pass
+
+class Config:
+ def __init__(self):
+ self.name = ""
+ self.files = []
+ self.system_services = []
+ self.user_services = []
+
+class CustomExitStack:
+ def __init__(self):
+ self.contexts = []
+
+ def enter_context(self, ctx):
+ ctx_type = type(ctx)
+ print(ctx_type)
+ enter = ctx.__class__.__enter__
+ exit = ctx.__class__.__exit__
+
+ self.contexts.append((ctx, exit))
+ return enter(ctx)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for (ctx, exit) in self.contexts:
+ exit(ctx, exc_type, exc_val, exc_tb)
+
+class CustomXMLSEC(object):
+ TEMPLATE_HEAD="""
+<Signature Id="DistributorSignature" xmlns="http://www.w3.org/2000/09/xmldsig#">
+ <SignedInfo>
+ <CanonicalizationMethod Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#" />
+ <SignatureMethod Algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" />
+ """
+ TEMPLATE_TAIL="""
+</SignedInfo>
+ <SignatureValue>
+ </SignatureValue>
+ <KeyInfo>
+ <X509Data>
+ <X509Certificate>
+</X509Certificate>
+ <X509Certificate>
+</X509Certificate>
+ </X509Data>
+ </KeyInfo>
+ <Object Id="prop">
+ <SignatureProperties xmlns:dsp="http://www.w3.org/2009/xmldsig-properties">
+ <SignatureProperty Id="profile" Target="#{}">
+ <dsp:Profile URI="http://www.w3.org/ns/widgets-digsig#profile" />
+ </SignatureProperty>
+ <SignatureProperty Id="role" Target="#{}">
+ <dsp:Role URI="http://www.w3.org/ns/widgets-digsig#{}" />
+ </SignatureProperty>
+ <SignatureProperty Id="identifier" Target="#{}">
+ <dsp:Identifier />
+ </SignatureProperty>
+ </SignatureProperties>
+ </Object>
+</Signature>
+"""
+
+ FILE_TEMPLATE="""<Reference URI="{}">
+<DigestMethod Algorithm="http://www.w3.org/2001/04/xmlenc#sha256"/>
+<DigestValue>{}</DigestValue>
+</Reference>
+"""
+
+ MANIFEST_TEMPLATE="""<?xml version="1.0" encoding="utf-8" standalone="no"?>
+<manifest xmlns="http://tizen.org/ns/packages" api-version="1.0.0" package="org.tizen.isu.{}" res-type="tizen.isu.resource.{}" res-version="{}" version="1.0.0">
+ <label>{}</label>
+ <isu>
+ <name>{}</name>
+ <version>{}</version>
+ </isu>
+ <allowed-package id="org.tizen.*"/>
+</manifest>
+"""
+
+ def __init__(self, rpk_info, work_dir):
+ self.key_author = rpk_info['author_key']
+ self.password_author = rpk_info['author_password']
+ self.key_distributor = rpk_info['dist_key']
+ self.password_distributor = rpk_info['dist_password']
+ self.work_dir = work_dir
+ self.orig_dir = None
+
+ def __enter__(self):
+ self.orig_dir = os.getcwd()
+ os.chdir(self.work_dir)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ os.chdir(self.orig_dir)
+
+ def _hash_file(self, file_path):
+ with open(file_path, 'rb') as file:
+ hash = sha256(file.read()).digest()
+ return base64.b64encode(hash)
+
+ def _add_file(self, file, file_path, escape=True):
+ if file_path.startswith("./"):
+ file_path = file_path[2:]
+ if escape:
+ file_path = urllib.quote_plus(file_path)
+ sha256 = ""
+ file_content = self.FILE_TEMPLATE.format(file_path, sha256)
+ file.write(file_content)
+
+ def _make_tizen_manifest(self):
+ config_file = CustomConfigParser()
+ config_file.read_config("isu.cfg")
+ name = config_file.sections['isu']['name']
+ version = config_file.sections['isu']['version']
+
+ content = self.MANIFEST_TEMPLATE.format(name, name, version, name, name, version)
+
+ with open("tizen-manifest.xml", "w") as file:
+ file.write(content)
+
+ def sign_signature(self, file_name, key, password):
+ subprocess.call(["xmlsec1", "--sign", "--pkcs12", key, "--pwd", password, "--output", file_name, '--enabled-reference-uris', "same-doc,remote", file_name])
+
+ def _gen_signature(self, is_author_sig, key, password):
+ if is_author_sig:
+ sig_file_name = "author-signature.xml"
+ tag = "AuthorSignature"
+ role = "role-author"
+ else:
+ sig_file_name = "signature1.xml"
+ tag = "DistributorSignature"
+ role = "role-distributor"
+
+ with open(sig_file_name, 'w') as file:
+ file.write(self.TEMPLATE_HEAD)
+
+ for (dirpath, dirs, files) in os.walk('.'):
+ for file_name in files:
+ if file_name == sig_file_name:
+ continue
+ self._add_file(file, os.path.join(dirpath, file_name))
+
+ self._add_file(file, "#prop", False)
+ modified_template_tail = self.TEMPLATE_TAIL.format(tag, tag, role, tag)
+ file.write(modified_template_tail)
+
+ self.sign_signature(sig_file_name, key, password)
+
+ def run(self, out_dir, name):
+ self._make_tizen_manifest()
+ self._gen_signature(True, self.key_author, self.password_author)
+ self._gen_signature(False, self.key_distributor, self.password_distributor)
+ logger.debug("Maker RPK archive")
+ dest_file_path = os.path.join(out_dir, name + '.rpk')
+ curr_file_path = shutil.make_archive(dest_file_path, 'zip', self.work_dir)
+ os.rename(curr_file_path, dest_file_path)
+ return dest_file_path
+
+class CustomConfigParser(object):
+ def __init__(self):
+ self.sections = {}
+
+ def read_config(self, cfg_file):
+ with open(cfg_file, 'r') as file:
+ current_section = None
+ for line in file.readlines():
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith("#"):
+ continue
+ if line.startswith("[") and line.endswith("]"):
+ section_name = line[1:-1]
+ if section_name in self.sections:
+ raise Exception("Section {} already exist".format(section_name))
+ self.sections[section_name] = {}
+ current_section = section_name
+ else:
+ splitted = line.split("=")
+ name = splitted[0].strip()
+ if len(splitted) == 2:
+ value = splitted[1].strip()
+ else:
+ value = None
+
+ if not current_section:
+ raise Exception("Expect some section")
+ self.sections[current_section][name] = value
+
+ def write_config(self, cfg_file):
+ with open(cfg_file, 'w') as file:
+ section_num = 0
+ for section_name in self.sections.keys():
+ line = u"[{}]\n".format(section_name)
+ if section_num > 0:
+ file.write("\n")
+ file.write(line)
+ section_num += 1
+ for (name, value) in self.sections[section_name].items():
+ file.write(name)
+ if value is not None:
+ file.write("=")
+ file.write(value)
+ file.write("\n")
+
+class NoMounter:
+ def __init__(self, files_dir, tmp_dir):
+ self._files_dir = files_dir
+ self._tmp_dif = tmp_dir
+
+ def mnt_dir(self):
+ return self._files_dir
+
+class Mounter:
+ def __init__(self, img_path, tmp_dir):
+ self._failed = False
+ self._img_path = img_path
+ self._tmp_dir = tmp_dir
+ without_ext = os.path.splitext(os.path.basename(img_path))[0]
+ self._mnt_dir = os.path.join(tmp_dir, "mnts", without_ext)
+
+ def mount(self):
+ try:
+ if not os.path.isdir(str(self._mnt_dir)):
+ os.makedirs(str(self._mnt_dir))
+ except Exception as e:
+ logger.error("Make {} directory error: {}".format(self._mnt_dir, e))
+ raise e
+ try:
+ os.system("mount \"{}\" \"{}\"".format(self._img_path, self._mnt_dir))
+ except Exception as e:
+ logger.error("Mount error: {}".format(e))
+
+
+ def umount(self):
+ try:
+ os.system("umount \"{}\"".format(self._mnt_dir))
+ except Exception as e:
+ self._failed = True
+ logger.error("Umount error: {}".format(e))
+
+ def failed(self):
+ return self._failed
+
+ def mnt_dir(self):
+ return self._mnt_dir
+
+ def __enter__(self):
+ self.mount()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.umount()
+ os.rmdir(str(self._mnt_dir))
+
+
+class FileMapper:
+ """
+ This is used to convert the paths:
+ /usr/bin/some_file -> ./rootfs_mnt/usr/bin/some_file
+ /opt/some/path -> ./system-data/some/path
+ """
+ def __init__(self, map):
+ self._map = map
+ self.dir_root = None
+ for (k, v) in map.items():
+ if v == "/":
+ self.dir_root = DirTree("", k)
+
+ if self.dir_root is None:
+ raise Exception("fm init: dir root is none")
+
+ for (k, v) in map.items():
+ if v == "/":
+ continue
+ splitted = str(v).split(os.path.sep)[1:]
+ self.dir_root.append(splitted, k)
+
+ def images(self):
+ return [k for k in self._map.keys()]
+
+ def path_to_img(self, path):
+ splitted = str(path).split(os.path.sep)[1:]
+ cur_dir = self.dir_root
+ if cur_dir is None:
+ raise Exception("path_to_img: dir_root is none")
+ last_img = cur_dir.img
+ rest = splitted
+ while splitted:
+ for c in cur_dir.children:
+ if c.name == splitted[0]:
+ cur_dir = c
+ if c.img:
+ last_img = c.img
+ rest = splitted[1:]
+ break
+ splitted = splitted[1:]
+
+ if last_img is None:
+ raise Exception("last_img is none")
+ return (last_img, os.path.sep.join(rest))
+
+ def find(self, path):
+ splitted = str(path).split(os.path.sep)
+ if self.dir_root is None:
+ raise Exception("find: dir_root is none")
+ res = self.dir_root.find_img(splitted)
+ if res:
+ res = (res[0], os.path.sep.join(res[1]))
+ else:
+ raise Exception("find: no path found")
+ return res
+
+ def find_path(self, f_name, mounts):
+ res = self.find_mnt_and_path(f_name, mounts)
+ return os.path.join(res[0], res[1])
+
+ def find_mnt_and_path(self, f_name, mounts):
+ res = self.find(f_name)
+ # The first element in res is image_name
+ return (mounts[res[0]].mnt_dir(), res[1])
+
+ def printt(self):
+ if self.dir_root is None:
+ raise Exception("print: dir_root is none")
+ self.dir_root.printt()
+
+
+class ISUPkgsMakerCtx(object):
+ def __init__(self, images_dir, use_images, tmp_dir, out_dir, key, file_mapper):
+ self.images_dir = images_dir
+ self.use_images = use_images
+ self.tmp_dir = tmp_dir
+ self.out_dir = out_dir
+ self.key = key
+ self.file_mapper = file_mapper
+
+
+class ISUSinglePkgMakerCtx(ISUPkgsMakerCtx):
+ def __init__(self, isudir_path, cfg, mounts, pkgs_ctx):
+ super(ISUSinglePkgMakerCtx, self).__init__(**pkgs_ctx.__dict__)
+ self.isudir_path = isudir_path
+ self.cfg = cfg
+ self.mounts = mounts
+ self.work_dir = os.path.join(self.tmp_dir, "{}_out".format(cfg.name))
+ self.files_dir = os.path.join(self.work_dir, "files")
+ self.pkg_dir = os.path.join(self.work_dir, "pkg")
+ self.system_service_path = os.path.join(self.pkg_dir, ISUSinglePkgMaker.SYSTEM_SERVICES)
+ self.user_service_path = os.path.join(self.pkg_dir, ISUSinglePkgMaker.USER_SERVICES)
+
+
+class Extractor:
+ def __init__(self, ctx):
+ self._ctx = ctx
+ self._images = {}
+
+ def _extract_images_from_file(self, archive_path):
+ logger.info("Open archive {}".format(archive_path))
+ tar_file = tarfile.open(str(archive_path))
+ for f_name in tar_file.getnames():
+ if f_name in self._ctx.file_mapper.images():
+ logger.info("Extract {} from {}".format(f_name, archive_path))
+ tar_file.extract(str(f_name), str(self._ctx.tmp_dir))
+ self._images[f_name] = os.path.join(self._ctx.tmp_dir, f_name)
+
+ def extract_images(self):
+ if not self._ctx.use_images:
+ return None
+ for file in os.listdir(self._ctx.images_dir):
+ file = os.path.join(self._ctx.images_dir, file)
+ if os.path.isfile(file) and tarfile.is_tarfile(str(file)):
+ self._extract_images_from_file(file)
+
+ return self.images()
+
+ def images(self):
+ return self._images
+
+ def __enter__(self):
+ self.extract_images()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for image_path in self.images().values():
+ logger.info("Remove image {}".format(image_path))
+ os.remove(str(image_path))
+
+
+class DirTree:
+ def __init__(self, name, img):
+ self.name = name
+ self.img = img
+ self.children = []
+
+ def append(self, name, img):
+ for c in self.children:
+ if c.name == name[0]:
+ child = c
+ break
+ else:
+ child = DirTree(name[0], None)
+ self.children.append(child)
+
+ if len(name) == 1:
+ child.img = img
+ else:
+ child.append(name[1:], img)
+
+ def find_img(self, splitted):
+ if not splitted:
+ return None
+ if splitted[0] == self.name:
+ for c in self.children:
+ res = c.find_img(splitted[1:])
+ if res:
+ return res
+ if self.img:
+ return (self.img, splitted[1:])
+
+ return None
+
+ def printt(self, prefix=""):
+ print("{}DirTree: {} : {}".format(prefix, self.name, self.img))
+ for c in self.children:
+ c.printt(prefix + " ")
+
+
+def copy_helper(src, dest):
+ shutil.copy2(src, dest)
+ s = os.stat(src)
+ os.chown(dest, s.st_uid, s.st_gid)
+
+def copy2better(src, dest):
+ # Python2 has a problem with xattrs support, so unfortunately I need to copy files
+ # this way
+ os.system("cp -ar \"{}\" \"{}\"".format(src, dest))
+
+class FileCopy:
+ def __init__(self, work_dir, mounts, file_mapper):
+ self._work_dir = work_dir
+ self._mounts = mounts
+ self._file_mapper = file_mapper
+
+ def copy_file(self, src, dest):
+ src = str(src)
+ dest = str(dest)
+ logger.info("Copy from: {} to: {}".format(src, dest))
+ dir_name = os.path.dirname(dest)
+ if not os.path.isdir(dir_name):
+ os.makedirs(dir_name)
+
+ if os.path.isdir(str(src)):
+ copy2better(src, dest)
+ else:
+ copy2better(src, dest)
+
+ def copy_files(self, cfg):
+ for f_name in cfg.files:
+ f_optional=False
+ if f_name.startswith("-"):
+ f_optional=True
+ f_name=f_name[1:]
+ try:
+ f_dir, f_file_path = self._file_mapper.find_mnt_and_path(f_name, self._mounts)
+ if f_dir and f_file_path:
+ for f_path in glob.glob(f_dir + "/" +f_file_path):
+ d_name = os.path.relpath(str(f_path), str(f_dir))
+ self.copy_file(f_path, os.path.join(self._work_dir, os.path.normpath("./{}".format(d_name))))
+ else:
+ raise Exception("No path found for {}".format(f_name))
+ except:
+ if (not f_optional):
+ raise
+
+ def work_dir(self):
+ return self._work_dir
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+class XMLModelConfig:
+ def __init__(self, cfg_path):
+ self._xml_root = ET.parse(str(cfg_path)).getroot()
+
+ def get_value(self, key_name):
+ element = self._xml_root.find('./platform/key[@name="{}"]'.format(key_name))
+ if element is None:
+ return ""
+ return str(element.text)
+
+
+class TizenBuildConfig:
+ def __init__(self, cfg_path):
+ self.data = self._read_cfg(cfg_path)
+
+ def _read_cfg(self, cfg_path):
+ data = {}
+ with open(str(cfg_path), 'r') as tizen_build_file:
+ for line in tizen_build_file.readlines():
+ line = line.strip()
+ if not line:
+ continue
+ splitted = line.split('=')
+ if len(splitted) != 2:
+ continue
+ data[splitted[0]] = splitted[1]
+ return data
+
+ def get_value(self, key):
+ return self.data.get(key, '').strip('"')
+
+
+class ISUSinglePkgMaker:
+ SYSTEM_SERVICES = "system-services"
+ USER_SERVICES = "user-services"
+ CHECKSUM_FILE = "checksum.sha256"
+ OUTPUT_IMAGE_NAME = "rootfs.img"
+ ISU_CFG_FILE = "isu.cfg"
+
+ def __init__(self, isudir_path, cfg, mounts, isu_pkgs_ctx):
+ self._ctx = ISUSinglePkgMakerCtx(isudir_path, cfg, mounts, isu_pkgs_ctx)
+
+ if os.path.exists(str(self._ctx.work_dir)):
+ raise Exception("Work dir: {} already exists".format(self._ctx.work_dir))
+
+ os.makedirs(str(self._ctx.files_dir))
+ os.makedirs(str(self._ctx.pkg_dir))
+
+
+ def _make_squashfs(self):
+ out_img_name = ISUSinglePkgMaker.OUTPUT_IMAGE_NAME
+ out_path = os.path.join(self._ctx.pkg_dir, out_img_name)
+ if os.path.exists(out_path):
+ raise FileExistsError("Cannot create image - file {}".format(out_path))
+
+ logger.info("Make squashfs {} -> {}".format(self._ctx.files_dir, out_path))
+ process = subprocess.Popen(["mksquashfs", str(self._ctx.files_dir), str(out_path)],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ if process.returncode != 0:
+ logger.error("mksquashfs error {}:".format(process.returncode))
+ logger.error("stdout: {}\nstderr: {}".format(stdout.decode('utf8'), stderr.decode('utf8')))
+ raise Exception("Make squashfs error {}".format(stderr.decode('utf8')))
+
+
+ def _make_base_dirs(self):
+ logger.info("Make base directories in {}".format(self._ctx.files_dir))
+ os.makedirs(self._ctx.system_service_path)
+ os.makedirs(self._ctx.user_service_path)
+
+ def _copy_service_file(self, service_file, services_dir, destination_path):
+ service_path = os.path.join(self._ctx.isudir_path, services_dir, service_file)
+ if not os.path.exists(service_path):
+ raise ISUNoServiceException(service_file)
+
+ logger.info("Copy service: {} to {}".format(service_path, destination_path))
+ shutil.copy(str(service_path), str(destination_path))
+
+ def _copy_service_files(self):
+ for service_file in self._ctx.cfg.system_services:
+ self._copy_service_file(service_file, self.SYSTEM_SERVICES, self._ctx.system_service_path)
+
+ for service_file in self._ctx.cfg.user_services:
+ self._copy_service_file(service_file, self.USER_SERVICES, self._ctx.user_service_path)
+
+ def _hash_of_file(self, algo, file_path):
+ READ_SIZE = 1024*4
+ hash = algo()
+ with open(file_path, mode='rb') as f:
+ while True:
+ data = f.read(READ_SIZE)
+ if not data:
+ break
+ hash.update(data)
+
+ return hash.hexdigest()
+
+ def _make_checksum(self):
+ sum_path = os.path.join(self._ctx.pkg_dir, self.CHECKSUM_FILE)
+
+ dirs = [
+ self._ctx.pkg_dir,
+ os.path.join(self._ctx.pkg_dir, self.SYSTEM_SERVICES),
+ os.path.join(self._ctx.pkg_dir, self.USER_SERVICES),
+ ]
+ with open(sum_path, mode='w') as file:
+ for dir in dirs:
+ if not (os.path.exists(dir) and os.path.isdir(dir)):
+ continue
+ for f_path in os.listdir(dir):
+ f_path = os.path.join(dir, f_path)
+ if os.path.isfile(f_path) and os.path.basename(f_path) != self.CHECKSUM_FILE:
+ hash = self._hash_of_file(sha256, f_path)
+ line = u"{} {}\n".format(hash ,os.path.relpath(f_path, start=str(self._ctx.pkg_dir)))
+ file.write(line)
+
+ def _zip_pkg(self):
+ out_name = os.path.join(self._ctx.out_dir, self._ctx.cfg.name)
+ out_name_zip = out_name + '.zip'
+ logger.info("Zip {} -> {}".format(self._ctx.pkg_dir, out_name_zip))
+ shutil.make_archive(str(out_name), 'zip', str(self._ctx.pkg_dir))
+ return out_name_zip
+
+ def _rpk_pkg(self, rpk_info):
+ with CustomXMLSEC(rpk_info, self._ctx.pkg_dir) as rpk_maker:
+ return rpk_maker.run(self._ctx.out_dir, self._ctx.cfg.name)
+
+
+ def _make_isu_cfg(self):
+ tb_path = self._ctx.file_mapper.find_path('/etc/tizen-build.conf', self._ctx.mounts)
+ tizen_build = TizenBuildConfig(tb_path)
+
+ mc_path = self._ctx.file_mapper.find_path('/etc/config/model-config.xml', self._ctx.mounts)
+ model_config = XMLModelConfig(mc_path)
+
+ config_file = CustomConfigParser()
+ config_file.read_config(os.path.join(self._ctx.isudir_path, self.ISU_CFG_FILE))
+ config_file.sections['info'] = {}
+ config_file.sections['info']['tz_build_release_name'] = tizen_build.get_value('TZ_BUILD_RELEASE_NAME')
+ config_file.sections['info']['tz_build_arch'] = tizen_build.get_value('TZ_BUILD_ARCH')
+ config_file.sections['info']['tz_build_date'] = tizen_build.get_value('TZ_BUILD_DATE')
+ config_file.sections['info']['model_name'] = model_config.get_value('tizen.org/system/model_name')
+ config_file.sections['info']['manufacturer'] = model_config.get_value('tizen.org/system/manufacturer')
+ config_file.sections['info']['device_type'] = model_config.get_value('tizen.org/system/device_type')
+
+ out_file = os.path.join(self._ctx.pkg_dir, self.ISU_CFG_FILE)
+ config_file.write_config(out_file)
+
+ def make_pkg(self, rpk_info):
+ self._make_base_dirs()
+ self._copy_service_files()
+
+ with FileCopy(self._ctx.files_dir, self._ctx.mounts, self._ctx.file_mapper) as fc:
+ fc.copy_files(self._ctx.cfg)
+ self._make_squashfs()
+ self._make_isu_cfg()
+ self._make_checksum()
+
+ if rpk_info:
+ pkg_result_name = self._rpk_pkg(rpk_info)
+ else:
+ pkg_result_name = self._zip_pkg()
+
+ logger.info("SUCCESS: ISU Package {} created.".format(pkg_result_name))
+
+ def _clear(self):
+ logger.info("Remove {}".format(self._ctx.work_dir))
+ shutil.rmtree(str(self._ctx.work_dir))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._clear()
+
+
+class ISUPkgsMaker:
+ def __init__(self, images_dir, use_images, tmp_dir, out_dir, key, file_mapper):
+ self._ctx = ISUPkgsMakerCtx(images_dir, use_images, tmp_dir, out_dir, key, file_mapper)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ def _split(self, value):
+ if value:
+ splitted = filter(lambda x: x, value.split(" "))
+ return list(map(lambda x: x.strip(), splitted))
+ return []
+
+ def load_config(self, cfg_file):
+ config_file = CustomConfigParser()
+ config_file.read_config(cfg_file)
+
+ cfg = Config()
+
+ for section in ["isu", "files"]:
+ if section not in config_file.sections:
+ raise InvalidConfigException("No \"{}\" section in the config file.".format(section))
+
+ for key in ["name", "version"]:
+ if key not in config_file.sections["isu"]:
+ raise InvalidConfigException("No \"{}\" key in the \"isu\" section.".format(key))
+
+ cfg.name = config_file.sections['isu']['name']
+
+ system_service = config_file.sections['isu'].get('system_service')
+ cfg.system_services = list(map(lambda x: x, self._split(system_service)))
+ user_service = config_file.sections['isu'].get('user_service')
+ cfg.user_services = list(map(lambda x: x, self._split(user_service)))
+ cfg.files = [f_name for f_name in config_file.sections['files'].keys()]
+
+ logger.info("Config {} ({}) loaded.".format(cfg_file, cfg.name))
+
+ return cfg
+
+ def run(self, rpk_info, exit_on_any_error=False):
+ some_errors = False
+ with Extractor(self._ctx) as extractor:
+ with CustomExitStack() as stack:
+ if self._ctx.use_images:
+ mounts = {img_name: stack.enter_context(Mounter(img_path, self._ctx.tmp_dir))
+ for (img_name, img_path) in extractor.images().items()}
+
+ if any(m.failed() for m in mounts.values()):
+ logger.error("Some error during mounts")
+ raise MountErrorException()
+ else:
+ mounts = {'rootfs.img': NoMounter(self._ctx.images_dir, self._ctx.tmp_dir) }
+
+ isu_on_rootfs = os.path.join(mounts['rootfs.img'].mnt_dir(), 'etc/isu')
+ logger.info("Processing configs in {}".format(isu_on_rootfs))
+ if not os.path.isdir(isu_on_rootfs):
+ logger.warning("No ISU config dir on the image.")
+ raise NoISUConfigDirException()
+ for isudir in os.listdir(str(isu_on_rootfs)):
+ try:
+ isudir_path = os.path.join(isu_on_rootfs, isudir)
+ isucfg_path = os.path.join(isudir_path, ISUSinglePkgMaker.ISU_CFG_FILE)
+ if os.path.isdir(isudir_path) and os.path.isfile(isucfg_path):
+ logger.info("\n=========\nProcess dir: {}\n=========".format(isudir))
+ cfg = self.load_config(isucfg_path)
+ try:
+ with ISUSinglePkgMaker(isudir_path, cfg, mounts, self._ctx) as isu_maker:
+ isu_maker.make_pkg(rpk_info)
+ except ISUPkgCreateErrorException:
+ some_errors = True
+ if exit_on_any_error:
+ break
+ except ISUNoServiceException as error:
+ logger.warning("No service file ({}) found for {} package. Perhaps the file was deleted by security-config tests.".format(error, cfg.name))
+ some_errors = True
+ if exit_on_any_error:
+ break
+ except InvalidConfigException as error:
+ logger.error("Load isu.cfg error: {}".format(error))
+ some_errors = True
+ if exit_on_any_error:
+ break
+ logger.info("End of processing")
+ if some_errors:
+ raise UncleanExitException()
+
+
+def load_map(map_path):
+ map_file = CustomConfigParser()
+ map_file.read_config(map_path)
+ map = dict([(k, v) for (k, v) in map_file.sections['map'].items()])
+ return FileMapper(map)
+
+
+def main():
+ logger.info("Start")
+
+ parser = argparse.ArgumentParser(description="ISU Pkgs maker")
+ parser.add_argument('--src', '-s', type=str, required=True,
+ help="Directory with images archive")
+ parser.add_argument('--use_images', '-u', action='store_true', required=False,
+ help="Make packages from *tar.gz images")
+ parser.add_argument('--out', '-o', type=str, required=True,
+ help="Output directory")
+ parser.add_argument('--rpk', '-r', action='store_true', required=False,
+ help="Generate RPK files")
+ parser.add_argument('--rpk-author-key', type=str, required=False,
+ default=os.getenv('AUTHOR_KEY'),
+ help="Author key")
+ parser.add_argument('--rpk-author-key-pass', type=str, required=False,
+ default=os.getenv('AUTHOR_KEY_PASS'),
+ help="Author key password")
+ parser.add_argument('--rpk-dist-key', type=str, required=False,
+ default=os.getenv('DIST_KEY'),
+ help="Distributor key")
+ parser.add_argument('--rpk-dist-key-pass', type=str, required=False,
+ default=os.getenv('DIST_KEY_PASS'),
+ help="Distributor key password")
+ parser.add_argument('--map', '-m', type=str, required=False,
+ help="File with image<->path mapping")
+ parser.add_argument('--key', '-k', type=str, required=False,
+ help="Signing key")
+ parser.add_argument('--exit_on_any_error', '-e', action='store_true',
+ required=False, help="Terminate after any error")
+
+ args = parser.parse_args()
+
+ if args.use_images and not args.map:
+ logger.error("You need to specify the map file")
+ return os.EX_USAGE
+
+ if args.map:
+ map = load_map(args.map)
+ else:
+ map = FileMapper({'rootfs.img': '/'})
+
+ if args.rpk:
+ if not os.path.isfile(args.rpk_author_key):
+ logger.error("Author key {} not exist".format(args.rpk_author_key))
+ return os.EX_NOTFOUND
+
+ if not os.path.isfile(args.rpk_dist_key):
+ logger.error("Distribution key {} not exist".format(args.rpk_dist_key))
+ return os.EX_NOTFOUND
+
+ rpk_info = {
+ 'author_key': os.path.realpath(args.rpk_author_key),
+ 'author_password': args.rpk_author_key_pass,
+ 'dist_key': os.path.realpath(args.rpk_dist_key),
+ 'dist_password': args.rpk_dist_key_pass,
+ }
+ else:
+ rpk_info = None
+
+ tmp_dir = tempfile.mkdtemp()
+ try:
+ with ISUPkgsMaker(os.path.realpath(args.src),
+ args.use_images,
+ tmp_dir,
+ os.path.realpath(args.out),
+ args.key,
+ map) as isu_pkgs_maker:
+ isu_pkgs_maker.run(rpk_info, args.exit_on_any_error)
+ except NoISUConfigDirException:
+ return os.EX_OK
+ except MountErrorException:
+ return os.EX_IOERR
+ except UncleanExitException:
+ return os.EX_SOFTWARE
+ except Exception as err:
+ logger.error("Run error: {}".format(err))
+ traceback.print_exc()
+ return os.EX_SOFTWARE
+ finally:
+ shutil.rmtree(tmp_dir)
+
+ return os.EX_OK
+
+if __name__ == '__main__':
+ code = main()
+ sys.exit(code)