Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / cbuildbot / metadata_lib_unittest.py
1 #!/usr/bin/python
2 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Test the archive_lib module."""
7
8 import logging
9 import multiprocessing
10 import os
11 import sys
12
13 sys.path.insert(0, os.path.abspath('%s/../..' % os.path.dirname(__file__)))
14 from chromite.cbuildbot import metadata_lib
15 from chromite.lib import cros_test_lib
16 from chromite.lib import parallel
17
18
19 @cros_test_lib.NetworkTest()
20 class MetadataFetchTest(cros_test_lib.TestCase):
21   """Test functions for fetching metadata from GS."""
22
23   def testPaladinBuilder(self):
24     bot, version = ('x86-mario-paladin', '5611.0.0')
25     full_version = metadata_lib.FindLatestFullVersion(bot, version)
26     self.assertEqual(full_version, 'R35-5611.0.0-rc2')
27     metadata = metadata_lib.GetBuildMetadata(bot, full_version)
28     metadata_dict = metadata._metadata_dict # pylint: disable=W0212
29     self.assertEqual(metadata_dict['status']['status'], 'passed')
30
31
32 class MetadataTest(cros_test_lib.TestCase):
33   """Tests the correctness of various metadata methods."""
34
35   def testGetDict(self):
36     starting_dict = {'key1': 1,
37                      'key2': '2',
38                      'cl_actions': [('a', 1), ('b', 2)],
39                      'board-metadata': {
40                          'board-1': {'info': 432}
41                          }
42                      }
43     metadata = metadata_lib.CBuildbotMetadata(starting_dict)
44     ending_dict = metadata.GetDict()
45     self.assertEqual(starting_dict, ending_dict)
46
47   def testUpdateKeyDictWithDict(self):
48     expected_dict = {str(x): x for x in range(20)}
49     m = multiprocessing.Manager()
50     metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)
51
52     metadata.UpdateKeyDictWithDict('my_dict', expected_dict)
53
54     self.assertEqual(expected_dict, metadata.GetDict()['my_dict'])
55
56
57   def testUpdateKeyDictWithDictMultiprocess(self):
58     expected_dict = {str(x): x for x in range(20)}
59     m = multiprocessing.Manager()
60     metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)
61
62     with parallel.BackgroundTaskRunner(metadata.UpdateKeyDictWithDict) as q:
63       for k, v in expected_dict.iteritems():
64         q.put(['my_dict', {k: v}])
65
66     self.assertEqual(expected_dict, metadata.GetDict()['my_dict'])
67
68   def testUpdateBoardMetadataWithMultiprocessDict(self):
69     starting_dict = {'key1': 1,
70                      'key2': '2',
71                      'cl_actions': [('a', 1), ('b', 2)],
72                      'board-metadata': {
73                          'board-1': {'info': 432}
74                          }
75                      }
76
77     m = multiprocessing.Manager()
78     metadata = metadata_lib.CBuildbotMetadata(metadata_dict=starting_dict,
79                                               multiprocess_manager=m)
80
81     # pylint: disable-msg=E1101
82     update_dict = m.dict()
83     update_dict['my_key'] = 'some value'
84     metadata.UpdateBoardDictWithDict('board-1', update_dict)
85
86     self.assertEqual(metadata.GetDict()['board-metadata']['board-1']['my_key'],
87                      'some value')
88
89   def testMultiprocessSafety(self):
90     m = multiprocessing.Manager()
91     metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)
92     key_dict = {'key1': 1, 'key2': 2}
93     starting_dict = {'key1': 1,
94                      'key2': '2',
95                      'key3': key_dict,
96                      'cl_actions': [('a', 1), ('b', 2)],
97                      'board-metadata': {
98                          'board-1': {'info': 432}
99                          }
100                     }
101
102     # Test that UpdateWithDict is process-safe
103     parallel.RunParallelSteps([lambda: metadata.UpdateWithDict(starting_dict)])
104     ending_dict = metadata.GetDict()
105     self.assertEqual(starting_dict, ending_dict)
106
107     # Test that UpdateKeyDictWithDict is process-safe
108     parallel.RunParallelSteps([lambda: metadata.UpdateKeyDictWithDict(
109         'key3', key_dict)])
110     ending_dict = metadata.GetDict()
111     self.assertEqual(starting_dict, ending_dict)
112
113     # Test that RecordCLAction is process-safe
114     fake_change = metadata_lib.GerritPatchTuple(12345, 1, False)
115     fake_action = ('asdf,')
116     parallel.RunParallelSteps([lambda: metadata.RecordCLAction(fake_change,
117                                                                fake_action)])
118     ending_dict = metadata.GetDict()
119     # Assert that an action was recorded.
120     self.assertEqual(len(starting_dict['cl_actions']) + 1,
121                      len(ending_dict['cl_actions']))
122
123   def testPerBoardDict(self):
124     starting_per_board_dict = {
125         'board-1': {'kubrick': 2001,
126                     'bergman': 'persona',
127                     'hitchcock': 'vertigo'},
128         'board-2': {'kubrick': ['barry lyndon', 'dr. strangelove'],
129                     'bergman': 'the seventh seal'}
130     }
131
132     starting_dict = {'board-metadata': starting_per_board_dict}
133
134     m = multiprocessing.Manager()
135     metadata = metadata_lib.CBuildbotMetadata(metadata_dict=starting_dict,
136                                               multiprocess_manager=m)
137
138     extra_per_board_dict = {
139         'board-1': {'kurosawa': 'rashomon',
140                     'coen brothers': 'fargo'},
141         'board-3': {'hitchcock': 'north by northwest',
142                     'coen brothers': 'the big lebowski'}
143     }
144
145     expected_dict = starting_per_board_dict
146
147     # Write each per board key-value pair to metadata in a separate process.
148     with parallel.BackgroundTaskRunner(metadata.UpdateBoardDictWithDict) as q:
149       for board, board_dict in extra_per_board_dict.iteritems():
150         expected_dict.setdefault(board, {}).update(board_dict)
151         for k, v in board_dict.iteritems():
152           q.put([board, {k: v}])
153
154     self.assertEqual(expected_dict, metadata.GetDict()['board-metadata'])
155
156
157 if __name__ == '__main__':
158   cros_test_lib.main(level=logging.DEBUG)