2 # Copyright (c) 2000 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5 # @author Chulwoo Shin <cw1.shin@samsung.com>
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
26 from tic.dependency import analyze_dependency
27 from tic.parser.repo_parser import RepodataParser
28 from tic.repo import get_repodata_from_repos
def current_milli_time():
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    return int(round(time.time() * 1000))


# Directory holding this module; '.' when __file__ has no directory component.
CWD = os.path.dirname(__file__) or '.'
# Base directory of the local repositories referenced by the tests' file: URLs.
TEST_REPODATA_LOC = os.path.join(CWD, 'dependency_fixtures')
# Cache directory passed to get_repodata_from_repos().
DEFAULT_CACHEDIR = '/var/tmp/tic-core/cached'
36 # return unittest.makeSuite(DependencyTest, ['testsun1', 'testTrue'])
37 return unittest.makeSuite(DependencyTest)
39 class DependencyTest(unittest.TestCase):
41 # test environment setup
42 self.repo_list = [{'name': 'local_base',
43 'url': 'file:/' + TEST_REPODATA_LOC + '/base'},
44 {'name': 'local_mobile',
45 'url': 'file:/' + TEST_REPODATA_LOC + '/mobile'}]
47 self.repodata_list = get_repodata_from_repos(self.repo_list, DEFAULT_CACHEDIR)
51 repo_parser = RepodataParser(self.repodata_list)
52 self.pkg_group = repo_parser.parse()
53 except Exception as e:
54 raise self.failureException
58 # clear environment after test
63 self.assertNotEqual(self.pkg_group, None)
64 self.assertNotEqual(self.pkg_group['pkg_list'], None)
65 self.assertNotEqual(self.pkg_group['pkg2id'], None)
66 self.assertEqual(len(self.pkg_group['pkg_list']), len(self.pkg_group['pkg2id']))
68 pkg_list = self.pkg_group['pkg_list']
69 pkg2id = self.pkg_group['pkg2id']
71 # Consistency checks between pkg_list and pkg2id
72 for pkg_id in range(len(pkg_list)):
73 pkg_info = pkg_list[pkg_id]
74 self.assertEqual(pkg_id, pkg2id.get(pkg_info['name']))
def test_dependency(self):
    """Check analyze_dependency() against a brute-force dependency closure.

    ``self.pkg_group`` is analyzed by ``analyze_dependency``.  The same
    repodata is then parsed a second time and the closure of every package
    is computed independently with ``dep_dfs``.  The test passes when both
    package lists have the same length and, for every package, the two
    dependency lists have the same length and each brute-force dependency
    is present in the analyzed result.
    """
    # 1st pass: install-dependency analysis by the code under test.
    analyze_dependency(self.pkg_group)
    pkg_list = self.pkg_group['pkg_list']

    # 2nd pass: independent brute-force analysis on a fresh parse.
    test_pkg_group = RepodataParser(self.repodata_list).parse()
    test_pkg_list = test_pkg_group.get('pkg_list')
    test_pkg2id = test_pkg_group.get('pkg2id')

    # Every root gets its own marker value.  Markers start at 1 because
    # ``visited`` is initialised to 0 and dep_dfs treats
    # ``visited[i] < count`` as "not seen during this traversal".
    visited = [0] * len(test_pkg_list)
    for count, test_pkg_info in enumerate(test_pkg_list, start=1):
        pkg_id = count - 1
        visited[pkg_id] = count
        dep_set = self.dep_dfs(count, pkg_id, test_pkg_list, visited)
        test_pkg_info['dependency'] = list(dep_set)

    # Compare the results of the 1st and 2nd pass.
    self.assertEqual(len(pkg_list), len(test_pkg_list),
                     'package count differs between the two parses')

    for pkg_info in pkg_list:
        name = pkg_info['name']
        self.assertIn(name, test_pkg2id)
        test_pkg_info = test_pkg_list[test_pkg2id[name]]
        self.assertEqual(name, test_pkg_info['name'])

        actual = pkg_info['dependency']
        expected = test_pkg_info['dependency']
        self.assertEqual(
            len(actual), len(expected),
            '%s: pkg_info_1: %r, pkg_info_2: %r' % (name, actual, expected))

        # Membership is tested against ``actual`` as-is so that no
        # assumption is made about the hashability of its entries.
        missing = [dep for dep in expected if dep not in actual]
        self.assertEqual(
            missing, [],
            '%s: %r does not exist in the analyzed dependencies'
            % (name, missing))
def dep_dfs(self, count, pkg_id, pkg_list, visited):
    """Return the names of ``pkg_id`` and of every package it transitively requires.

    The traversal is iterative, so long dependency chains cannot exhaust
    the interpreter's recursion limit.

    Args:
        count: marker value identifying the current traversal.
        pkg_id: index into ``pkg_list`` of the package to start from.
        pkg_list: sequence of package dicts; each has a ``'name'`` and may
            have a ``'requires'`` list whose entries may carry an ``'id'``
            (an index into ``pkg_list``).
        visited: list indexed by package id.  An entry lower than ``count``
            means "not seen during this traversal"; entries of the
            packages reached here are overwritten with ``count``.  The
            start package's own entry is neither checked nor written.

    Returns:
        A set with the name of the start package and the names of all
        packages reached through ``'requires'`` links.
    """
    dep_set = set()
    pending = [pkg_id]
    while pending:
        pkg_info = pkg_list[pending.pop()]
        dep_set.add(pkg_info['name'])
        for req in pkg_info.get('requires') or ():
            req_id = req.get('id')
            if req_id is None:
                # TODO: the required package does not exist in pkg_list.
                continue
            if visited[req_id] < count:
                # Mark before queueing so a package is expanded only once
                # even when several packages require it.
                visited[req_id] = count
                pending.append(req_id)
    return dep_set
138 if __name__ == "__main__":
139 #import sys;sys.argv = ['', 'Test.testName']