Branched from 2.0alpha and pushed for 2.0
[profile/ivi/dbus-python.git] / test / test-signals.py
1 #!/usr/bin/env python
2
3 # Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
4 # Copyright (C) 2005-2007 Collabora Ltd. <http://www.collabora.co.uk/>
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation
8 # files (the "Software"), to deal in the Software without
9 # restriction, including without limitation the rights to use, copy,
10 # modify, merge, publish, distribute, sublicense, and/or sell copies
11 # of the Software, and to permit persons to whom the Software is
12 # furnished to do so, subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
21 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
22 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 # DEALINGS IN THE SOFTWARE.
25
26 import sys
27 import os
28 import unittest
29 import time
30 import logging
31
# Build-tree and source-tree locations handed over by the test harness
# through the environment (a missing variable raises KeyError), normalized
# so they can be compared against module paths further down.
builddir, pydir = map(
    os.path.normpath,
    (os.environ["DBUS_TOP_BUILDDIR"], os.environ["DBUS_TOP_SRCDIR"]),
)
34
35 import dbus
36 import _dbus_bindings
37 import gobject
38 import dbus.glib
39 import dbus.service
40
41
# Give the root logger its default handler, then drop the root threshold to
# 1 so records at every standard level (DEBUG and above) are emitted.
logging.basicConfig()
logging.root.setLevel(1)

# Logger used by everything in this test script.
logger = logging.getLogger('test-signals')
45
46
# Refuse to run unless the modules under test were imported from the trees
# named by the harness: the pure-Python package must live under the source
# directory and the extension module under the build directory.
pkg = dbus.__file__
if not pkg.startswith(pydir):
    raise Exception(
        "DBus modules (%s) are not being picked up from the package"
        % (pkg,))

if not _dbus_bindings.__file__.startswith(builddir):
    raise Exception(
        "DBus modules (%s) are not being picked up from the package"
        % (_dbus_bindings.__file__,))
53
54
# Bus name of the service the tests talk to.
NAME = 'org.freedesktop.DBus.TestSuitePythonService'
# Interface through which signals are requested and received.
IFACE = 'org.freedesktop.DBus.TestSuiteInterface'
# Object path of the main test object; the fallback objects used by the
# tests live underneath it.
OBJECT = '/org/freedesktop/DBus/TestSuitePythonObject'
58
59
class TestSignals(unittest.TestCase):
    """Signal delivery and match-removal tests against the Python test service.

    Every test drives the module-level ``main_loop``, which is only created
    when this file is executed as a script.
    """

    def setUp(self):
        """Create the proxies and interfaces used by the individual tests."""
        logger.info('setUp()')
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object(NAME, OBJECT)
        self.remote_object_fallback_trivial = self.bus.get_object(NAME,
                OBJECT + '/Fallback')
        self.remote_object_fallback = self.bus.get_object(NAME,
                OBJECT + '/Fallback/Badger')
        # Same object as remote_object, but the proxy is created with
        # follow_name_owner_changes=True.
        self.remote_object_follow = self.bus.get_object(NAME, OBJECT,
                follow_name_owner_changes=True)
        self.iface = dbus.Interface(self.remote_object, IFACE)
        self.iface_follow = dbus.Interface(self.remote_object_follow, IFACE)
        self.fallback_iface = dbus.Interface(self.remote_object_fallback, IFACE)
        self.fallback_trivial_iface = dbus.Interface(
                self.remote_object_fallback_trivial, IFACE)
        # Name of the scenario currently running; the callbacks created by
        # signal_test_impl compare against it and ignore anything stale.
        self.in_test = None

    def signal_test_impl(self, iface, name, test_removal=False):
        """Check that 'SignalOneString' is delivered through ``iface``.

        A handler is connected, the service is asked to emit the signal via
        ``EmitSignal`` and the main loop runs until the handler fires or a
        one-second timeout expires.  With ``test_removal`` set, the signal
        is requested a second time after the match has been removed and
        must *not* be delivered within one second.

        The signal match and any pending timeout source are always removed
        before returning, including when an assertion fails, so a leftover
        callback cannot quit the shared main loop during a later test.

        :param iface: ``dbus.Interface`` to connect to and to call
            ``EmitSignal`` on.
        :param name: label for this scenario; used in log messages and as
            the ``self.in_test`` state the callbacks check.
        :param test_removal: if true, also verify that nothing is delivered
            once the match has been removed.
        :raises AssertionError: if the signal does not arrive in the first
            phase, or does arrive in the removal phase.
        """
        self.in_test = name
        removed_state = name + '+removed'
        # The nested callbacks record state by mutating this list rather
        # than rebinding a name, which avoids scoping issues.
        result = []

        def _signal_handler(s, sender, path):
            logger.debug('_signal_handler for %s: current state %s', name, self.in_test)
            if self.in_test not in (name, removed_state):
                return
            logger.info('Received signal from %s:%s, argument is %r',
                        sender, path, s)
            result.append('received')
            main_loop.quit()

        def _run_loop_with_timeout(expected_state):
            # Run the main loop for at most one second.  The loop is only
            # stopped by the timeout while self.in_test still equals
            # expected_state.
            expired = []

            def _timeout_handler():
                logger.debug('_timeout_handler for %s: current state %s', name, self.in_test)
                expired.append(True)
                if self.in_test == expected_state:
                    main_loop.quit()
                # A false return value makes GLib drop the source, so it
                # must not be removed a second time below.
                return False

            source_id = gobject.timeout_add(1000, _timeout_handler)
            try:
                main_loop.run()
            finally:
                if not expired:
                    gobject.source_remove(source_id)

        logger.info('Testing %s', name)
        match = iface.connect_to_signal('SignalOneString', _signal_handler,
                                        sender_keyword='sender',
                                        path_keyword='path')
        try:
            logger.info('Waiting for signal...')
            iface.EmitSignal('SignalOneString', 0)
            _run_loop_with_timeout(name)
            if not result:
                raise AssertionError('Signal did not arrive within 1 second')
        finally:
            logger.debug('Removing match')
            match.remove()

        if test_removal:
            self.in_test = removed_state
            logger.info('Testing %s', name)
            # Forget the delivery recorded by the first phase.
            del result[:]
            iface.EmitSignal('SignalOneString', 0)
            _run_loop_with_timeout(removed_state)
            if result:
                raise AssertionError('Signal should not have arrived, but did')

    # NOTE: the test methods deliberately carry comments rather than
    # docstrings, because unittest prints a test's docstring instead of its
    # name in verbose output.

    # Object reached through the '/Fallback/Badger' path.
    def testFallback(self):
        self.signal_test_impl(self.fallback_iface, 'Fallback')

    # Object reached through the '/Fallback' path itself.
    def testFallbackTrivial(self):
        self.signal_test_impl(self.fallback_trivial_iface, 'FallbackTrivial')

    # Plain proxy: delivery, then delivery plus removal, each run twice.
    def testSignal(self):
        self.signal_test_impl(self.iface, 'Signal')

    def testRemoval(self):
        self.signal_test_impl(self.iface, 'Removal', True)

    def testSignalAgain(self):
        self.signal_test_impl(self.iface, 'SignalAgain')

    def testRemovalAgain(self):
        self.signal_test_impl(self.iface, 'RemovalAgain', True)

    # Same four scenarios through the follow_name_owner_changes proxy; the
    # scenario labels are shared with the tests above.
    def testSignalF(self):
        self.signal_test_impl(self.iface_follow, 'Signal')

    def testRemovalF(self):
        self.signal_test_impl(self.iface_follow, 'Removal', True)

    def testSignalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'SignalAgain')

    def testRemovalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'RemovalAgain', True)
153
def _main():
    """Create the shared main loop, enable threading and run the tests."""
    # The test case reads main_loop as a module-level name, so it has to be
    # bound globally rather than kept local to this function.
    global main_loop
    main_loop = gobject.MainLoop()
    gobject.threads_init()
    dbus.glib.init_threads()

    logger.info('Starting unit test')
    unittest.main()


if __name__ == '__main__':
    _main()
159     logger.info('Starting unit test')
160     unittest.main()