# Contributors:
# - S-Core Co., Ltd
+import logging
+from lxml import etree
+from tic.utils.error import TICError
+from tic.utils.rpmmisc import meetRequireVersion, compare_ver
+from tic.utils.rpmmisc import Dependency
+from tic.config import configmgr
+
+DEFAULT_PROFILE = 'EMPTY'
+
def analyze_dependency(pkg_group):
def dep_dfs(pkg_id):
+ #logger = logging.getLogger(__name__)
if pkg_list[pkg_id].get('dependency') is not None:
return pkg_list[pkg_id].get('dependency')
min_num[pkg_id] = min(min_num[pkg_id], min_num[req_id])
else:
+ #TODO: package does not exist
+ #logger.warning('%s does not exist in repo', req['name'])
pass
- #TODO: package doest not exist
- #print('def_dfs::', req['name'], 'is not exist (in dep_analysis)')
if min_num[pkg_id] == visited[pkg_id]:
# scc (string connected components)
scc_list[p_id] = scc_num[0]
pkg_list[p_id]['dependency'] = dep_list
if pkg_id == p_id:
- break;
+ break
def analyze():
for pkg_id in range(len(pkg_list)):
return analyze()
+def get_installed_packages(recipe, repoinfo, pkg_group):
+ logger = logging.getLogger(__name__)
+
+ def _select_rpm_from_files(fileList, require):
+ if not fileList or not require:
+ return None
+ # 1. sort
+ fileList.sort()
+ # 2-1. Choose the default rpm or the selected rpm
+ for fname in fileList:
+ file_info = pkg_dict.get(fname)
+ if fname in pkg_set or selected[file_info['id']] >= 1:
+ return file_info
+ # 2-2. Choose the rpm (it does not conflitcs)
+ for fname in fileList:
+ file_info = pkg_dict.get(fname)
+ if not _check_conflicts(file_info):
+ return file_info
+ return pkg_dict.get(fileList[0])
+
+ def _select_rpm(capability, require, recommends=None):
+ provide_list = []
+ # 1. Choose the rpm included in version from provides
+ if require.get('ver'):
+ for provide in capability:
+ ver_data = provide['data']
+ # If there is no capability version, use version of package
+ if not ver_data.get('ver'):
+ ver_data = pkg_dict.get(provide['name']).get('version')
+ if meetRequireVersion(require, ver_data):
+ provide_list.append(provide)
+ else:
+ provide_list = capability
+
+ # error case (the rpm does not exist)
+ if not provide_list:
+ return None
+
+ if len(provide_list) == 1:
+ return pkg_dict.get(provide_list[0].get('name'))
+
+ # 2 Select one of the rpms by priority
+ # 2-1. Choose the default rpm or the selected rpm
+ for pro in provide_list:
+ provide_info = pkg_dict.get(pro.get('name'))
+ if provide_info['name'] in pkg_set or selected[provide_info['id']] >= 1:
+ return provide_info
+
+ # 2-2. Choose the defualt profile
+ # TODO: should be supported
+ # if provide_info['profile'] == DEFAULT_PROFILE:
+ # return provide_info
+
+ # 2.3. Choose recommends pkg (pkg-name or capability)
+ if recommends:
+ for pro in provide_list:
+ provide_info = pkg_dict.get(pro.get('name'))
+ for reco in recommends:
+ # 2-3 Case. select a pkg that is named
+ if reco['name'] == provide_info['name']:
+ return provide_info
+ # 2-3 Case. select a pkg that provides
+ for cap in provide_info.get('provides'):
+ if reco['name'] == cap['name']:
+ return provide_info
+ # 2-4. Select the latest version of rpm
+ max_ver = None
+ for pro in provide_list:
+ if not _check_conflicts(pkg_dict.get(pro.get('name'))):
+ if max_ver:
+ cap_info = pro.get('data')
+ ret = compare_ver(max_ver.get('data'), cap_info)
+ if ret == 0: # equals
+ # string compare (compare string lexicographically using ASCII value)
+ if max_ver.get('name') > pro.get('name'):
+ max_ver = pro
+ elif ret == -1: # greater than max_ver
+ max_ver = pro
+ else:
+ max_ver = pro
+
+ # all of capability pkg are in conflict
+ if max_ver is None:
+ return pkg_dict.get(provide_list[0]['name'])
+
+ return pkg_dict.get(max_ver.get('name'))
+
+ def _create_reference(pkg1, pkg2):
+ # duplicate check
+ if pkg1.get('forward') and pkg2['name'] in pkg1.get('forward'):
+ return
+
+ if pkg1.get('forward'):
+ pkg1['forward'].append(pkg2['name'])
+ else:
+ pkg1['forward'] = [pkg2['name']]
+
+ if pkg2.get('backward'):
+ pkg2['backward'].append(pkg1['name'])
+ else:
+ pkg2['backward'] = [pkg1['name']]
+
+ def _make_scc(pkg_id):
+ scc_num[0] += 1
+ scc_list = []
+ # stack is not empty
+ while stack:
+ pkg = stack.pop()
+ scc_id[pkg['id']] = scc_num[0]
+ scc_list.append(pkg)
+ if pkg_id == pkg['id']:
+ break
+
+ # circular dependency
+ if len(scc_list) > 1:
+ group_num[0] += 1
+ group_names = []
+ # add group id
+ for pkg in scc_list:
+ pkg['group'] = group_num[0]
+ group_names.append(pkg['name'])
+
+ # { group_id = [ pkg_name_1, pkg_name_2 ], ... }
+ groups[group_num[0]] = group_names
+
+ def _check_conflicts(pkg_info):
+ # 1. Check whether node can be installed
+ if pkg_info.get('provides') is not None:
+ for pro in pkg_info['provides']:
+ if pro['name'] in conflicts:
+ for con in conflicts[pro['name']]:
+ if not con['data'].get('ver') or meetRequireVersion(con['data'], pro):
+ #package conflict
+ logger.info('Conflict %s and %s' % (pkg_info['name'], con['name']))
+ return True
+
+ #2. If the conflict package defined by node is installed
+ if pkg_info.get('conflicts') is not None:
+ for con in pkg_info['conflicts']:
+ if con['name'] in provides:
+ for pro in provides[con['name']]:
+ pkg = pkg_dict[pro['name']]
+ if selected[pkg['id']] != 0:
+ if not con.get('ver') or meetRequireVersion(con, pkg['version']):
+ logger.info('Conflict %s and %s' % (pkg_info['name'], pkg['name']))
+ return True
+ elif con['name'] in pkg_dict:
+ pkg = pkg_dict[con['name']]
+ if selected[pkg['id']] != 0:
+ if not con.get('ver') or meetRequireVersion(con, pkg['version']):
+ logger.info('Conflict %s and %s' % (pkg_info['name'], pkg['name']))
+ return True
+ return False
+
+ def _add_conflicts(pkg_info):
+ if pkg_info.get('conflicts'):
+ for con in pkg_info['conflicts']:
+ if not con['name'] in conflicts:
+ conflicts[con['name']] = []
+ conflicts[con['name']].append(dict(name=pkg_info['name'], data=con))
+ logger.info('%s add conflict package : %s' % (pkg_info['name'], con['name']))
+
+ def _analyze_dep(pkg_info):
+ if not pkg_info:
+ return
+
+ pkg_id = pkg_info['id']
+ number[0] += 1
+ selected[pkg_id] = number[0]
+ min_num[pkg_id] = number[0]
+ stack.append(pkg_info)
+
+ dep_rpms = set([pkg_info['name']])
+
+ # check for conflicts
+ if _check_conflicts(pkg_info):
+ progress['status'] = False
+ stack.pop()
+ return
+
+ # add rpms into conflicts table.
+ _add_conflicts(pkg_info)
+
+ # Installation dependency analysis of rpm
+ for dep_tag in [Dependency.REQUIRES, Dependency.RECOMMENDS]:
+ if pkg_info.get(dep_tag):
+ for req in pkg_info.get(dep_tag):
+ choose = None
+ # Find dependency rpm based on capability/files
+ if req['name'] in provides:
+ # capability : [provide_rpm_1, provide_rpm_2, ... ]
+ # Select the rpm that meets the condition (version)
+ if dep_tag == Dependency.REQUIRES:
+ choose = _select_rpm(provides[req['name']], req, pkg_info.get('recommends'))
+ else:
+ choose = _select_rpm(provides[req['name']], req)
+ elif req['name'] in files:
+ choose = _select_rpm_from_files(files[req['name']], req)
+ #choose = pkg_dict.get(files[req['name']][0])
+ elif req['name'] in pkg_dict:
+ choose = pkg_dict.get(req['name'])
+
+ if dep_tag == Dependency.RECOMMENDS:
+ # A Recommends B: B is installed when A is installed and B has no conflicts.
+ if not choose or _check_conflicts(choose):
+ logger.info('%s recommended by %s is ignored for selection (Conflict)' % (req['name'], pkg_info['name']))
+ continue
+
+ if choose:
+ if selected[choose['id']] == 0:
+ dep_set = _analyze_dep(choose)
+ if not progress['status']:
+ break
+
+ dep_rpms.update(dep_set)
+ min_num[pkg_id] = min(min_num[pkg_id], min_num[choose['id']])
+ elif scc_id[choose['id']] == 0:
+ # cross edge that can not be ignored
+ min_num[pkg_id] = min(min_num[pkg_id], min_num[choose['id']])
+
+ # add forward/backward reference
+ _create_reference(pkg_info, choose)
+ else:
+ # the rpm does not exists
+ logger.info(configmgr.message['dependency_not_exist'] % (req['name'], pkg_info['name']))
+ progress['status'] = False
+ break
+
+ if not progress['status']:
+ break
+ if min_num[pkg_id] == selected[pkg_id]:
+ # scc(strong connected components)
+ _make_scc(pkg_id)
+
+ return dep_rpms
+
+ def _check_circular_dep(node):
+ g_id = node.get('group')
+ g_pkg_list = groups[g_id]
+ g_dict = {}
+
+ # Set dict for group
+ for pkgname in g_pkg_list:
+ g_dict[pkgname] = None
+
+ for pkgname in g_pkg_list:
+ pkg = pkg_dict[pkgname]
+ # the node is selfchecked (the root node ignores selfchecked)
+ if stack[0]['id'] != pkg['id'] and pkg['selfChecked']:
+ return False
+ # check backward ref.
+ for bname in pkg.get('backward'):
+ # If node is Referenced by another node (Not a node in the group),
+ # unable to uncheck group nodes
+ if not bname in g_dict:
+ return False
+
+ # init visited dict
+ group_visited[g_id] = {}
+ # delete backward reference of group node
+ for pkgname in g_pkg_list:
+ pkg = pkg_dict[pkgname]
+ pkg['backward'] = None;
+ group_visited[g_id][pkg['name']] = -1
+ return True
+ def _delete_conflictdata(node):
+ if node.get('conflicts'):
+ for con in node.get('conflicts'):
+ if con['name'] in conflicts:
+ con_list = conflicts[con['name']]
+ for i in range(len(con_list)):
+ if con_list[i]['name'] == node['name']:
+ del con_list[i]
+ break;
+
+ def _remove_reference(parent, node):
+ if parent is not None:
+ # remove backward reference (parent)
+ if node.get('backward'):
+ for i in range(len(node['backward'])):
+ if node['backward'][i] == parent['name']:
+ del node['backward'][i]
+ break
+ # selfCheck node do not remove
+ if pkg_info.get('selfChecked'):
+ return
+
+ if node.get('backward'):
+ if node.get('group') is None or _check_circular_dep(node):
+ return
+
+ # the selected node is uncheckable
+ if node.get('group') and group_visited[node['group']]:
+ group_visited[node['group']][node['name']] = 1
+
+ # if selected node has forward references
+ if node.get('forward'):
+ for fname in node.get('forward'):
+ fnode = pkg_dict[fname]
+
+ # If pkg has a circular dependency and is unchekcable,
+ # circular dep. pkgs can only be visited once
+ gvisite = group_visited.get(fnode.get('group'))
+ if gvisite and gvisite[fnode['name']] == 1:
+ continue
+ _remove_reference(node, fnode)
+ node['forward'] = None
+ node['group'] = None
+ # delete conflict data from conflicts dict
+ _delete_conflictdata(node)
+
+ # recipe/repo
+ if not recipe or not repoinfo:
+ return []
+
+ group_set = set([])
+ pkg_set = set([])
+
+ if recipe['Recipe'].get('Groups'):
+ group_set.update(recipe['Recipe'].get('Groups'))
+ if recipe['Recipe'].get('ExtraPackages'):
+ pkg_set.update(recipe['Recipe'].get('ExtraPackages'))
+
+ # parsing group.xml
+ if group_set:
+ for repo in repoinfo:
+ if repo.get('comps'):
+ try:
+ tree = etree.parse(repo.get('comps'))
+ root = tree.getroot()
+ except etree.XMLSyntaxError as e:
+ logger.info(e)
+ raise TICError(configmgr.message['xml_parse_error'] % ('group.xml', repo['baseurl']))
+
+ # Convert groups to packages
+ for elm in root.findall('group'):
+ group_name = elm.find('name').text
+ if group_name in group_set:
+ pkglist = elm.find('packagelist')
+ plist = []
+ for pkgreq in pkglist.findall('packagereq'):
+ plist.append(pkgreq.text)
+ pkg_set.update(set(plist))
+ group_set.discard(group_name);
+
+ pkg_dict = pkg_group.get('pkg_dict')
+ provides = pkg_group.get('provides')
+ files = pkg_group.get('files')
+ groups = pkg_group.get('groups')
+ conflicts = pkg_group.get('conflicts')
+
+ stack = []
+ number = [0] # for pkg count
+ scc_num = [0] # for scc count
+ group_num = [0] # for group count
+ scc_id = [0] * len(pkg_dict)
+ min_num = [0] * len(pkg_dict)
+ selected = [0] * len(pkg_dict)
+ group_visited = None
+ install_rpm = set([])
+ progress = dict(status=True, message=None)
+
+ for pkg_name in pkg_set:
+ progress['status'] = True
+ pkg_info = pkg_dict.get(pkg_name)
+
+ if not pkg_info:
+ if provides.get(pkg_name):
+ pro = provides.get(pkg_name)[0]
+ pkg_info = pkg_dict.get(pro['name'])
+ else:
+ logger.info(configmgr.message['package_not_exist'] % pkg_name)
+ continue
+
+ pkg_info['selfChecked'] = True
+ if selected[pkg_info['id']] == 0:
+ dep_set = _analyze_dep(pkg_info)
+ if progress['status']:
+ install_rpm.update(dep_set)
+ else:
+ # delete forward/backward reference
+ group_visited = {}
+ _remove_reference(None, pkg_info)
+ return list(install_rpm)