2 # Copyright (c) 2000 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5 # @author Chulwoo Shin <cw1.shin@samsung.com>
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
27 from tic.dependency import analyze_dependency
28 from tic.parser.repo_parser import RepodataParser
29 from tic.repo import Repo
30 from tic.repo import get_repodata_from_repos
33 current_milli_time = lambda: int(round(time.time() * 1000))
34 CWD = os.path.dirname(__file__) or '.'
35 TEST_REPODATA_LOC=os.path.join(CWD, 'dependency_fixtures')
36 DEFAULT_CACHEDIR='/var/tmp/tic-core/cached'
39 # return unittest.makeSuite(DependencyTest, ['testsun1', 'testTrue'])
40 return unittest.makeSuite(DependencyTest)
42 class DependencyTest(unittest.TestCase):
44 # test environment setup
45 self.repo_list = ['file:/' + TEST_REPODATA_LOC + '/base',
46 'file:/' + TEST_REPODATA_LOC + '/mobile']
49 for repo_url in self.repo_list:
50 repos.append(Repo(base64.urlsafe_b64encode(repo_url), repo_url))
51 self.repodata_list = get_repodata_from_repos(repos, DEFAULT_CACHEDIR)
55 repo_parser = RepodataParser(self.repodata_list)
56 self.pkg_group = repo_parser.parse()
57 except Exception as e:
58 raise self.failureException
62 # clear environment after test
67 self.assertNotEqual(self.pkg_group, None)
68 self.assertNotEqual(self.pkg_group['pkg_list'], None)
69 self.assertNotEqual(self.pkg_group['pkg2id'], None)
70 self.assertEqual(len(self.pkg_group['pkg_list']), len(self.pkg_group['pkg2id']))
72 pkg_list = self.pkg_group['pkg_list']
73 pkg2id = self.pkg_group['pkg2id']
75 # Consistency checks between pkg_list and pkg2id
76 for pkg_id in range(len(pkg_list)):
77 pkg_info = pkg_list[pkg_id]
78 self.assertEqual(pkg_id, pkg2id.get(pkg_info['name']))
80 def test_dependency(self):
81 # 1st package install-depdency analysis
82 analyze_dependency(self.pkg_group)
83 # print('time:', current_milli_time() - start_time)
84 pkg_list = self.pkg_group['pkg_list']
86 # 2nd package install-depdency analysis for test (using bruteforce)
87 test_repo_parser = RepodataParser(self.repodata_list)
88 test_pkg_group = test_repo_parser.parse()
89 test_pkg_list = test_pkg_group.get('pkg_list')
90 test_pkg2id = test_pkg_group.get('pkg2id')
93 visited = [0]*len(test_pkg_list)
94 for pkg_id in range(len(test_pkg_list)):
95 visited[pkg_id] = count
96 dep_set = self.dep_dfs(count, pkg_id, test_pkg_list, visited)
97 test_pkg_list[pkg_id]['dependency'] = list(dep_set)
99 # print('time:', current_milli_time() - start_time)
101 # compares the result of 1st and 2nd
102 if len(pkg_list) != len(test_pkg_list):
103 raise self.failureException
105 for pkg_id in range(len(pkg_list)):
106 pkg_info = pkg_list[pkg_id]
107 test_pkg_info = test_pkg_list[test_pkg2id[pkg_info['name']]]
110 if pkg_info['name'] != test_pkg_info['name']:
111 raise self.failureException
114 if len(pkg_info['dependency']) != len(test_pkg_info['dependency']):
115 print('pkg_info_1:', pkg_info['dependency'])
116 print('pkg_info_2:', test_pkg_info['dependency'])
117 raise self.failureException
119 # dependency validation between pkg_info and test_pkg_info
120 for dep in test_pkg_info.get('dependency'):
121 if dep not in pkg_info['dependency']:
122 print(dep, 'does not exist')
123 raise self.failureException
125 def dep_dfs(self, count, pkg_id, pkg_list, visited):
126 dep_set = set([pkg_list[pkg_id]['name']])
128 if pkg_list[pkg_id].get('requires'):
129 for req in pkg_list[pkg_id].get('requires'):
130 req_id = req.get('id')
131 if req_id is not None:
132 if visited[req_id] < count:
133 visited[req_id] = count
134 dep_set.update(self.dep_dfs(count, req_id, pkg_list, visited))
137 #TODO: package doest not exist
138 #print('def_dfs::', req['name'], 'is not exist (in dep_analysis)')
142 if __name__ == "__main__":
143 #import sys;sys.argv = ['', 'Test.testName']