Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / words / test / test_xishutil.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test cases for twisted.words.xish.utility
6 """
7
8 from twisted.trial import unittest
9
10 from twisted.python.util import OrderedDict
11 from twisted.words.xish import utility
12 from twisted.words.xish.domish import Element
13 from twisted.words.xish.utility import EventDispatcher
14
15 class CallbackTracker:
16     """
17     Test helper for tracking callbacks.
18
19     Increases a counter on each call to L{call} and stores the object
20     passed in the call.
21     """
22
23     def __init__(self):
24         self.called = 0
25         self.obj = None
26
27
28     def call(self, obj):
29         self.called = self.called + 1
30         self.obj = obj
31
32
33
34 class OrderedCallbackTracker:
35     """
36     Test helper for tracking callbacks and their order.
37     """
38
39     def __init__(self):
40         self.callList = []
41
42
43     def call1(self, object):
44         self.callList.append(self.call1)
45
46
47     def call2(self, object):
48         self.callList.append(self.call2)
49
50
51     def call3(self, object):
52         self.callList.append(self.call3)
53
54
55
56 class EventDispatcherTest(unittest.TestCase):
57     """
58     Tests for L{EventDispatcher}.
59     """
60
61     def testStuff(self):
62         d = EventDispatcher()
63         cb1 = CallbackTracker()
64         cb2 = CallbackTracker()
65         cb3 = CallbackTracker()
66
67         d.addObserver("/message/body", cb1.call)
68         d.addObserver("/message", cb1.call)
69         d.addObserver("/presence", cb2.call)
70         d.addObserver("//event/testevent", cb3.call)
71
72         msg = Element(("ns", "message"))
73         msg.addElement("body")
74
75         pres = Element(("ns", "presence"))
76         pres.addElement("presence")
77
78         d.dispatch(msg)
79         self.assertEqual(cb1.called, 2)
80         self.assertEqual(cb1.obj, msg)
81         self.assertEqual(cb2.called, 0)
82
83         d.dispatch(pres)
84         self.assertEqual(cb1.called, 2)
85         self.assertEqual(cb2.called, 1)
86         self.assertEqual(cb2.obj, pres)
87         self.assertEqual(cb3.called, 0)
88
89         d.dispatch(d, "//event/testevent")
90         self.assertEqual(cb3.called, 1)
91         self.assertEqual(cb3.obj, d)
92
93         d.removeObserver("/presence", cb2.call)
94         d.dispatch(pres)
95         self.assertEqual(cb2.called, 1)
96
97
98     def test_addObserverTwice(self):
99         """
100         Test adding two observers for the same query.
101
102         When the event is dispath both of the observers need to be called.
103         """
104         d = EventDispatcher()
105         cb1 = CallbackTracker()
106         cb2 = CallbackTracker()
107
108         d.addObserver("//event/testevent", cb1.call)
109         d.addObserver("//event/testevent", cb2.call)
110         d.dispatch(d, "//event/testevent")
111
112         self.assertEqual(cb1.called, 1)
113         self.assertEqual(cb1.obj, d)
114         self.assertEqual(cb2.called, 1)
115         self.assertEqual(cb2.obj, d)
116
117
118     def test_addObserverInDispatch(self):
119         """
120         Test for registration of an observer during dispatch.
121         """
122         d = EventDispatcher()
123         msg = Element(("ns", "message"))
124         cb = CallbackTracker()
125
126         def onMessage(_):
127             d.addObserver("/message", cb.call)
128
129         d.addOnetimeObserver("/message", onMessage)
130
131         d.dispatch(msg)
132         self.assertEqual(cb.called, 0)
133
134         d.dispatch(msg)
135         self.assertEqual(cb.called, 1)
136
137         d.dispatch(msg)
138         self.assertEqual(cb.called, 2)
139
140
141     def test_addOnetimeObserverInDispatch(self):
142         """
143         Test for registration of a onetime observer during dispatch.
144         """
145         d = EventDispatcher()
146         msg = Element(("ns", "message"))
147         cb = CallbackTracker()
148
149         def onMessage(msg):
150             d.addOnetimeObserver("/message", cb.call)
151
152         d.addOnetimeObserver("/message", onMessage)
153
154         d.dispatch(msg)
155         self.assertEqual(cb.called, 0)
156
157         d.dispatch(msg)
158         self.assertEqual(cb.called, 1)
159
160         d.dispatch(msg)
161         self.assertEqual(cb.called, 1)
162
163
164     def testOnetimeDispatch(self):
165         d = EventDispatcher()
166         msg = Element(("ns", "message"))
167         cb = CallbackTracker()
168
169         d.addOnetimeObserver("/message", cb.call)
170         d.dispatch(msg)
171         self.assertEqual(cb.called, 1)
172         d.dispatch(msg)
173         self.assertEqual(cb.called, 1)
174
175
176     def testDispatcherResult(self):
177         d = EventDispatcher()
178         msg = Element(("ns", "message"))
179         pres = Element(("ns", "presence"))
180         cb = CallbackTracker()
181
182         d.addObserver("/presence", cb.call)
183         result = d.dispatch(msg)
184         self.assertEqual(False, result)
185
186         result = d.dispatch(pres)
187         self.assertEqual(True, result)
188
189
190     def testOrderedXPathDispatch(self):
191         d = EventDispatcher()
192         cb = OrderedCallbackTracker()
193         d.addObserver("/message/body", cb.call2)
194         d.addObserver("/message", cb.call3, -1)
195         d.addObserver("/message/body", cb.call1, 1)
196
197         msg = Element(("ns", "message"))
198         msg.addElement("body")
199         d.dispatch(msg)
200         self.assertEqual(cb.callList, [cb.call1, cb.call2, cb.call3],
201                           "Calls out of order: %s" %
202                           repr([c.__name__ for c in cb.callList]))
203
204
205     # Observers are put into CallbackLists that are then put into dictionaries
206     # keyed by the event trigger. Upon removal of the last observer for a
207     # particular event trigger, the (now empty) CallbackList and corresponding
208     # event trigger should be removed from those dictionaries to prevent
209     # slowdown and memory leakage.
210
211     def test_cleanUpRemoveEventObserver(self):
212         """
213         Test observer clean-up after removeObserver for named events.
214         """
215
216         d = EventDispatcher()
217         cb = CallbackTracker()
218
219         d.addObserver('//event/test', cb.call)
220         d.dispatch(None, '//event/test')
221         self.assertEqual(1, cb.called)
222         d.removeObserver('//event/test', cb.call)
223         self.assertEqual(0, len(d._eventObservers.pop(0)))
224
225
226     def test_cleanUpRemoveXPathObserver(self):
227         """
228         Test observer clean-up after removeObserver for XPath events.
229         """
230
231         d = EventDispatcher()
232         cb = CallbackTracker()
233         msg = Element((None, "message"))
234
235         d.addObserver('/message', cb.call)
236         d.dispatch(msg)
237         self.assertEqual(1, cb.called)
238         d.removeObserver('/message', cb.call)
239         self.assertEqual(0, len(d._xpathObservers.pop(0)))
240
241
242     def test_cleanUpOnetimeEventObserver(self):
243         """
244         Test observer clean-up after onetime named events.
245         """
246
247         d = EventDispatcher()
248         cb = CallbackTracker()
249
250         d.addOnetimeObserver('//event/test', cb.call)
251         d.dispatch(None, '//event/test')
252         self.assertEqual(1, cb.called)
253         self.assertEqual(0, len(d._eventObservers.pop(0)))
254
255
256     def test_cleanUpOnetimeXPathObserver(self):
257         """
258         Test observer clean-up after onetime XPath events.
259         """
260
261         d = EventDispatcher()
262         cb = CallbackTracker()
263         msg = Element((None, "message"))
264
265         d.addOnetimeObserver('/message', cb.call)
266         d.dispatch(msg)
267         self.assertEqual(1, cb.called)
268         self.assertEqual(0, len(d._xpathObservers.pop(0)))
269
270
271     def test_observerRaisingException(self):
272         """
273         Test that exceptions in observers do not bubble up to dispatch.
274
275         The exceptions raised in observers should be logged and other
276         observers should be called as if nothing happened.
277         """
278
279         class OrderedCallbackList(utility.CallbackList):
280             def __init__(self):
281                 self.callbacks = OrderedDict()
282
283         class TestError(Exception):
284             pass
285
286         def raiseError(_):
287             raise TestError()
288
289         d = EventDispatcher()
290         cb = CallbackTracker()
291
292         originalCallbackList = utility.CallbackList
293
294         try:
295             utility.CallbackList = OrderedCallbackList
296
297             d.addObserver('//event/test', raiseError)
298             d.addObserver('//event/test', cb.call)
299             try:
300                 d.dispatch(None, '//event/test')
301             except TestError:
302                 self.fail("TestError raised. Should have been logged instead.")
303
304             self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
305             self.assertEqual(1, cb.called)
306         finally:
307             utility.CallbackList = originalCallbackList
308
309
310
311 class XmlPipeTest(unittest.TestCase):
312     """
313     Tests for L{twisted.words.xish.utility.XmlPipe}.
314     """
315
316     def setUp(self):
317         self.pipe = utility.XmlPipe()
318
319
320     def test_sendFromSource(self):
321         """
322         Send an element from the source and observe it from the sink.
323         """
324         def cb(obj):
325             called.append(obj)
326
327         called = []
328         self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
329         element = Element(('testns', 'test'))
330         self.pipe.source.send(element)
331         self.assertEqual([element], called)
332
333
334     def test_sendFromSink(self):
335         """
336         Send an element from the sink and observe it from the source.
337         """
338         def cb(obj):
339             called.append(obj)
340
341         called = []
342         self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
343         element = Element(('testns', 'test'))
344         self.pipe.sink.send(element)
345         self.assertEqual([element], called)