bfcae8d03c260020a898010e33ce909c756ad6f7
[archive/20170607/tools/tic-core.git] / test / test_dependency.py
1 #!/usr/bin/python
2 # Copyright (c) 2000 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
3 #
4 # Contact: 
5 # @author Chulwoo Shin <cw1.shin@samsung.com>
6
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # Contributors:
20 # - S-Core Co., Ltd
21
22 import os
23 import base64
24 import time
25 import unittest
26
27 from tic.dependency import analyze_dependency
28 from tic.parser.repo_parser import RepodataParser
29 from tic.repo import Repo
30 from tic.repo import get_repodata_from_repos
31
32
33 current_milli_time = lambda: int(round(time.time() * 1000))
34 CWD = os.path.dirname(__file__) or '.'
35 TEST_REPODATA_LOC=os.path.join(CWD, 'dependency_fixtures')
36 DEFAULT_CACHEDIR='/var/tmp/tic-core/cached'
37
38 def suite():
39     # return unittest.makeSuite(DependencyTest, ['testsun1', 'testTrue'])
40     return unittest.makeSuite(DependencyTest)
41
42 class DependencyTest(unittest.TestCase):
43     def setUp(self):
44         # test environment setup
45         self.repo_list = ['file:/' + TEST_REPODATA_LOC + '/base',
46                           'file:/' + TEST_REPODATA_LOC + '/mobile']
47         
48         repos = []
49         for repo_url in self.repo_list:
50             repos.append(Repo(base64.urlsafe_b64encode(repo_url), repo_url))
51         self.repodata_list = get_repodata_from_repos(repos, DEFAULT_CACHEDIR)
52         
53         self.pkg_group = None
54         try:
55             repo_parser = RepodataParser(self.repodata_list)
56             self.pkg_group = repo_parser.parse()
57         except Exception as e:
58             raise self.failureException
59
60
61     def tearDown(self):
62         # clear environment after test 
63         del self.repo_list
64         del self.pkg_group
65
66     def test_parse(self):
67         self.assertNotEqual(self.pkg_group, None)
68         self.assertNotEqual(self.pkg_group['pkg_list'], None)
69         self.assertNotEqual(self.pkg_group['pkg2id'], None)
70         self.assertEqual(len(self.pkg_group['pkg_list']), len(self.pkg_group['pkg2id']))
71         
72         pkg_list = self.pkg_group['pkg_list']
73         pkg2id = self.pkg_group['pkg2id']
74         
75         # Consistency checks between pkg_list and pkg2id 
76         for pkg_id in range(len(pkg_list)):
77             pkg_info = pkg_list[pkg_id]
78             self.assertEqual(pkg_id, pkg2id.get(pkg_info['name']))
79     
80     def test_dependency(self):
81         # 1st package install-depdency analysis
82         analyze_dependency(self.pkg_group)
83         # print('time:', current_milli_time() - start_time)
84         pkg_list = self.pkg_group['pkg_list']
85         
86         # 2nd package install-depdency analysis for test (using bruteforce)
87         test_repo_parser = RepodataParser(self.repodata_list)
88         test_pkg_group = test_repo_parser.parse()
89         test_pkg_list = test_pkg_group.get('pkg_list')
90         test_pkg2id = test_pkg_group.get('pkg2id')
91         
92         count = 1
93         visited = [0]*len(test_pkg_list)
94         for pkg_id in range(len(test_pkg_list)):
95             visited[pkg_id] = count
96             dep_set = self.dep_dfs(count, pkg_id, test_pkg_list, visited)
97             test_pkg_list[pkg_id]['dependency'] = list(dep_set)
98             count += 1
99         # print('time:', current_milli_time() - start_time)
100         
101         # compares the result of 1st and 2nd
102         if len(pkg_list) != len(test_pkg_list):
103             raise self.failureException
104         
105         for pkg_id in range(len(pkg_list)):
106             pkg_info = pkg_list[pkg_id]
107             test_pkg_info = test_pkg_list[test_pkg2id[pkg_info['name']]]
108             
109             # package check 
110             if pkg_info['name'] != test_pkg_info['name']:
111                 raise self.failureException
112             
113             # dependency count 
114             if len(pkg_info['dependency']) != len(test_pkg_info['dependency']):
115                 print('pkg_info_1:', pkg_info['dependency'])
116                 print('pkg_info_2:', test_pkg_info['dependency'])
117                 raise self.failureException
118             
119             # dependency validation between pkg_info and test_pkg_info
120             for dep in test_pkg_info.get('dependency'):
121                 if dep not in pkg_info['dependency']:
122                     print(dep, 'does not exist')
123                     raise self.failureException
124     
125     def dep_dfs(self, count, pkg_id, pkg_list, visited):
126         dep_set = set([pkg_list[pkg_id]['name']])
127         
128         if pkg_list[pkg_id].get('requires'):
129             for req in pkg_list[pkg_id].get('requires'):
130                 req_id = req.get('id')
131                 if req_id is not None:
132                     if visited[req_id] < count:
133                         visited[req_id] = count
134                         dep_set.update(self.dep_dfs(count, req_id, pkg_list, visited))
135                 else:
136                     pass
137                     #TODO: package doest not exist
138                     #print('def_dfs::', req['name'], 'is not exist (in dep_analysis)')
139         
140         return dep_set
141
142 if __name__ == "__main__":
143     #import sys;sys.argv = ['', 'Test.testName']
144     unittest.main()