Branched from 2.0alpha and pushed for 2.0
[profile/ivi/dbus-python.git] / test / cross-test-client.py
1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
2 #
3 # Permission is hereby granted, free of charge, to any person
4 # obtaining a copy of this software and associated documentation
5 # files (the "Software"), to deal in the Software without
6 # restriction, including without limitation the rights to use, copy,
7 # modify, merge, publish, distribute, sublicense, and/or sell copies
8 # of the Software, and to permit persons to whom the Software is
9 # furnished to do so, subject to the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 # DEALINGS IN THE SOFTWARE.
22
23 from time import sleep
24 import logging
25
26 import gobject
27
28 from dbus import SessionBus, Interface, Array, Byte, Double, Boolean, ByteArray, Int16, Int32, Int64, UInt16, UInt32, UInt64, String, UTF8String, Struct, Dictionary
29 from dbus.service import BusName
30 import dbus.glib
31
32 from crosstest import CROSS_TEST_PATH, CROSS_TEST_BUS_NAME,\
33                       INTERFACE_SINGLE_TESTS, INTERFACE_TESTS,\
34                       INTERFACE_SIGNAL_TESTS, INTERFACE_CALLBACK_TESTS,\
35                       SignalTestsImpl
36
37
38 logging.basicConfig()
39 logging.getLogger().setLevel(1)
40 logger = logging.getLogger('cross-test-client')
41
42
43 class Client(SignalTestsImpl):
44     fail_id = 0
45     expected = set()
46
47     def quit(self):
48         for x in self.expected:
49             self.fail_id += 1
50             print "%s fail %d" % (x, self.fail_id)
51             s = "report %d: reply to %s didn't arrive" % (self.fail_id, x)
52             print s
53             logger.error(s)
54         logger.info("asking server to Exit")
55         Interface(self.obj, INTERFACE_TESTS).Exit(reply_handler=self.quit_reply_handler, error_handler=self.quit_error_handler)
56         # if the server doesn't reply we'll just exit anyway
57         gobject.timeout_add(1000, lambda: (loop.quit(), False)[1])
58
59     def quit_reply_handler(self):
60         logger.info("server says it will exit")
61         loop.quit()
62
63     def quit_error_handler(self, e):
64         logger.error("error telling server to quit: %s %s",
65                      e.__class__, e)
66         loop.quit()
67
68     @dbus.service.method(INTERFACE_CALLBACK_TESTS, 'qd')
69     def Response(self, input1, input2):
70         logger.info("signal/callback: Response received (%r,%r)",
71                     input1, input2)
72         self.expected.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
73         if (input1, input2) != (42, 23):
74             self.fail_id += 1
75             print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS, self.fail_id)
76             s = ("report %d: expected (42,23), got %r"
77                  % (self.fail_id, (input1, input2)))
78             logger.error(s)
79             print s
80         else:
81             print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
82         self.quit()
83
84     def assert_method_matches(self, interface, check_fn, check_arg, member, *args):
85         if_obj = Interface(self.obj, interface)
86         method = getattr(if_obj, member)
87         try:
88             real_ret = method(*args)
89         except Exception, e:
90             self.fail_id += 1
91             print "%s.%s fail %d" % (interface, member, self.fail_id)
92             s = ("report %d: %s.%s%r: raised %r \"%s\""
93                  % (self.fail_id, interface, member, args, e, e))
94             print s
95             logger.error(s)
96             __import__('traceback').print_exc()
97             return
98         try:
99             check_fn(real_ret, check_arg)
100         except Exception, e:
101             self.fail_id += 1
102             print "%s.%s fail %d" % (interface, member, self.fail_id)
103             s = ("report %d: %s.%s%r: %s"
104                  % (self.fail_id, interface, member, args, e))
105             print s
106             logger.error(s)
107             return
108         print "%s.%s pass" % (interface, member)
109
110     def assert_method_eq(self, interface, ret, member, *args):
111         def equals(real_ret, exp):
112             if real_ret != exp:
113                 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp, exp.__class__, real_ret, real_ret.__class__))
114             if real_ret != exp:
115                 raise AssertionError('expected %r, got %r' % (exp, real_ret))
116             if not isinstance(exp, (tuple, type(None))):
117                 if real_ret.variant_level != getattr(exp, 'variant_level', 0):
118                     raise AssertionError('expected variant_level=%d, got %r with level %d'
119                         % (getattr(exp, 'variant_level', 0), real_ret,
120                            real_ret.variant_level))
121             if isinstance(exp, list) or isinstance(exp, tuple):
122                 for i in xrange(len(exp)):
123                     try:
124                         equals(real_ret[i], exp[i])
125                     except AssertionError, e:
126                         if not isinstance(e.args, tuple):
127                             e.args = (e.args,)
128                         e.args = e.args + ('(at position %d in sequence)' % i,)
129                         raise e
130             elif isinstance(exp, dict):
131                 for k in exp:
132                     try:
133                         equals(real_ret[k], exp[k])
134                     except AssertionError, e:
135                         if not isinstance(e.args, tuple):
136                             e.args = (e.args,)
137                         e.args = e.args + ('(at key %r in dict)' % k,)
138                         raise e
139         self.assert_method_matches(interface, equals, ret, member, *args)
140
141     def assert_InvertMapping_eq(self, interface, expected, member, mapping):
142         def check(real_ret, exp):
143             for key in exp:
144                 if key not in real_ret:
145                     raise AssertionError('missing key %r' % key)
146             for key in real_ret:
147                 if key not in exp:
148                     raise AssertionError('unexpected key %r' % key)
149                 got = list(real_ret[key])
150                 wanted = list(exp[key])
151                 got.sort()
152                 wanted.sort()
153                 if got != wanted:
154                     raise AssertionError('expected %r => %r, got %r'
155                                          % (key, wanted, got))
156         self.assert_method_matches(interface, check, expected, member, mapping)
157
158     def triggered_cb(self, param, sender_path):
159         logger.info("method/signal: Triggered(%r) by %r",
160                     param, sender_path)
161         self.expected.discard('%s.Trigger' % INTERFACE_TESTS)
162         if sender_path != '/Where/Ever':
163             self.fail_id += 1
164             print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
165             s = ("report %d: expected signal from /Where/Ever, got %r"
166                  % (self.fail_id, sender_path))
167             print s
168             logger.error(s)
169         elif param != 42:
170             self.fail_id += 1
171             print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
172             s = ("report %d: expected signal param 42, got %r"
173                  % (self.fail_id, parameter))
174             print s
175             logger.error(s)
176         else:
177             print "%s.Trigger pass" % INTERFACE_TESTS
178
179     def trigger_returned_cb(self):
180         logger.info('method/signal: Trigger() returned')
181         # Callback tests
182         logger.info("signal/callback: Emitting signal to trigger callback")
183         self.expected.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
184         self.Trigger(UInt16(42), 23.0)
185         logger.info("signal/callback: Emitting signal returned")
186
187     def run_client(self):
188         bus = SessionBus()
189         obj = bus.get_object(CROSS_TEST_BUS_NAME, CROSS_TEST_PATH)
190         self.obj = obj
191
192         self.run_synchronous_tests(obj)
193
194         # Signal tests
195         logger.info("Binding signal handler for Triggered")
196         # FIXME: doesn't seem to work when going via the Interface method
197         # FIXME: should be possible to ask the proxy object for its
198         # bus name
199         bus.add_signal_receiver(self.triggered_cb, 'Triggered',
200                                 INTERFACE_SIGNAL_TESTS,
201                                 CROSS_TEST_BUS_NAME,
202                                 path_keyword='sender_path')
203         logger.info("method/signal: Triggering signal")
204         self.expected.add('%s.Trigger' % INTERFACE_TESTS)
205         Interface(obj, INTERFACE_TESTS).Trigger(u'/Where/Ever', dbus.UInt64(42), reply_handler=self.trigger_returned_cb, error_handler=self.trigger_error_handler)
206
207     def trigger_error_handler(self, e):
208         logger.error("method/signal: %s %s", e.__class__, e)
209         Interface(self.obj, INTERFACE_TESTS).Exit()
210         self.quit()
211
212     def run_synchronous_tests(self, obj):
213         # We can't test that coercion works correctly unless the server has
214         # sent us introspection data. Java doesn't :-/
215         have_signatures = True
216
217         # "Single tests"
218         if have_signatures:
219             self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [1, 2, 3])
220             self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ['\x01', '\x02', '\x03'])
221         self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
222         self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ByteArray('\x01\x02\x03'))
223
224         # Main tests
225         self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', String('foo'))
226         self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', UTF8String('foo'))
227         self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
228         self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=23), 'Identity', Byte(42, variant_level=23))
229         self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', 42.5)
230         self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
231
232         if have_signatures:
233             self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', 'foo')
234             self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
235             self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', Double(42.5))
236             self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
237
238         for i in (0, 42, 255):
239             self.assert_method_eq(INTERFACE_TESTS, Byte(i), 'IdentityByte', Byte(i))
240         for i in (True, False):
241             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityBool', i)
242
243         for i in (-0x8000, 0, 42, 0x7fff):
244             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt16', Int16(i))
245         for i in (0, 42, 0xffff):
246             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt16', UInt16(i))
247         for i in (-0x7fffffff-1, 0, 42, 0x7fffffff):
248             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt32', Int32(i))
249         for i in (0L, 42L, 0xffffffffL):
250             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt32', UInt32(i))
251         MANY = 0x8000L * 0x10000L * 0x10000L * 0x10000L
252         for i in (-MANY, 0, 42, MANY-1):
253             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt64', Int64(i))
254         for i in (0, 42, 2*MANY - 1):
255             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt64', UInt64(i))
256
257         self.assert_method_eq(INTERFACE_TESTS, 42.3, 'IdentityDouble', 42.3)
258         for i in ('', 'foo'):
259             self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityString', i)
260         for i in (u'\xa9', '\xc2\xa9'):
261             self.assert_method_eq(INTERFACE_TESTS, u'\xa9', 'IdentityString', i)
262
263         if have_signatures:
264             self.assert_method_eq(INTERFACE_TESTS, Byte(0x42), 'IdentityByte', '\x42')
265             self.assert_method_eq(INTERFACE_TESTS, True, 'IdentityBool', 42)
266             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt16', 42)
267             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt16', 42)
268             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt32', 42)
269             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt32', 42)
270             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt64', 42)
271             self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt64', 42)
272             self.assert_method_eq(INTERFACE_TESTS, 42.0, 'IdentityDouble', 42)
273
274         self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
275                                                 Byte('\x02', variant_level=1),
276                                                 Byte('\x03', variant_level=1)],
277                                                'IdentityArray',
278                                                Array([Byte('\x01'),
279                                                       Byte('\x02'),
280                                                       Byte('\x03')],
281                                                      signature='v'))
282
283         self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
284                                                 Int32(2, variant_level=1),
285                                                 Int32(3, variant_level=1)],
286                                                'IdentityArray',
287                                                Array([Int32(1),
288                                                       Int32(2),
289                                                       Int32(3)],
290                                                      signature='v'))
291         self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
292                                                 String(u'b', variant_level=1),
293                                                 String(u'c', variant_level=1)],
294                                                'IdentityArray',
295                                                Array([String('a'),
296                                                       String('b'),
297                                                       String('c')],
298                                                      signature='v'))
299
300         if have_signatures:
301             self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
302                                                     Byte('\x02', variant_level=1),
303                                                     Byte('\x03', variant_level=1)],
304                                                    'IdentityArray',
305                                                    ByteArray('\x01\x02\x03'))
306             self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
307                                                     Int32(2, variant_level=1),
308                                                     Int32(3, variant_level=1)],
309                                                    'IdentityArray',
310                                                    [Int32(1),
311                                                     Int32(2),
312                                                     Int32(3)])
313             self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
314                                                     String(u'b', variant_level=1),
315                                                     String(u'c', variant_level=1)],
316                                                    'IdentityArray',
317                                                    ['a','b','c'])
318
319         self.assert_method_eq(INTERFACE_TESTS,
320                               [Byte(1), Byte(2), Byte(3)],
321                               'IdentityByteArray',
322                               ByteArray('\x01\x02\x03'))
323         if have_signatures:
324             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
325         self.assert_method_eq(INTERFACE_TESTS, [False,True], 'IdentityBoolArray', [False,True])
326         if have_signatures:
327             self.assert_method_eq(INTERFACE_TESTS, [False,True,True], 'IdentityBoolArray', [0,1,2])
328
329         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
330         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
331         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
332         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
333         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
334         self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
335
336         if have_signatures:
337             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [1,2,3])
338             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [1,2,3])
339             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [1,2,3])
340             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [1,2,3])
341             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [1,2,3])
342             self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [1,2,3])
343
344         self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
345         if have_signatures:
346             self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
347         self.assert_method_eq(INTERFACE_TESTS, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
348         self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
349         if have_signatures:
350             self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [1,2,3])
351
352         self.assert_InvertMapping_eq(INTERFACE_TESTS, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
353
354         self.assert_method_eq(INTERFACE_TESTS, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
355         self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
356                               'Primitize', [String('x', variant_level=1)])
357         self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
358                               'Primitize', [String('x', variant_level=23)])
359         self.assert_method_eq(INTERFACE_TESTS,
360                               Array([String('x', variant_level=1),
361                                Byte(1, variant_level=1),
362                                Byte(2, variant_level=1)]),
363                               'Primitize',
364                               Array([String('x'), Byte(1), Byte(2)],
365                                     signature='v'))
366         self.assert_method_eq(INTERFACE_TESTS,
367                               Array([String('x', variant_level=1),
368                                Byte(1, variant_level=1),
369                                Byte(2, variant_level=1)]),
370                               'Primitize',
371                               Array([String('x'), Array([Byte(1), Byte(2)])],
372                                     signature='v'))
373         self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', True)
374         self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', False)
375         if have_signatures:
376             self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', 42)
377             self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', 0)
378
379
380 if __name__ == '__main__':
381     # FIXME: should be possible to export objects without a bus name
382     if 0:
383         client = Client(dbus.SessionBus(), '/Client')
384     else:
385         # the Java cross test's interpretation is that the client should be
386         # at /Test too
387         client = Client(dbus.SessionBus(), '/Test')
388     gobject.idle_add(client.run_client)
389
390     loop = gobject.MainLoop()
391     logger.info("running...")
392     loop.run()
393     logger.info("main loop exited.")