Upstream version 11.39.266.0
[platform/framework/web/crosswalk.git] / src / mojo / python / tests / system_unittest.py
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import random
6 import sys
7 import time
8 import unittest
9
10 # pylint: disable=F0401
11 import mojo.embedder
12 from mojo import system
13
14 DATA_SIZE = 1024
15
16
17 def _GetRandomBuffer(size):
18   random.seed(size)
19   return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
20
21
22 class BaseMojoTest(unittest.TestCase):
23
24   def setUp(self):
25     mojo.embedder.Init()
26
27
28 class CoreTest(BaseMojoTest):
29
30   def testResults(self):
31     self.assertEquals(system.RESULT_OK, 0)
32     self.assertLess(system.RESULT_CANCELLED, 0)
33     self.assertLess(system.RESULT_UNKNOWN, 0)
34     self.assertLess(system.RESULT_INVALID_ARGUMENT, 0)
35     self.assertLess(system.RESULT_DEADLINE_EXCEEDED, 0)
36     self.assertLess(system.RESULT_NOT_FOUND, 0)
37     self.assertLess(system.RESULT_ALREADY_EXISTS, 0)
38     self.assertLess(system.RESULT_PERMISSION_DENIED, 0)
39     self.assertLess(system.RESULT_RESOURCE_EXHAUSTED, 0)
40     self.assertLess(system.RESULT_FAILED_PRECONDITION, 0)
41     self.assertLess(system.RESULT_ABORTED, 0)
42     self.assertLess(system.RESULT_OUT_OF_RANGE, 0)
43     self.assertLess(system.RESULT_UNIMPLEMENTED, 0)
44     self.assertLess(system.RESULT_INTERNAL, 0)
45     self.assertLess(system.RESULT_UNAVAILABLE, 0)
46     self.assertLess(system.RESULT_DATA_LOSS, 0)
47     self.assertLess(system.RESULT_BUSY, 0)
48     self.assertLess(system.RESULT_SHOULD_WAIT, 0)
49
50   def testConstants(self):
51     self.assertGreaterEqual(system.DEADLINE_INDEFINITE, 0)
52     self.assertGreaterEqual(system.HANDLE_SIGNAL_NONE, 0)
53     self.assertGreaterEqual(system.HANDLE_SIGNAL_READABLE, 0)
54     self.assertGreaterEqual(system.HANDLE_SIGNAL_WRITABLE, 0)
55     self.assertGreaterEqual(system.WRITE_MESSAGE_FLAG_NONE, 0)
56     self.assertGreaterEqual(system.READ_MESSAGE_FLAG_NONE, 0)
57     self.assertGreaterEqual(system.READ_MESSAGE_FLAG_MAY_DISCARD, 0)
58     self.assertGreaterEqual(system.WRITE_DATA_FLAG_NONE, 0)
59     self.assertGreaterEqual(system.WRITE_DATA_FLAG_ALL_OR_NONE, 0)
60     self.assertGreaterEqual(system.READ_DATA_FLAG_NONE, 0)
61     self.assertGreaterEqual(system.READ_DATA_FLAG_ALL_OR_NONE, 0)
62     self.assertGreaterEqual(system.READ_DATA_FLAG_DISCARD, 0)
63     self.assertGreaterEqual(system.READ_DATA_FLAG_QUERY, 0)
64     self.assertGreaterEqual(system.MAP_BUFFER_FLAG_NONE, 0)
65
66   def testGetTimeTicksNow(self):
67     pt1 = time.time()
68     v1 = system.GetTimeTicksNow()
69     time.sleep(1e-3)
70     v2 = system.GetTimeTicksNow()
71     pt2 = time.time()
72     self.assertGreater(v1, 0)
73     self.assertGreater(v2, v1 + 1000)
74     self.assertGreater(1e6 * (pt2 - pt1), v2 - v1)
75
76   def _testHandlesCreation(self, *args):
77     for handle in args:
78       self.assertTrue(handle.IsValid())
79       handle.Close()
80       self.assertFalse(handle.IsValid())
81
82   def _TestMessageHandleCreation(self, handles):
83     self._testHandlesCreation(handles.handle0, handles.handle1)
84
85   def testCreateMessagePipe(self):
86     self._TestMessageHandleCreation(system.MessagePipe())
87
88   def testCreateMessagePipeWithNoneOptions(self):
89     self._TestMessageHandleCreation(system.MessagePipe(None))
90
91   def testCreateMessagePipeWithOptions(self):
92     self._TestMessageHandleCreation(
93         system.MessagePipe(system.CreateMessagePipeOptions()))
94
95   def testWaitOverMessagePipe(self):
96     handles = system.MessagePipe()
97     handle = handles.handle0
98
99     self.assertEquals(system.RESULT_OK, handle.Wait(
100         system.HANDLE_SIGNAL_WRITABLE, system.DEADLINE_INDEFINITE))
101     self.assertEquals(system.RESULT_DEADLINE_EXCEEDED,
102                       handle.Wait(system.HANDLE_SIGNAL_READABLE, 0))
103
104     handles.handle1.WriteMessage()
105
106     self.assertEquals(
107         system.RESULT_OK,
108         handle.Wait(
109             system.HANDLE_SIGNAL_READABLE,
110             system.DEADLINE_INDEFINITE))
111
112   def testWaitOverManyMessagePipe(self):
113     handles = system.MessagePipe()
114     handle0 = handles.handle0
115     handle1 = handles.handle1
116
117     self.assertEquals(
118         0,
119         system.WaitMany(
120             [(handle0, system.HANDLE_SIGNAL_WRITABLE),
121              (handle1, system.HANDLE_SIGNAL_WRITABLE)],
122             system.DEADLINE_INDEFINITE))
123     self.assertEquals(
124         system.RESULT_DEADLINE_EXCEEDED,
125         system.WaitMany(
126             [(handle0, system.HANDLE_SIGNAL_READABLE),
127              (handle1, system.HANDLE_SIGNAL_READABLE)], 0))
128
129     handle0.WriteMessage()
130
131     self.assertEquals(
132         1,
133         system.WaitMany(
134             [(handle0, system.HANDLE_SIGNAL_READABLE),
135              (handle1, system.HANDLE_SIGNAL_READABLE)],
136             system.DEADLINE_INDEFINITE))
137
138   def testSendBytesOverMessagePipe(self):
139     handles = system.MessagePipe()
140     data = _GetRandomBuffer(DATA_SIZE)
141     handles.handle0.WriteMessage(data)
142     (res, buffers, next_message) = handles.handle1.ReadMessage()
143     self.assertEquals(system.RESULT_RESOURCE_EXHAUSTED, res)
144     self.assertEquals(None, buffers)
145     self.assertEquals((DATA_SIZE, 0), next_message)
146     result = bytearray(DATA_SIZE)
147     (res, buffers, next_message) = handles.handle1.ReadMessage(result)
148     self.assertEquals(system.RESULT_OK, res)
149     self.assertEquals(None, next_message)
150     self.assertEquals((data, []), buffers)
151
152   def testSendEmptyDataOverMessagePipe(self):
153     handles = system.MessagePipe()
154     handles.handle0.WriteMessage(None)
155     (res, buffers, next_message) = handles.handle1.ReadMessage()
156
157     self.assertEquals(system.RESULT_OK, res)
158     self.assertEquals(None, next_message)
159     self.assertEquals((None, []), buffers)
160
161   def testSendHandleOverMessagePipe(self):
162     handles = system.MessagePipe()
163     handles_to_send = system.MessagePipe()
164     handles.handle0.WriteMessage(handles=[handles_to_send.handle0,
165                                            handles_to_send.handle1])
166     (res, buffers, next_message) = handles.handle1.ReadMessage(
167         max_number_of_handles=2)
168
169     self.assertFalse(handles_to_send.handle0.IsValid())
170     self.assertFalse(handles_to_send.handle1.IsValid())
171     self.assertEquals(system.RESULT_OK, res)
172     self.assertEquals(None, next_message)
173     self.assertEquals(None, buffers[0])
174     self.assertEquals(2, len(buffers[1]))
175
176     handles = buffers[1]
177     for handle in handles:
178       self.assertTrue(handle.IsValid())
179       (res, buffers, next_message) = handle.ReadMessage()
180       self.assertEquals(system.RESULT_SHOULD_WAIT, res)
181
182     for handle in handles:
183       handle.WriteMessage()
184
185     for handle in handles:
186       (res, buffers, next_message) = handle.ReadMessage()
187       self.assertEquals(system.RESULT_OK, res)
188
189   def _TestDataHandleCreation(self, handles):
190     self._testHandlesCreation(
191         handles.producer_handle, handles.consumer_handle)
192
193   def testCreateDataPipe(self):
194     self._TestDataHandleCreation(system.DataPipe())
195
196   def testCreateDataPipeWithNoneOptions(self):
197     self._TestDataHandleCreation(system.DataPipe(None))
198
199   def testCreateDataPipeWithDefaultOptions(self):
200     self._TestDataHandleCreation(
201         system.DataPipe(system.CreateDataPipeOptions()))
202
203   def testCreateDataPipeWithDiscardFlag(self):
204     options = system.CreateDataPipeOptions()
205     options.flags = system.CreateDataPipeOptions.FLAG_MAY_DISCARD
206     self._TestDataHandleCreation(system.DataPipe(options))
207
208   def testCreateDataPipeWithElementSize(self):
209     options = system.CreateDataPipeOptions()
210     options.element_num_bytes = 5
211     self._TestDataHandleCreation(system.DataPipe(options))
212
213   def testCreateDataPipeWithCapacity(self):
214     options = system.CreateDataPipeOptions()
215     options.element_capacity_num_bytes = DATA_SIZE
216     self._TestDataHandleCreation(system.DataPipe(options))
217
218   def testCreateDataPipeWithIncorrectParameters(self):
219     options = system.CreateDataPipeOptions()
220     options.element_num_bytes = 5
221     options.capacity_num_bytes = DATA_SIZE
222     with self.assertRaises(system.MojoException) as cm:
223       self._TestDataHandleCreation(system.DataPipe(options))
224     self.assertEquals(system.RESULT_INVALID_ARGUMENT, cm.exception.mojo_result)
225
226   def testSendEmptyDataOverDataPipe(self):
227     pipes = system.DataPipe()
228     self.assertEquals((system.RESULT_OK, 0), pipes.producer_handle.WriteData())
229     self.assertEquals(
230         (system.RESULT_OK, None), pipes.consumer_handle.ReadData())
231
232   def testSendDataOverDataPipe(self):
233     pipes = system.DataPipe()
234     data = _GetRandomBuffer(DATA_SIZE)
235     self.assertEquals((system.RESULT_OK, DATA_SIZE),
236                       pipes.producer_handle.WriteData(data))
237     self.assertEquals((system.RESULT_OK, data),
238                       pipes.consumer_handle.ReadData(bytearray(DATA_SIZE)))
239
240   def testTwoPhaseWriteOnDataPipe(self):
241     pipes = system.DataPipe()
242     (res, buf) = pipes.producer_handle.BeginWriteData(DATA_SIZE)
243     self.assertEquals(system.RESULT_OK, res)
244     self.assertGreaterEqual(len(buf.buffer), DATA_SIZE)
245     data = _GetRandomBuffer(DATA_SIZE)
246     buf.buffer[0:DATA_SIZE] = data
247     self.assertEquals(system.RESULT_OK, buf.End(DATA_SIZE))
248     self.assertEquals((system.RESULT_OK, data),
249                       pipes.consumer_handle.ReadData(bytearray(DATA_SIZE)))
250
251   def testTwoPhaseReadOnDataPipe(self):
252     pipes = system.DataPipe()
253     data = _GetRandomBuffer(DATA_SIZE)
254     self.assertEquals((system.RESULT_OK, DATA_SIZE),
255                       pipes.producer_handle.WriteData(data))
256     (res, buf) = pipes.consumer_handle.BeginReadData()
257     self.assertEquals(system.RESULT_OK, res)
258     self.assertEquals(DATA_SIZE, len(buf.buffer))
259     self.assertEquals(data, buf.buffer)
260     self.assertEquals(system.RESULT_OK, buf.End(DATA_SIZE))
261
262   def testCreateSharedBuffer(self):
263     self._testHandlesCreation(system.CreateSharedBuffer(DATA_SIZE))
264
265   def testCreateSharedBufferWithNoneOptions(self):
266     self._testHandlesCreation(system.CreateSharedBuffer(DATA_SIZE, None))
267
268   def testCreateSharedBufferWithDefaultOptions(self):
269     self._testHandlesCreation(
270         system.CreateSharedBuffer(
271             DATA_SIZE,
272             system.CreateSharedBufferOptions()))
273
274   def testDuplicateSharedBuffer(self):
275     handle = system.CreateSharedBuffer(DATA_SIZE)
276     self._testHandlesCreation(handle.Duplicate())
277
278   def testDuplicateSharedBufferWithNoneOptions(self):
279     handle = system.CreateSharedBuffer(DATA_SIZE)
280     self._testHandlesCreation(handle.Duplicate(None))
281
282   def testDuplicateSharedBufferWithDefaultOptions(self):
283     handle = system.CreateSharedBuffer(DATA_SIZE)
284     self._testHandlesCreation(
285         handle.Duplicate(system.DuplicateSharedBufferOptions()))
286
287   def testSendBytesOverSharedBuffer(self):
288     handle = system.CreateSharedBuffer(DATA_SIZE)
289     duplicated = handle.Duplicate()
290     data = _GetRandomBuffer(DATA_SIZE)
291     (res1, buf1) = handle.Map(0, DATA_SIZE)
292     (res2, buf2) = duplicated.Map(0, DATA_SIZE)
293     self.assertEquals(system.RESULT_OK, res1)
294     self.assertEquals(system.RESULT_OK, res2)
295     self.assertEquals(DATA_SIZE, len(buf1.buffer))
296     self.assertEquals(DATA_SIZE, len(buf2.buffer))
297     self.assertEquals(buf1.buffer, buf2.buffer)
298
299     buf1.buffer[:] = data
300     self.assertEquals(data, buf1.buffer)
301     self.assertEquals(data, buf2.buffer)
302     self.assertEquals(buf1.buffer, buf2.buffer)
303
304
305 if __name__ == '__main__':
306   suite = unittest.TestLoader().loadTestsFromTestCase(CoreTest)
307   test_results = unittest.TextTestRunner(verbosity=0).run(suite)
308   if not test_results.wasSuccessful():
309     sys.exit(1)
310   sys.exit(0)