Imported Upstream version 2.26.0
[platform/upstream/pygobject2.git] / tests / test_signal.py
1 # -*- Mode: Python -*-
2
3 import gc
4 import unittest
5 import sys
6
7 import gobject
8 import testhelper
9 from compathelper import _long
10
class C(gobject.GObject):
    # Declares 'my_signal' (run-first, no return value, one int argument)
    # and stores the emitted value on the instance.
    __gsignals__ = {
        'my_signal': (
            gobject.SIGNAL_RUN_FIRST,
            gobject.TYPE_NONE,
            (gobject.TYPE_INT,),
        ),
    }

    def do_my_signal(self, arg):
        # Class handler: remember the value carried by the emission.
        self.arg = arg
16
class D(C):
    # Subclass whose handler keeps its own copy of the value in ``arg2``
    # and then chains up to C's handler, which fills in ``arg``.
    def do_my_signal(self, arg2):
        self.arg2 = arg2
        C.do_my_signal(self, arg2)
21
class TestSignalCreation(unittest.TestCase):
    # Bug 540376.
    def test_illegals(self):
        # Passing None as the type argument of signal_new must raise
        # TypeError.
        self.assertRaises(
            TypeError,
            gobject.signal_new,
            'test',
            None,
            0,
            gobject.TYPE_NONE,
            (gobject.TYPE_LONG,),
        )
30
31
class TestChaining(unittest.TestCase):
    # Checks that class handlers run on emission, both for the class that
    # declares the signal (C) and for a subclass (D) that chains up.

    def setUp(self):
        self.inst = C()
        # 1, 2, 3 are extra user-data arguments handed to the callback.
        self.inst.connect("my_signal", self.my_signal_handler_cb, 1, 2, 3)

    def my_signal_handler_cb(self, *args):
        # Expected argument layout: (emitter, signal value, 1, 2, 3).
        self.assertEqual(len(args), 5)
        self.assertTrue(isinstance(args[0], C))
        self.assertEqual(args[0], self.inst)

        self.assertTrue(isinstance(args[1], int))
        self.assertEqual(args[1], 42)

        self.assertEqual(args[2:], (1, 2, 3))

    def testChaining(self):
        # Emitting on a C instance runs C.do_my_signal.
        self.inst.emit("my_signal", 42)
        self.assertEqual(self.inst.arg, 42)

    # This test used to be named testChaining as well; the duplicate name
    # replaced the method above in the class namespace, so that test was
    # never collected or run.
    def testChaining2(self):
        # Emitting on a D instance runs D's handler, which chains to C's.
        inst2 = D()
        inst2.emit("my_signal", 44)
        self.assertEqual(inst2.arg, 44)
        self.assertEqual(inst2.arg2, 44)
56
# This is for bug 153718
class TestGSignalsError(unittest.TestCase):
    # Defining a GObject subclass with a malformed __gsignals__ must raise
    # TypeError at class-creation time.

    def testInvalidType(self, *args):
        # __gsignals__ that is not a mapping at all.
        def define_class():
            class Foo(gobject.GObject):
                __gsignals__ = None

        self.assertRaises(TypeError, define_class)
        gc.collect()

    def testInvalidName(self, *args):
        # 'override' of a signal name that is not declared by the parent.
        def define_class():
            class Foo(gobject.GObject):
                __gsignals__ = {'not-exists': 'override'}

        self.assertRaises(TypeError, define_class)
        gc.collect()
72
class TestGPropertyError(unittest.TestCase):
    # Defining a GObject subclass with a malformed __gproperties__ must
    # raise TypeError at class-creation time.

    def testInvalidType(self, *args):
        # __gproperties__ that is not a mapping at all.
        def define_class():
            class Foo(gobject.GObject):
                __gproperties__ = None

        self.assertRaises(TypeError, define_class)
        gc.collect()

    def testInvalidName(self, *args):
        # A mapping whose property name and description are both None.
        def define_class():
            class Foo(gobject.GObject):
                __gproperties__ = {None: None}

        self.assertRaises(TypeError, define_class)
        gc.collect()
88
class TestList(unittest.TestCase):
    def testListObject(self):
        # C declares the signal as 'my_signal'; the listing is expected to
        # report it as 'my-signal'.
        names = gobject.signal_list_names(C)
        self.assertEqual(names, ('my-signal',))
92
93
def my_accumulator(ihint, return_accu, handler_return, user_data):
    """Signal accumulator that sums handler return values and halts the
    emission once the running total has reached 3.

    Returns a ``(continue_emission, accumulated_value)`` pair. When the
    total passed in is already 3 or more it is returned unchanged and the
    current handler's return value is ignored.
    """
    assert user_data == "accum data"
    limit_reached = return_accu >= 3
    total = return_accu if limit_reached else return_accu + handler_return
    return (not limit_reached), total
101
class Foo(gobject.GObject):
    # Two run-last signals without arguments, each with an accumulator:
    #  - 'my-acc-signal' returns an int folded by my_accumulator, which
    #    receives "accum data" as its user data;
    #  - 'my-other-acc-signal' returns a boolean and uses
    #    gobject.signal_accumulator_true_handled.
    __gsignals__ = {
        'my-acc-signal': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_INT,
            (),
            my_accumulator,
            "accum data",
        ),
        'my-other-acc-signal': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_BOOLEAN,
            (),
            gobject.signal_accumulator_true_handled,
        ),
    }
109
class TestAccumulator(unittest.TestCase):
    # Exercises the two accumulator-backed signals declared on Foo.

    def testAccumulator(self):
        inst = Foo()
        ## handlers are connected in order and return 1, 2 and 3; the
        ## value returned by the last one will not be considered, because
        ## at that point the accumulator already reached its limit.
        for value in (1, 2, 3):
            # Bind the current value as a default to avoid late binding.
            inst.connect("my-acc-signal", lambda obj, value=value: value)
        retval = inst.emit("my-acc-signal")
        self.assertEqual(retval, 3)

    def testAccumulatorTrueHandled(self):
        inst = Foo()
        ## handler3 will not be called because handler2 returns True,
        ## so it should stop the emission.
        handlers = (
            self._true_handler1,
            self._true_handler2,
            self._true_handler3,
        )
        for handler in handlers:
            inst.connect("my-other-acc-signal", handler)
        self.__true_val = None
        inst.emit("my-other-acc-signal")
        self.assertEqual(self.__true_val, 2)

    # Each handler records its own number so the test can tell which one
    # ran last; only handler2 reports the signal as handled.
    def _true_handler1(self, obj):
        self.__true_val = 1
        return False

    def _true_handler2(self, obj):
        self.__true_val = 2
        return True

    def _true_handler3(self, obj):
        self.__true_val = 3
        return False
143
class E(gobject.GObject):
    # Emitter with an argument-less 'signal'. ``status`` starts at 0 and
    # the class handler moves it to 1, insisting it was still 0.
    __gsignals__ = {
        'signal': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ()),
    }

    def __init__(self):
        gobject.GObject.__init__(self)
        self.status = 0

    def do_signal(self):
        assert self.status == 0
        self.status = 1
154
class F(gobject.GObject):
    # Emitter with an argument-less 'signal' whose class handler simply
    # counts: every run adds one to ``status``.
    __gsignals__ = {
        'signal': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ()),
    }

    def __init__(self):
        gobject.GObject.__init__(self)
        self.status = 0

    def do_signal(self):
        self.status = self.status + 1
164
class TestEmissionHook(unittest.TestCase):
    # Exercises gobject.add_emission_hook / gobject.remove_emission_hook.
    #
    # With E, the class handler, the hook and the connected callback each
    # check the status left by the previous stage and advance it
    # (1, then 2, then 3). With F, every handler or hook run adds one to
    # ``status``, so the final value counts how many of them ran.

    def testAdd(self):
        self.hook = True
        emitter = E()
        emitter.connect('signal', self._callback)
        gobject.add_emission_hook(E, "signal", self._emission_hook)
        emitter.emit('signal')
        self.assertEqual(emitter.status, 3)

    def testRemove(self):
        self.hook = False
        emitter = E()
        emitter.connect('signal', self._callback)
        hook_id = gobject.add_emission_hook(E, "signal", self._emission_hook)
        # Removed before emitting, so _callback must see status 1.
        gobject.remove_emission_hook(E, "signal", hook_id)
        emitter.emit('signal')
        self.assertEqual(emitter.status, 3)

    def _emission_hook(self, e):
        # Runs after E.do_signal (status 1) and before _callback.
        self.assertEqual(e.status, 1)
        e.status = 2

    def _callback(self, e):
        # The status seen here depends on whether the hook was active.
        expected = 2 if self.hook else 1
        self.assertEqual(e.status, expected)
        e.status = 3

    def testCallbackReturnFalse(self):
        self.hook = False
        target = F()

        def hook_returning_false(emitter):
            emitter.status += 1
            return False

        gobject.add_emission_hook(target, "signal", hook_returning_false)
        for _ in range(2):
            target.emit('signal')
        # Two class-handler runs plus a single hook run.
        self.assertEqual(target.status, 3)

    def testCallbackReturnTrue(self):
        self.hook = False
        target = F()

        def hook_returning_true(emitter):
            emitter.status += 1
            return True

        hook_id = gobject.add_emission_hook(target, "signal",
                                            hook_returning_true)
        for _ in range(2):
            target.emit('signal')
        gobject.remove_emission_hook(target, "signal", hook_id)
        # Two class-handler runs plus two hook runs.
        self.assertEqual(target.status, 4)

    def testCallbackReturnTrueButRemove(self):
        self.hook = False
        target = F()

        def hook_returning_true(emitter):
            emitter.status += 1
            return True

        hook_id = gobject.add_emission_hook(target, "signal",
                                            hook_returning_true)
        target.emit('signal')
        gobject.remove_emission_hook(target, "signal", hook_id)
        target.emit('signal')
        # Two class-handler runs plus the one hook run before removal.
        self.assertEqual(target.status, 3)
228
class TestClosures(unittest.TestCase):
    # Covers the *_by_func closure helpers (disconnect, block, unblock)
    # and a signal that takes and returns a GString.

    def setUp(self):
        self.count = 0

    def _callback(self, e):
        # Counts how many times the connected handler actually ran.
        self.count = self.count + 1

    def testDisconnect(self):
        emitter = E()
        emitter.connect('signal', self._callback)
        emitter.disconnect_by_func(self._callback)
        emitter.emit('signal')
        self.assertEqual(self.count, 0)

    def testHandlerBlock(self):
        emitter = E()
        emitter.connect('signal', self._callback)
        emitter.handler_block_by_func(self._callback)
        emitter.emit('signal')
        self.assertEqual(self.count, 0)

    def testHandlerUnBlock(self):
        emitter = E()
        handler_id = emitter.connect('signal', self._callback)
        # Block by id, unblock by function: the handler must run again.
        emitter.handler_block(handler_id)
        emitter.handler_unblock_by_func(self._callback)
        emitter.emit('signal')
        self.assertEqual(self.count, 1)

    def testHandlerBlockMethod(self):
        # Filed as #375589
        class A:
            def __init__(self):
                self.a = 0

            def callback(self, o):
                # The handler blocks itself while it is running.
                self.a = 1
                o.handler_block_by_func(self.callback)

        receiver = A()
        emitter = E()
        emitter.connect("signal", receiver.callback)
        emitter.emit('signal')
        self.assertEqual(receiver.a, 1)
        gc.collect()

    def testGString(self):
        class C(gobject.GObject):
            __gsignals__ = {
                'my_signal': (
                    gobject.SIGNAL_RUN_LAST,
                    gobject.TYPE_GSTRING,
                    (gobject.TYPE_GSTRING,),
                ),
            }

            def __init__(self, test):
                gobject.GObject.__init__(self)
                self.test = test

            def do_my_signal(self, data):
                # Returns the three received characters in reverse order.
                self.data = data
                self.test.assertEqual(len(data), 3)
                return ''.join((data[2], data[1], data[0]))

        emitter = C(self)
        # The payload contains an embedded NUL character.
        result = emitter.emit("my_signal", "\01\00\02")
        self.assertEqual(result, "\02\00\01")
289
class SigPropClass(gobject.GObject):
    # Declares both a signal and a writable construct property 'foo', and
    # emits the signal from inside the property setter. A TypeError raised
    # by that emission is recorded in ``signal_emission_failed``.
    __gsignals__ = {
        'my_signal': (
            gobject.SIGNAL_RUN_FIRST,
            gobject.TYPE_NONE,
            (gobject.TYPE_INT,),
        ),
    }

    __gproperties__ = {
        'foo': (str, None, None, '',
                gobject.PARAM_WRITABLE | gobject.PARAM_CONSTRUCT),
    }

    # Class-level default; overridden per instance when emission fails.
    signal_emission_failed = False

    def do_my_signal(self, arg):
        self.arg = arg

    def do_set_property(self, pspec, value):
        # 'foo' is the only property this class knows about.
        if pspec.name != 'foo':
            raise AttributeError('unknown property %s' % pspec.name)
        self._foo = value
        try:
            self.emit("my-signal", 1)
        except TypeError:
            self.signal_emission_failed = True
312
313
class TestSigProp(unittest.TestCase):
    def testEmitInPropertySetter(self):
        # SigPropClass emits a signal from its property setter and sets
        # signal_emission_failed if that emission raises TypeError.
        obj = SigPropClass()
        # assertFalse replaces the deprecated failIf alias, which newer
        # Python versions no longer provide.
        self.assertFalse(obj.signal_emission_failed)
318
# Short aliases for the signal flags and GTypes used in the signal
# declarations that follow.
# Note: ``float`` shadows the builtin for the rest of this module.
f, l = gobject.SIGNAL_RUN_FIRST, gobject.SIGNAL_RUN_LAST
float, double = gobject.TYPE_FLOAT, gobject.TYPE_DOUBLE
uint, ulong = gobject.TYPE_UINT, gobject.TYPE_ULONG
325
class CM(gobject.GObject):
    # Signals covering a range of argument and return types. Each entry is
    # (flags, return type, argument types), written with the module-level
    # aliases f, l, float, double, uint and ulong.
    __gsignals__ = {
        'test1': (f, None, ()),
        'test2': (l, None, (str,)),
        'test3': (l, int, (double,)),
        'test4': (f, None, (bool, _long, float, double, int, uint, ulong)),
        'test_float': (l, float, (float,)),
        'test_double': (l, double, (double,)),
        'test_string': (l, str, (str,)),
        'test_object': (l, object, (object,)),
    }
337
class _TestCMarshaller:
    # Mixin holding the marshaller tests. It is not a TestCase itself; the
    # assertion methods come from unittest.TestCase when the two are
    # combined in a concrete subclass.

    def setUp(self):
        self.obj = CM()
        # The callbacks for CM's signals are attached by the testhelper
        # module.
        testhelper.connectcallbacks(self.obj)

    def testTest1(self):
        self.obj.emit("test1")

    def testTest2(self):
        self.obj.emit("test2", "string")

    def testTest3(self):
        rv = self.obj.emit("test3", 42.0)
        self.assertEqual(rv, 20)

    def testTest4(self):
        self.obj.emit("test4", True, _long(10), 3.14, 1.78, 20, _long(30),
                      _long(31))

    def testTestReturnFloat(self):
        rv = self.obj.emit("test-float", 1.234)
        # The value comes back through a float-typed signal, so accept a
        # small range instead of exact equality. assertTrue replaces the
        # deprecated failUnless alias, which newer Python versions no
        # longer provide.
        self.assertTrue(1.233999 <= rv <= 1.2400001, rv)

    def testTestReturnDouble(self):
        rv = self.obj.emit("test-double", 1.234)
        self.assertEqual(rv, 1.234)

    def testTestReturnString(self):
        rv = self.obj.emit("test-string", "str")
        self.assertEqual(rv, "str")

    def testTestReturnObject(self):
        rv = self.obj.emit("test-object", self)
        self.assertEqual(rv, self)
371
# The marshaller tests are only turned into a real TestCase when the
# 'generic-c-marshaller' feature is present; otherwise a warning is
# printed when the module is loaded.
if 'generic-c-marshaller' not in gobject.features:
    print()
    print('** WARNING: LIBFFI disabled, not testing')
    print()
else:
    class TestCMarshaller(_TestCMarshaller, unittest.TestCase):
        pass
379
# Test for 374653
class TestPyGValue(unittest.TestCase):
    def testNoneNULLBoxedConversion(self):
        # A handler returns None for a signal whose return type is the
        # type named 'GStrv'; after the emission sys.last_type must still
        # be unset.
        class C(gobject.GObject):
            __gsignals__ = {
                'my_boxed_signal': (
                    gobject.SIGNAL_RUN_LAST,
                    gobject.type_from_name('GStrv'),
                    (),
                ),
            }

        emitter = C()
        emitter.connect('my-boxed-signal', lambda obj: None)
        sys.last_type = None
        emitter.emit('my-boxed-signal')
        assert not sys.last_type
393
# Run every test case in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()