Imported Upstream version 3.3.1
[platform/upstream/pygobject2.git] / tests / test_gdbus.py
1 # -*- Mode: Python; py-indent-offset: 4 -*-
2 # vim: tabstop=4 shiftwidth=4 expandtab
3
4 import unittest
5
6 import sys
7 sys.path.insert(0, "../")
8
9 from gi.repository import GObject
10 from gi.repository import GLib
11 from gi.repository import Gio
12
13
14 class TestGDBusClient(unittest.TestCase):
15     def setUp(self):
16         self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
17
18         self.dbus_proxy = Gio.DBusProxy.new_sync(self.bus,
19                 Gio.DBusProxyFlags.NONE, None,
20                 'org.freedesktop.DBus',
21                 '/org/freedesktop/DBus',
22                 'org.freedesktop.DBus', None)
23
24     def test_native_calls_sync(self):
25         result = self.dbus_proxy.call_sync('ListNames', None,
26                 Gio.DBusCallFlags.NO_AUTO_START, 500, None)
27         self.assertTrue(isinstance(result, GLib.Variant))
28         result = result.unpack()[0]  # result is always a tuple
29         self.assertTrue(len(result) > 1)
30         self.assertTrue('org.freedesktop.DBus' in result)
31
32         result = self.dbus_proxy.call_sync('GetNameOwner',
33                 GLib.Variant('(s)', ('org.freedesktop.DBus',)),
34                 Gio.DBusCallFlags.NO_AUTO_START, 500, None)
35         self.assertTrue(isinstance(result, GLib.Variant))
36         self.assertEqual(type(result.unpack()[0]), type(''))
37
38     def test_native_calls_sync_errors(self):
39         # error case: invalid argument types
40         try:
41             self.dbus_proxy.call_sync('GetConnectionUnixProcessID', None,
42                     Gio.DBusCallFlags.NO_AUTO_START, 500, None)
43             self.fail('call with invalid arguments should raise an exception')
44         except Exception as e:
45             self.assertTrue('InvalidArgs' in str(e))
46
47         # error case: invalid argument
48         try:
49             self.dbus_proxy.call_sync('GetConnectionUnixProcessID',
50                     GLib.Variant('(s)', (' unknown',)),
51                     Gio.DBusCallFlags.NO_AUTO_START, 500, None)
52             self.fail('call with invalid arguments should raise an exception')
53         except Exception as e:
54             self.assertTrue('NameHasNoOwner' in str(e))
55
56         # error case: unknown method
57         try:
58             self.dbus_proxy.call_sync('UnknownMethod', None,
59                     Gio.DBusCallFlags.NO_AUTO_START, 500, None)
60             self.fail('call for unknown method should raise an exception')
61         except Exception as e:
62             self.assertTrue('UnknownMethod' in str(e))
63
64     def test_native_calls_async(self):
65         def call_done(obj, result, user_data):
66             try:
67                 user_data['result'] = obj.call_finish(result)
68             finally:
69                 user_data['main_loop'].quit()
70
71         main_loop = GObject.MainLoop()
72         data = {'main_loop': main_loop}
73         self.dbus_proxy.call('ListNames', None,
74                 Gio.DBusCallFlags.NO_AUTO_START, 500, None,
75                 call_done, data)
76         main_loop.run()
77
78         self.assertTrue(isinstance(data['result'], GLib.Variant))
79         result = data['result'].unpack()[0]  # result is always a tuple
80         self.assertTrue(len(result) > 1)
81         self.assertTrue('org.freedesktop.DBus' in result)
82
83     def test_native_calls_async_errors(self):
84         def call_done(obj, result, user_data):
85             try:
86                 obj.call_finish(result)
87                 self.fail('call_finish() for unknown method should raise an exception')
88             except Exception as e:
89                 self.assertTrue('UnknownMethod' in str(e))
90             finally:
91                 user_data['main_loop'].quit()
92
93         main_loop = GObject.MainLoop()
94         data = {'main_loop': main_loop}
95         self.dbus_proxy.call('UnknownMethod', None,
96                 Gio.DBusCallFlags.NO_AUTO_START, 500, None, call_done, data)
97         main_loop.run()
98
99     def test_python_calls_sync(self):
100         # single value return tuples get unboxed to the one element
101         result = self.dbus_proxy.ListNames('()')
102         self.assertTrue(isinstance(result, list))
103         self.assertTrue(len(result) > 1)
104         self.assertTrue('org.freedesktop.DBus' in result)
105
106         result = self.dbus_proxy.GetNameOwner('(s)', 'org.freedesktop.DBus')
107         self.assertEqual(type(result), type(''))
108
109         # empty return tuples get unboxed to None
110         self.assertEqual(self.dbus_proxy.ReloadConfig('()'), None)
111
112         # multiple return values remain a tuple; unfortunately D-BUS itself
113         # does not have any method returning multiple results, so try talking
114         # to notification-daemon (and don't fail the test if it does not exist)
115         try:
116             notification_daemon = Gio.DBusProxy.new_sync(self.bus,
117                     Gio.DBusProxyFlags.NONE, None,
118                     'org.freedesktop.Notifications',
119                     '/org/freedesktop/Notifications',
120                     'org.freedesktop.Notifications', None)
121             result = notification_daemon.GetServerInformation('()')
122             self.assertTrue(isinstance(result, tuple))
123             self.assertEqual(len(result), 4)
124             for i in result:
125                 self.assertEqual(type(i), type(''))
126         except Exception as e:
127             if 'Error.ServiceUnknown' not in str(e):
128                 raise
129
130         # test keyword argument; timeout=0 will fail immediately
131         try:
132             self.dbus_proxy.GetConnectionUnixProcessID('(s)', '1', timeout=0)
133             self.fail('call with timeout=0 should raise an exception')
134         except Exception as e:
135             self.assertTrue('Timeout' in str(e), str(e))
136
137     def test_python_calls_sync_noargs(self):
138         # methods without arguments don't need an explicit signature
139         result = self.dbus_proxy.ListNames()
140         self.assertTrue(isinstance(result, list))
141         self.assertTrue(len(result) > 1)
142         self.assertTrue('org.freedesktop.DBus' in result)
143
144     def test_python_calls_sync_errors(self):
145         # error case: invalid argument types
146         try:
147             self.dbus_proxy.GetConnectionUnixProcessID('()')
148             self.fail('call with invalid arguments should raise an exception')
149         except Exception as e:
150             self.assertTrue('InvalidArgs' in str(e), str(e))
151
152         try:
153             self.dbus_proxy.GetConnectionUnixProcessID(None, 'foo')
154             self.fail('call with None signature should raise an exception')
155         except TypeError as e:
156             self.assertTrue('signature' in str(e), str(e))
157
158     def test_python_calls_async(self):
159         def call_done(obj, result, user_data):
160             user_data['result'] = result
161             user_data['main_loop'].quit()
162
163         main_loop = GObject.MainLoop()
164         data = {'main_loop': main_loop}
165         self.dbus_proxy.ListNames('()', result_handler=call_done,
166                 user_data=data)
167         main_loop.run()
168
169         result = data['result']
170         self.assertEqual(type(result), type([]))
171         self.assertTrue(len(result) > 1)
172         self.assertTrue('org.freedesktop.DBus' in result)
173
174     def test_python_calls_async_error_result(self):
175         # when only specifying a result handler, this will get the error
176         def call_done(obj, result, user_data):
177             user_data['result'] = result
178             user_data['main_loop'].quit()
179
180         main_loop = GObject.MainLoop()
181         data = {'main_loop': main_loop}
182         self.dbus_proxy.ListNames('(s)', 'invalid_argument',
183                 result_handler=call_done, user_data=data)
184         main_loop.run()
185
186         self.assertTrue(isinstance(data['result'], Exception))
187         self.assertTrue('InvalidArgs' in str(data['result']), str(data['result']))
188
189     def test_python_calls_async_error(self):
190         # when specifying an explicit error handler, this will get the error
191         def call_done(obj, result, user_data):
192             user_data['main_loop'].quit()
193             self.fail('result handler should not be called')
194
195         def call_error(obj, error, user_data):
196             user_data['error'] = error
197             user_data['main_loop'].quit()
198
199         main_loop = GObject.MainLoop()
200         data = {'main_loop': main_loop}
201         self.dbus_proxy.ListNames('(s)', 'invalid_argument',
202                 result_handler=call_done, error_handler=call_error,
203                 user_data=data)
204         main_loop.run()
205
206         self.assertTrue(isinstance(data['error'], Exception))
207         self.assertTrue('InvalidArgs' in str(data['error']), str(data['error']))