# -*- Mode: Python; py-indent-offset: 4 -*-
# vim: tabstop=4 shiftwidth=4 expandtab
import sys
import unittest

# Search the parent directory first when resolving the imports below.
sys.path.insert(0, "../")

from gi.repository import GObject
from gi.repository import GLib
from gi.repository import Gio
class TestGDBusClient(unittest.TestCase):
    """Exercise Gio.DBusProxy against the session bus' org.freedesktop.DBus.

    Covers the raw ``call_sync()`` / ``call()`` + ``call_finish()`` API as
    well as the ``proxy.MethodName(signature, *args)`` call style, in both
    their synchronous and asynchronous forms.
    """

    # NOTE(review): the reviewed copy of this class had lost its indentation
    # and several short lines (``def setUp``, ``try:`` / ``finally:``,
    # ``main_loop.run()``, trailing call arguments).  They are restored here
    # from context -- confirm against the upstream file.

    def setUp(self):
        """Connect to the session bus and build a proxy for the bus object."""
        self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)

        self.dbus_proxy = Gio.DBusProxy.new_sync(
            self.bus,
            Gio.DBusProxyFlags.NONE, None,
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus',
            'org.freedesktop.DBus', None)

    def _check_raises(self, exc_type, fragment, fail_msg, func,
                      *args, **kwargs):
        """Require that ``func(*args, **kwargs)`` raises a matching error.

        The call must raise ``exc_type`` and the string form of the raised
        exception must contain ``fragment``.  If nothing is raised the test
        fails with ``fail_msg``.

        ``self.fail()`` is issued from the ``else`` branch on purpose: inside
        the ``try`` body its AssertionError would be caught by a broad
        ``except Exception`` and the real failure message would be lost.
        """
        try:
            func(*args, **kwargs)
        except exc_type as e:
            self.assertTrue(fragment in str(e), str(e))
        else:
            self.fail(fail_msg)

    def test_native_calls_sync(self):
        """call_sync() hands back the reply wrapped in a GLib.Variant tuple."""
        result = self.dbus_proxy.call_sync(
            'ListNames', None,
            Gio.DBusCallFlags.NO_AUTO_START, 500, None)
        self.assertTrue(isinstance(result, GLib.Variant))
        names = result.unpack()[0]  # result is always a tuple
        self.assertTrue(len(names) > 1)
        self.assertTrue('org.freedesktop.DBus' in names)

        result = self.dbus_proxy.call_sync(
            'GetNameOwner',
            GLib.Variant('(s)', ('org.freedesktop.DBus',)),
            Gio.DBusCallFlags.NO_AUTO_START, 500, None)
        self.assertTrue(isinstance(result, GLib.Variant))
        self.assertEqual(type(result.unpack()[0]), type(''))

    def test_native_calls_sync_errors(self):
        """call_sync() raises for bad argument types, values and methods."""
        # error case: invalid argument types
        self._check_raises(
            Exception, 'InvalidArgs',
            'call with invalid arguments should raise an exception',
            self.dbus_proxy.call_sync,
            'GetConnectionUnixProcessID', None,
            Gio.DBusCallFlags.NO_AUTO_START, 500, None)

        # error case: invalid argument
        self._check_raises(
            Exception, 'NameHasNoOwner',
            'call with invalid arguments should raise an exception',
            self.dbus_proxy.call_sync,
            'GetConnectionUnixProcessID',
            GLib.Variant('(s)', (' unknown',)),
            Gio.DBusCallFlags.NO_AUTO_START, 500, None)

        # error case: unknown method
        self._check_raises(
            Exception, 'UnknownMethod',
            'call for unknown method should raise an exception',
            self.dbus_proxy.call_sync,
            'UnknownMethod', None,
            Gio.DBusCallFlags.NO_AUTO_START, 500, None)

    def test_native_calls_async(self):
        """call() + call_finish() deliver the reply through a callback."""
        def call_done(obj, result, user_data):
            try:
                user_data['result'] = obj.call_finish(result)
            finally:
                # Always stop the loop, or a failing call_finish() would
                # leave main_loop.run() below blocked.
                user_data['main_loop'].quit()

        main_loop = GObject.MainLoop()
        data = {'main_loop': main_loop}
        self.dbus_proxy.call(
            'ListNames', None,
            Gio.DBusCallFlags.NO_AUTO_START, 500, None,
            call_done, data)
        main_loop.run()

        self.assertTrue(isinstance(data['result'], GLib.Variant))
        names = data['result'].unpack()[0]  # result is always a tuple
        self.assertTrue(len(names) > 1)
        self.assertTrue('org.freedesktop.DBus' in names)

    def test_native_calls_async_errors(self):
        """call_finish() raises when the asynchronous call failed."""
        def call_done(obj, result, user_data):
            # Only record the outcome here and assert after the loop has
            # finished: an exception raised inside a main-loop callback is
            # not expected to reach the code that called main_loop.run(), so
            # a failed assertion in this function could go unnoticed.
            try:
                obj.call_finish(result)
            except Exception as e:
                user_data['error'] = e
            finally:
                user_data['main_loop'].quit()

        main_loop = GObject.MainLoop()
        data = {'main_loop': main_loop}
        self.dbus_proxy.call(
            'UnknownMethod', None,
            Gio.DBusCallFlags.NO_AUTO_START, 500, None, call_done, data)
        main_loop.run()

        self.assertTrue(
            'error' in data,
            'call_finish() for unknown method should raise an exception')
        self.assertTrue('UnknownMethod' in str(data['error']),
                        str(data['error']))

    def test_python_calls_sync(self):
        """Method-style calls unbox the reply tuple and accept keywords."""
        # single value return tuples get unboxed to the one element
        result = self.dbus_proxy.ListNames('()')
        self.assertTrue(isinstance(result, list))
        self.assertTrue(len(result) > 1)
        self.assertTrue('org.freedesktop.DBus' in result)

        result = self.dbus_proxy.GetNameOwner('(s)', 'org.freedesktop.DBus')
        self.assertEqual(type(result), type(''))

        # empty return tuples get unboxed to None
        self.assertEqual(self.dbus_proxy.ReloadConfig('()'), None)

        # multiple return values remain a tuple; unfortunately D-BUS itself
        # does not have any method returning multiple results, so try talking
        # to notification-daemon (and don't fail the test if it does not exist)
        try:
            notification_daemon = Gio.DBusProxy.new_sync(
                self.bus,
                Gio.DBusProxyFlags.NONE, None,
                'org.freedesktop.Notifications',
                '/org/freedesktop/Notifications',
                'org.freedesktop.Notifications', None)
            result = notification_daemon.GetServerInformation('()')
        except Exception as e:
            # A missing service is tolerated; anything else is a real error.
            if 'Error.ServiceUnknown' not in str(e):
                raise
        else:
            self.assertTrue(isinstance(result, tuple))
            self.assertEqual(len(result), 4)
            for i in result:
                self.assertEqual(type(i), type(''))

        # test keyword argument; timeout=0 will fail immediately
        self._check_raises(
            Exception, 'Timeout',
            'call with timeout=0 should raise an exception',
            self.dbus_proxy.GetConnectionUnixProcessID,
            '(s)', '1', timeout=0)

    def test_python_calls_sync_noargs(self):
        """Methods without arguments can be called without a signature."""
        # methods without arguments don't need an explicit signature
        result = self.dbus_proxy.ListNames()
        self.assertTrue(isinstance(result, list))
        self.assertTrue(len(result) > 1)
        self.assertTrue('org.freedesktop.DBus' in result)

    def test_python_calls_sync_errors(self):
        """Method-style calls raise for bad arguments or a None signature."""
        # error case: invalid argument types
        self._check_raises(
            Exception, 'InvalidArgs',
            'call with invalid arguments should raise an exception',
            self.dbus_proxy.GetConnectionUnixProcessID, '()')

        # error case: None instead of a signature string
        self._check_raises(
            TypeError, 'signature',
            'call with None signature should raise an exception',
            self.dbus_proxy.GetConnectionUnixProcessID, None, 'foo')

    def test_python_calls_async(self):
        """result_handler receives the already unboxed reply."""
        def call_done(obj, result, user_data):
            user_data['result'] = result
            user_data['main_loop'].quit()

        main_loop = GObject.MainLoop()
        data = {'main_loop': main_loop}
        self.dbus_proxy.ListNames('()', result_handler=call_done,
                                  user_data=data)
        main_loop.run()

        result = data['result']
        self.assertEqual(type(result), type([]))
        self.assertTrue(len(result) > 1)
        self.assertTrue('org.freedesktop.DBus' in result)

    def test_python_calls_async_error_result(self):
        """Without an error handler, result_handler receives the exception."""
        # when only specifying a result handler, this will get the error
        def call_done(obj, result, user_data):
            user_data['result'] = result
            user_data['main_loop'].quit()

        main_loop = GObject.MainLoop()
        data = {'main_loop': main_loop}
        self.dbus_proxy.ListNames('(s)', 'invalid_argument',
                                  result_handler=call_done, user_data=data)
        main_loop.run()

        self.assertTrue(isinstance(data['result'], Exception))
        self.assertTrue('InvalidArgs' in str(data['result']),
                        str(data['result']))

    def test_python_calls_async_error(self):
        """With an explicit error_handler, only that handler gets the error."""
        # when specifying an explicit error handler, this will get the error
        def call_done(obj, result, user_data):
            # Record the unexpected invocation instead of calling self.fail()
            # here; the check is made after the main loop has returned so
            # that the failure is reported by the test itself.
            user_data['result_handler_called'] = True
            user_data['main_loop'].quit()

        def call_error(obj, error, user_data):
            user_data['error'] = error
            user_data['main_loop'].quit()

        main_loop = GObject.MainLoop()
        data = {'main_loop': main_loop}
        self.dbus_proxy.ListNames('(s)', 'invalid_argument',
                                  result_handler=call_done,
                                  error_handler=call_error,
                                  user_data=data)
        main_loop.run()

        self.assertFalse('result_handler_called' in data,
                         'result handler should not be called')
        self.assertTrue(isinstance(data['error'], Exception))
        self.assertTrue('InvalidArgs' in str(data['error']),
                        str(data['error']))