--- /dev/null
+#!/usr/bin/env python3
+
+"""
+ Copyright (c) 2022 Samsung Electronics Co., Ltd.
+ SPDX-License-Identifier: MIT
+"""
+
+import os
+import glob
+import re
+import shutil
+import subprocess
+import threading
+import time
+import datetime
+import hashlib
+import logging
+import sys
+import base64
+import pickle
+from enum import Enum
+from urllib.parse import urlparse, urljoin
+import requests
+import yaml
+from bs4 import BeautifulSoup as bs
+
+logFormatter = logging.Formatter("%(asctime)s [%(levelname)-5.5s] %(message)s")
+log = logging.getLogger(__file__)
+
+log.setLevel(logging.DEBUG)
+fileHandler = logging.FileHandler("mass-delta-builder.log")
+fileHandler.setFormatter(logFormatter)
+consoleHandler = logging.StreamHandler()
+consoleHandler.setFormatter(logFormatter)
+
+log.addHandler(fileHandler)
+log.addHandler(consoleHandler)
+
+IMAGES_DIR = '/images/'
+RESULT_DIR = '/result/'
+
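+# Deltas that failed MAX_FAIL times are skipped; failure records older
+# than FAIL_CLEANUP_DAYS are purged on start-up.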
+FAIL_FILE = os.path.join(IMAGES_DIR, "failed")
+MAX_FAIL = 3
+FAIL_CLEANUP_DAYS = 90
+UPGRADE_TOOLS_DIR = '/upgrade-tools/'
+
+
+class TotaDirType(Enum):
+ Old = 1
+ New = 2
+ Result = 3
+
+
+class DeltaType(Enum):
+    all = 1
+    kernel = 2
+    system = 3
+    full = 4
+
+    @staticmethod
+    def parse(string):
+        try:
+            return DeltaType[string]
+        except KeyError as exc:
+            raise ValueError(f"Unknown delta type: {string}") from exc
+
+    def __str__(self):
+        return self.name
+
+
+class ImageType(Enum):
+ boot = 1
+ system = 2
+
+
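+# Resolves image tarball URLs by scraping the HTML index pages of the
+# configured snapshot repositories.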
+class UrlProvider:
+ def __init__(self, targets, profiles, repositories, latest=None, config=None):
+ self.targets = targets
+ self.profiles = profiles
+ self.latest = latest
+ self.repositories = repositories
+ self.snapshots = {}
+ self.config = config
+ self._update_snapshots()
+ log.info("Snapshots:")
+ log.info(self.snapshots)
+
+ def _update_snapshots(self):
+ for repository in self.repositories:
+ self.snapshots[repository] = self._update_snapshots_for(self.repositories[repository])
+
+ def _update_snapshots_for(self, repository):
+ proxies = self.config.get_proxy_dict_for_url(repository)
+        r = requests.request('GET', repository, proxies=proxies)
+        if not r.ok:
+            raise IOError(f"Cannot list snapshots: {repository}: {r.status_code}")
+
+        s = bs(r.content, 'html.parser')
+        snapshots = []
+        for tag in s.find_all('a'):
+ href = tag.attrs['href'].rstrip('/')
+ if href and not href.startswith('.'):
+ if href in ['latest', 'reference']:
+ continue
+ snapshots.append(href)
+
+ snapshots.sort(reverse=True)
+ return snapshots
+
+ def _get_path_for(self, target, arch, profile, image_type):
+ if image_type == ImageType.boot:
+ return self.targets[target][arch]
+ if image_type == ImageType.system:
+ return self.profiles[profile][arch]
+ raise Exception(f"Unknown image type: {image_type}")
+
+ def _find_tar_gz(self, base_url):
+ proxies = self.config.get_proxy_dict_for_url(base_url)
+        r = requests.request('GET', base_url, proxies=proxies)
+        if not r.ok:
+            log.error(f"Bad URL: {base_url}")
+            raise IOError(f"Cannot list images: {base_url}: {r.status_code}")
+
+        s = bs(r.content, 'html.parser')
+        for tag in s.find_all('a'):
+ if 'tar.gz' in tag.attrs['href']:
+ return tag.attrs['href']
+
+ return None
+
+ def _get_url_for(self, delta_order, snapshot, release_type, image_type):
+ path = self._get_path_for(delta_order.target, delta_order.arch,
+ delta_order.profile, image_type)
+ # url = f'http://download.tizen.org/snapshots/tizen/unified/{snapshot}/images/standard/{path}'
+ url = urljoin(self.repositories[delta_order.repository],
+ f"{snapshot}/images/{release_type}/{path}/")
+ tar_gz_url = self._find_tar_gz(url)
+ if not tar_gz_url:
+ raise Exception(f"No tar.gz for {url}")
+
+ return urljoin(url, tar_gz_url)
+
+ def get_urls_for(self, delta_order, snapshot, release_type):
+ links = []
+ if delta_order.delta_type in [DeltaType.all, DeltaType.system, DeltaType.full]:
+ links.append(self._get_url_for(delta_order, snapshot, release_type, ImageType.system))
+
+ if delta_order.delta_type in [DeltaType.all, DeltaType.kernel, DeltaType.full]:
+ links.append(self._get_url_for(delta_order, snapshot, release_type, ImageType.boot))
+
+ return links
+
+
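+# A single build request: one source/target snapshot pair for a given
+# board, profile and architecture (full deltas have no source snapshot).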
+class DeltaOrder:
+    def __init__(self, target, profile, arch, delta_id, board, delta_type,
+                 target_cfg_name, snapshot, latest_snapshot, repository,
+                 url_provider, dest_dir, release_type, delimiter):
+ self.target = target
+ self.profile = profile
+ self.arch = arch
+ self.delta_id = delta_id
+ self.board = board
+ self.delta_type = delta_type
+ self._target_cfg_name = target_cfg_name
+ self.snapshot = snapshot
+ self.latest_snapshot = latest_snapshot
+ self.url_provider = url_provider
+ self.source_url = None
+ self.target_url = None
+ self.invalid_order = False
+ self.repository = repository
+ self.dest_dir = dest_dir.lstrip('/')
+ self.release_type = release_type
+ self.delimiter = delimiter
+
+ def init(self):
+ self.get_source_url()
+ self.get_target_url()
+
+    def __repr__(self):
+        return self.get_delta_name()
+
+    def __str__(self):
+        return self.get_delta_name()
+
+ def target_cfg_name(self):
+ if self._target_cfg_name:
+ return self._target_cfg_name
+
+ return f"{self.target}-{self.profile}"
+
+ def get_source_url(self):
+ if self.delta_type == DeltaType.full:
+ self.source_url = []
+ else:
+ try:
+ if not self.source_url:
+ self.source_url = self.url_provider.get_urls_for(self, self.snapshot, self.release_type)
+ except Exception as error:
+                log.error(f"No URL for source: {self.snapshot}: {error}")
+ self.source_url = []
+ self.invalid_order = True
+
+ return self.source_url
+
+ def _get_tar_gz_name(self, url):
+ if url:
+ return os.path.basename(urlparse(url).path)
+
+ return None
+
+ def get_source_tar_gz(self):
+ return [self._get_tar_gz_name(file) for file in self.get_source_url()]
+
+ def get_target_url(self):
+ try:
+ if not self.target_url:
+ self.target_url = self.url_provider.get_urls_for(self, self.latest_snapshot, self.release_type)
+ except Exception as error:
+            log.error(f"No URL for target: {self.latest_snapshot}: {error}")
+ self.target_url = []
+ self.invalid_order = True
+
+ return self.target_url
+
+ def get_target_tar_gz(self):
+ return [self._get_tar_gz_name(file) for file in self.get_target_url()]
+
+    def _version_from_snapshot(self, snapshot):
+        if not snapshot:
+            return "0.0"
+        result = re.search(r'_([0-9]+(?:\.[0-9]+))', snapshot)
+        if not result:
+            raise ValueError(f"Cannot extract version from snapshot: {snapshot}")
+        return result.group(1)
+
+ def get_arch(self):
+ if self.arch in ['armv7l', 'arm32']:
+ return 'arm32'
+ if self.arch in ['aarch64', 'arm64']:
+ return 'arm64'
+
+ raise Exception(f"Unknown arch: {self.arch}")
+
+    def get_delta_name(self):
+        """
+        <ID><board><imageType><architecture><sourceImage>@<targetImage>
+        IoT_Headless-rpi4-all-arm32-202132131.1@321321321.1.tar.gz
+        """
+        target_version = self._version_from_snapshot(self.latest_snapshot)
+ if self.delta_type == DeltaType.full:
+ source_version = '0.0' # for full delta we use '0.0' as source version
+ addon = f"{DeltaType.all}" # for full delta we use 'all' string
+ else:
+ source_version = self._version_from_snapshot(self.snapshot)
+ addon = f"{self.delta_type}"
+ return f"{self.delta_id}{self.delimiter}{self.board}{self.delimiter}{addon}{self.delimiter}{self.get_arch()}{self.delimiter}{source_version}@{target_version}"
+
+ def get_delta_name_tar_gz(self):
+ return f"{self.get_delta_name()}.tar.gz"
+
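+
+# Reads the YAML configuration: delta sets, targets, profiles,
+# repositories and optional per-repository proxies.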
+class Config:
+ def __init__(self, config_name):
+ self.config_name = config_name
+ self._read_config()
+
+ def _read_config(self):
+ with open(self.config_name, "r") as stream:
+ data = yaml.safe_load(stream)
+ self.sets = data['sets']
+ self.targets = data['targets']
+ self.profiles = data['profiles']
+ self.repositories = data['repositories']
+ self.proxies = data.get('proxies', {})
+
+ def get_proxy_for_url(self, url):
+ for repo in self.repositories:
+ if url.startswith(self.repositories[repo]):
+ return self.proxies.get(repo, None)
+ return None
+
+ def get_proxy_env_for_url(self, url):
+ proxy = self.get_proxy_for_url(url)
+ result = None
+ if proxy:
+ result = {'HTTP_PROXY': f'http://{proxy}',
+ 'HTTPS_PROXY': f'https://{proxy}',
+ 'http_proxy': f'http://{proxy}',
+ 'https_proxy': f'https://{proxy}'}
+ return result
+
+ def get_proxy_dict_for_url(self, url):
+ proxy = self.get_proxy_for_url(url)
+ result = None
+ if proxy:
+ result = {'http': f'http://{proxy}',
+ 'https': f'https://{proxy}'}
+ return result
+
+
+class OrderMaker:
+ def __init__(self, sets, url_provider, latest):
+ self.url_provider = url_provider
+ self.sets = sets
+ self.latest = latest
+
+ def _gen_order_for_full_delta_snapshots(self, one_set, arch, profile):
+ orders = []
+ snapshots = self.url_provider.snapshots[one_set['repository']]
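+        # Only the four most recent snapshots are considered as delta targets.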
+        for offset in range(0, 4):
+ if offset >= len(snapshots):
+ break
+
+ if self._is_target_too_old(snapshots[offset]):
+ log.info(f"Too old target snapshot for full delta: {snapshots[offset]} ( < {self.latest} )")
+ continue
+
+ delta_order = DeltaOrder(one_set['target'],
+ profile,
+ arch,
+ one_set['id'],
+ one_set['board'],
+ DeltaType.parse(one_set['image_type']),
+ one_set.get('target_cfg_name'),
+ None,
+ snapshots[offset],
+ one_set['repository'],
+ self.url_provider,
+ one_set.get('dest_dir',''),
+ one_set.get('release_type', 'standard'),
+ one_set.get('delimiter', '-'))
+ orders.append(delta_order)
+ return orders
+
+
+ def _is_target_too_old(self, target_snapshot):
+ if not self.latest:
+ return False # we have no limits
+
+ snap_date = OrderMaker.get_date(target_snapshot)
+
+ return snap_date < self.latest
+
+ @staticmethod
+ def get_date(string_with_date):
+        date_re = re.compile(".*([0-9]{8})[.][0-9]+$")
+        match = date_re.match(string_with_date)
+
+        if not match:
+            raise Exception(f"String {string_with_date} does not look like it contains a date")
+
+        return datetime.datetime.strptime(match.group(1), "%Y%m%d")
+
+ @staticmethod
+ def is_too_much_interval(max_interval, date_from, date_to):
+ interval = (date_to - date_from).days
+ log.info(f"interval: {interval}")
+        return interval > max_interval  # interval is measured in whole days
+
+ def _gen_order_for_common_delta_snapshots(self, one_set, arch, profile):
+ orders = []
+ snapshots = self.url_provider.snapshots[one_set['repository']]
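+        # As above, only the four most recent snapshots can be the delta target.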
+        for offset in range(0, 4):
+ if offset >= len(snapshots):
+ break
+
+ if self._is_target_too_old(snapshots[offset]):
+ log.info(f"Too old target snapshot for delta: {snapshots[offset]} ( < {self.latest} )")
+ continue
+
+ for snapshot in snapshots[offset+1:]:
+ if self.is_too_much_interval(one_set.get('max_interval', 7),
+ OrderMaker.get_date(snapshot),
+ OrderMaker.get_date(snapshots[offset])):
+ log.info(f"{snapshot} is too old for {snapshots[offset]}")
+ continue
+ delta_order = DeltaOrder(one_set['target'],
+ profile,
+ arch,
+ one_set['id'],
+ one_set['board'],
+ DeltaType.parse(one_set['image_type']),
+ one_set.get('target_cfg_name'),
+ snapshot,
+ snapshots[offset],
+ one_set['repository'],
+ self.url_provider,
+ one_set.get('dest_dir',''),
+ one_set.get('release_type', 'standard'),
+ one_set.get('delimiter', '-'))
+ orders.append(delta_order)
+ return orders
+
+
+ def _gen_order_for_all_snapshots(self, one_set, arch, profile):
+ delta_type = DeltaType.parse(one_set['image_type'])
+ if delta_type == DeltaType.full:
+ return self._gen_order_for_full_delta_snapshots(one_set, arch, profile)
+ return self._gen_order_for_common_delta_snapshots(one_set, arch, profile)
+
+
+ def prepare_orders(self):
+ all_orders = []
+ for one_set in self.sets:
+ if one_set.get('disabled', False):
+ continue
+ for arch in one_set['arch']:
+ if one_set['image_type'] in ['all', 'system', 'full']:
+ for profile in one_set['profiles']:
+ orders = self._gen_order_for_all_snapshots(one_set, arch, profile)
+ all_orders += orders
+ else:
+ orders = self._gen_order_for_all_snapshots(one_set, arch, None)
+ all_orders += orders
+ return all_orders
+
+
+# A minimal dict-like store that persists itself to disk with pickle on every write.
+class PickleDict:
+ def __init__(self, file):
+ self.file = file
+        try:
+            with open(self.file, 'rb') as file:
+                self.data = pickle.load(file)
+        except Exception:
+            # Not really critical: treat a missing or unreadable file as an empty dict.
+            self.data = {}
+
+ def __getitem__(self, key):
+ return self.data[key]
+
+ def __setitem__(self, key, value):
+ self.data[key] = value
+ with open(self.file, 'wb') as file:
+ pickle.dump(self.data, file)
+
+ def __delitem__(self, key):
+ del self.data[key]
+ with open(self.file, 'wb') as file:
+ pickle.dump(self.data, file)
+
+ def keys(self):
+ return set(self.data)
+
+
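+# Tracks per-delta build-failure counts, persisted across runs in FAIL_FILE.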
+class FailCounter:
+ def __init__(self):
+ self.data = PickleDict(FAIL_FILE)
+ for key in self.data.keys():
+ if OrderMaker.is_too_much_interval(
+ FAIL_CLEANUP_DAYS,
+ self._name_to_date(key),
+ datetime.datetime.now()):
+ del self.data[key]
+
+ def ok(self, order):
+ try:
+ return self.data[order.get_delta_name()] < MAX_FAIL
+ except KeyError:
+ return True
+
+ def bump(self, order):
+ try:
+ self.data[order.get_delta_name()] += 1
+ except KeyError:
+ self.data[order.get_delta_name()] = 1
+
+    @staticmethod
+    def _name_to_date(name):
+        # The part after the last '@' is the target version, which starts with the build date.
+        version = name[name.rindex("@") + 1:]
+        return OrderMaker.get_date(version)
+
+
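+# Background download worker: fetches queued URLs with aria2c into
+# cache_dir and verifies each file against the repository's SHA256SUMS.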
+class Downloader:
+ def __init__(self, cache_dir, config):
+ self.cache_dir = cache_dir
+ self.download_list = []
+ self.stop = False
+ self.lock = threading.RLock()
+ self.on_download = None
+ self.sha256_url_cache = {}
+ self.sha256_file_cache = {}
+ self.worker = threading.Thread(target=self._thread_run, args=())
+ self.config = config
+
+ def register_on_download(self, func):
+ self.on_download = func
+
+ def add_download(self, url):
+ with self.lock:
+ if url not in self.download_list:
+ self.download_list.append(url)
+
+ def _get_sha256_url_from_cache(self, url):
+ with self.lock:
+ if url in self.sha256_url_cache:
+ return self.sha256_url_cache[url]
+ return None
+
+ def _get_sha256_for_url(self, url):
+ sha256_from_cache = self._get_sha256_url_from_cache(url)
+ if sha256_from_cache:
+ return sha256_from_cache
+
+ sha_url = urljoin(url, 'SHA256SUMS')
+ file_name = os.path.basename(urlparse(url).path)
+ proxies = self.config.get_proxy_dict_for_url(sha_url)
+ r = requests.request('GET', sha_url, proxies=proxies)
+        if not r.ok:
+            log.error(f"No SHA256SUMS?: {sha_url}: {r.status_code}")
+            raise IOError(f"Cannot fetch {sha_url}")
+
+ for line in r.content.decode('utf8').split('\n'):
+ if file_name in line:
+ sha256_from_net = line.split(' ')[0]
+ with self.lock:
+ self.sha256_url_cache[url] = sha256_from_net
+ return sha256_from_net
+        raise IOError(f"{file_name} not listed in {sha_url}")
+
+ def _get_sha256_for_file(self, file_name):
+ if file_name in self.sha256_file_cache:
+ return self.sha256_file_cache[file_name]
+
+ path = os.path.join(self.cache_dir, file_name)
+ if not os.path.isfile(path):
+ log.debug(f"File {path} does not exist")
+ return False
+
+        sha256 = hashlib.sha256()
+        CHUNK = 64 * 1024
+        with open(path, "rb") as f:
+            for data in iter(lambda: f.read(CHUNK), b""):
+                sha256.update(data)
+
+        return sha256.hexdigest()
+
+
+ def _sha256_check(self, url):
+ # log.debug(f"Checking SHA256 for: {url}")
+ file_name = os.path.basename(urlparse(url).path)
+
+ sha256_sum = self._get_sha256_for_url(url)
+ sha256_file = self._get_sha256_for_file(file_name)
+
+ sha256_ok = sha256_file == sha256_sum
+ if sha256_ok:
+ self.sha256_file_cache[file_name] = sha256_file
+ # log.debug(f"SHA match {url}: {sha256_file} {file_name}: {sha256_sum}")
+ else:
+ log.debug(f"SHA does not match {url}: {sha256_sum} {file_name}: {sha256_file}")
+
+ return sha256_ok
+
+ def sha256_check(self, url):
+ return self._sha256_check(url)
+
+ def _download(self, url):
+ file_name = os.path.basename(urlparse(url).path)
+ download_needed = True
+
+ file_path = os.path.join(self.cache_dir, file_name)
+ if os.path.isfile(file_path):
+ if not self._sha256_check(url):
+ log.info(f"SHA does not match for {file_name}")
+ os.remove(file_path)
+ else:
+ download_needed = False
+
+        if download_needed:
+            log.info(f"Downloading: {url}")
+            args = ['aria2c', '-c', '-x5', '-d', self.cache_dir, url]
+            log.debug(f"download command: {args}")
+            envs = self.config.get_proxy_env_for_url(url)
+            if envs:
+                # Merge with the current environment: passing only the proxy
+                # variables would strip PATH and everything else from the child.
+                envs = {**os.environ, **envs}
+            subprocess.run(args, stdout=subprocess.PIPE, env=envs)
+            if self._sha256_check(url):
+                if self.on_download:
+                    self.on_download(url)
+            else:
+                raise Exception(f"SHA does not match for {file_name} after download")
+        else:
+            log.info(f"Unnecessary download, file exists: {file_name}")
+
+ def _thread_run(self):
+ while not self.stop:
+ current_url = None
+ with self.lock:
+ if self.download_list:
+ log.debug(f"Files to download: {len(self.download_list)}")
+ current_url = self.download_list.pop(0)
+ else:
+ log.info("No more files to download")
+ return
+
+ if not current_url:
+ time.sleep(1)
+ continue
+
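+            # A failed download is re-queued below and retried until it succeeds.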
+ try:
+ self._download(current_url)
+ except Exception as error:
+ log.error(f"Download {current_url} error: {error}")
+ with self.lock:
+ self.download_list.append(current_url)
+
+ def run(self):
+ self.worker.start()
+
+
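+# Background build worker: once every image of an order is downloaded and
+# verified, it runs the delta-generation script and publishes the result
+# (tarball plus MD5 checksum) under RESULT_DIR.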
+class DeltaBuilder:
+ def __init__(self, downloader):
+ self.downloader = downloader
+ self.orders = []
+ self.stop = False
+ self.lock = threading.RLock()
+ self.sha_ok_cache = []
+ self.downloader.register_on_download(self.on_file_downloaded)
+ self.worker = threading.Thread(target=self._thread_run, args=())
+ self.time_history = {}
+ self.failed = FailCounter()
+
+ def add_order(self, order):
+
+ if order.invalid_order:
+            log.error(f"Order {order.get_delta_name()} cannot be prepared (URL problem?)")
+ raise ValueError
+
+ if self.is_delta_already_exist(order):
+ log.debug(f"Delta {order.get_delta_name()} exists - ignore")
+ elif not self.failed.ok(order):
+ log.debug(f"Delta {order.get_delta_name()} failed too many times - ignore")
+ else:
+ log.info(f"New order: {order.get_delta_name()}")
+ for url in order.get_source_url() + order.get_target_url():
+ self.downloader.add_download(url)
+ with self.lock:
+ self.orders.append(order)
+
+ def on_file_downloaded(self, url):
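+        # Move orders that were waiting for this file to the front of the queue.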
+ with self.lock:
+ old_list = self.orders
+ self.orders = []
+
+ i = 0
+ while i < len(old_list):
+ if url in old_list[i].get_source_url() + old_list[i].get_target_url():
+ log.debug(f"Order {old_list[i]} promotion after download: {url}")
+ self.orders.append(old_list[i])
+ old_list.pop(i)
+ else:
+ i += 1
+
+ self.orders += old_list
+
+ def is_order_complete(self, order):
+ urls = order.get_source_url() + order.get_target_url()
+
+ for url in urls:
+ if url in self.sha_ok_cache:
+ continue
+
+ if self.downloader.sha256_check(url):
+ self.sha_ok_cache.append(url)
+ else:
+ log.debug(f"missing file from: {url}")
+ return False
+
+ return bool(urls)
+
+ def is_delta_already_exist(self, order):
+ directory = os.path.join(RESULT_DIR, order.dest_dir)
+ file_path = os.path.join(directory, order.get_delta_name_tar_gz())
+        return os.path.isfile(file_path)
+
+ def clear_dir(self, directory):
+ files = glob.glob(os.path.join(directory,"*"))
+ for file in files:
+ if os.path.isfile(file) or os.path.islink(file):
+ os.unlink(file)
+ elif os.path.isdir(file):
+ shutil.rmtree(file)
+
+ def copy_image(self, file_name, target_cfg_name, tota_dir_type):
+ source = os.path.join(IMAGES_DIR, file_name)
+ tar_gz_dir = self.get_tota_dir(target_cfg_name, tota_dir_type)
+ target = os.path.join(tar_gz_dir, file_name)
+ log.debug(f"COPY IMAGE: {source} -> {target}")
+ try:
+ shutil.copyfile(source, target)
+ except Exception as error:
+            log.error(f"Copy file error: {error}")
+ raise error
+
+ def place_images_in_appropriate_dirs(self, order):
+ if order.delta_type != DeltaType.full:
+ tar_gz_dir = self.get_tota_dir(order.target_cfg_name(), TotaDirType.Old)
+ self.clear_dir(tar_gz_dir)
+ for file_name in order.get_source_tar_gz():
+ self.copy_image(file_name, order.target_cfg_name(), TotaDirType.Old)
+
+ tar_gz_dir = self.get_tota_dir(order.target_cfg_name(), TotaDirType.New)
+ self.clear_dir(tar_gz_dir)
+ for file_name in order.get_target_tar_gz():
+ self.copy_image(file_name, order.target_cfg_name(), TotaDirType.New)
+
+ def find_delta_tar(self, target_cfg_name):
+ result_dir = self.get_tota_dir(target_cfg_name, TotaDirType.Result)
+ for root, dirs, files in os.walk(result_dir):
+ if 'delta.tar' in files:
+                # os.walk yields roots already prefixed with result_dir,
+                # so join only root with the file name.
+                return os.path.join(root, 'delta.tar')
+ return None
+
+ def get_tota_dir(self, target_cfg_name, tota_dir_type):
+        BASE_PATH = os.path.join(UPGRADE_TOOLS_DIR, "mk_delta")
+ if tota_dir_type == TotaDirType.Old:
+ return os.path.join(BASE_PATH, target_cfg_name, "data", "old_tar")
+
+ if tota_dir_type == TotaDirType.New:
+ return os.path.join(BASE_PATH, target_cfg_name, "data", "new_tar")
+
+ if tota_dir_type == TotaDirType.Result:
+ return os.path.join(BASE_PATH, target_cfg_name, "result", "")
+
+ raise Exception("Unknown tota dir type")
+
+ def place_delta_in_result_dir(self, order):
+ delta_path = self.find_delta_tar(order.target_cfg_name())
+ directory = os.path.join(RESULT_DIR, order.dest_dir)
+ if not os.path.exists(directory):
+            log.info(f"Directory {directory} does not exist - creating")
+ os.makedirs(directory)
+ result_path = os.path.join(directory, f"{order.get_delta_name()}.tar")
+ shutil.move(delta_path, result_path)
+ subprocess.run(["gzip", result_path], check=True)
+ self.clear_dir(self.get_tota_dir(order.target_cfg_name(), TotaDirType.Result))
+
+ def _md5_base64_sum(self, file_path):
+ md5sum = hashlib.md5()
+ with open(file_path, "rb") as f:
+ for buff in iter(lambda: f.read(4096), b""):
+ md5sum.update(buff)
+ md5digest = md5sum.digest()
+ md5_base64 = base64.b64encode(md5digest)
+ return md5_base64.decode('utf8')
+
+ def calculate_checksum(self, order):
+ dest_dir = os.path.join(RESULT_DIR, order.dest_dir)
+ delta_path = os.path.join(dest_dir, f"{order.get_delta_name()}.tar.gz")
+        if not os.path.exists(delta_path):
+            log.error(f"Cannot calculate checksum - delta {delta_path} does not exist")
+            raise FileNotFoundError(delta_path)
+
+ md5sum = self._md5_base64_sum(delta_path)
+ md5file = os.path.join(dest_dir, f"{order.get_delta_name()}.checksum.MD5.base64")
+ with open(md5file, "w", encoding="utf-8") as f:
+ content = f"{md5sum}\n"
+ f.write(content)
+
+ log.info(f"MD5({delta_path}) = {md5sum}")
+
+    def build_delta(self, order):
+        log.info(f"Building delta: {order}")
+
+        try:
+            self.place_images_in_appropriate_dirs(order)
+
+            build_type = "full" if order.delta_type == DeltaType.full else "common"
+            args = [os.path.join(UPGRADE_TOOLS_DIR, "scripts/delta-generation.sh"),
+                    UPGRADE_TOOLS_DIR,
+                    order.target_cfg_name(),
+                    build_type]
+            subprocess.run(args, check=True)
+
+            self.place_delta_in_result_dir(order)
+            self.calculate_checksum(order)
+        except Exception as err:
+            log.error(f"Generating failed: {err} (also see older logs)")
+            self.failed.bump(order)
+
+    def print_times(self, last_order):
+        total = sum(self.time_history.values())
+        average_time = total / len(self.time_history)
+
+ remaining_time = 0
+ with self.lock:
+ remaining_time = average_time*len(self.orders)
+
+ last_time_d = datetime.timedelta(seconds=self.time_history[last_order.get_delta_name()])
+ average_time_d = datetime.timedelta(seconds=average_time)
+ remaining_time_d = datetime.timedelta(seconds=remaining_time)
+ end_time_d = datetime.datetime.now() + remaining_time_d
+ log.info(f"Last build time: {last_time_d} Average build time: {average_time_d} Remaining time: {remaining_time_d} Expected end: {end_time_d}")
+
+
+ def _thread_run(self):
+ while True:
+ current_order = None
+
+ with self.lock:
+ if self.orders:
+ log.debug(f"Some orders... ({len(self.orders)})")
+ for order in self.orders:
+ log.debug(f" {order}")
+ current_order = self.orders.pop(0)
+ else:
+ log.info("No more orders")
+ return
+
+ if not current_order:
+ time.sleep(1)
+ continue
+
+ if self.is_delta_already_exist(current_order):
+ log.info(f"Delta {current_order.get_delta_name()} already exists.")
+ elif not self.failed.ok(current_order):
+ log.info(f"Delta {current_order.get_delta_name()} failed too many times, skipping.")
+ elif self.is_order_complete(current_order):
+ log.info(f"Order {current_order.get_delta_name()} is complete. Building delta file.")
+ try:
+ start = time.time()
+ self.build_delta(current_order)
+ end = time.time()
+ elapsed = end - start
+ self.time_history[current_order.get_delta_name()] = elapsed
+ self.print_times(current_order)
+ except Exception as error:
+                    log.error(f"Error during delta {current_order} build: {error}. Removing it from the queue.")
+ # with self.lock:
+ # self.orders.append(current_order)
+ else:
+ log.info(f"Order {current_order.get_delta_name()} is not ready")
+ with self.lock:
+ self.orders.append(current_order)
+
+ time.sleep(1)
+
+ def run(self):
+ self.worker.start()
+
+
+def init_order(order):
+ try:
+ order.init()
+ except Exception as error:
+ log.error(f"Order init error: {error}")
+
+
+def main():
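+    # Optional argument: a snapshot name; its date becomes the lower bound for target snapshots.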
+ if len(sys.argv) > 1:
+ print(f"argv: {sys.argv}")
+ latest = OrderMaker.get_date(sys.argv[1])
+ else:
+ latest = None
+ config = Config("cfg/config.yaml")
+ url_provider = UrlProvider(config.targets, config.profiles, config.repositories, latest, config)
+ order_maker = OrderMaker(config.sets, url_provider, latest)
+
+ orders = order_maker.prepare_orders()
+
+ threads = []
+ for order in orders:
+ thread = threading.Thread(target=init_order, args=(order,))
+ threads.append(thread)
+ thread.start()
+
+ for thread in threads:
+ thread.join()
+
+ for order in orders:
+ print(order)
+
+ downloader = Downloader(IMAGES_DIR, config)
+
+ delta_builder = DeltaBuilder(downloader)
+ for order in orders:
+ try:
+ delta_builder.add_order(order)
+ except ValueError:
+ pass
+
+ downloader.run()
+ delta_builder.run()
+
+ delta_builder.worker.join()
+ downloader.worker.join()
+ print("Bye bye")
+
+
+if __name__ == '__main__':
+ main()