1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Test cases for twisted.words.xish.utility
"""
8 from twisted.trial import unittest
10 from twisted.python.util import OrderedDict
11 from twisted.words.xish import utility
12 from twisted.words.xish.domish import Element
13 from twisted.words.xish.utility import EventDispatcher
class CallbackTracker:
    """
    Test helper for tracking callbacks.

    Increases a counter on each call to L{call} and stores the object
    passed in the call.

    @ivar called: Number of times L{call} has been invoked.
    @ivar obj: The object passed to the most recent invocation of L{call},
        or C{None} if it has not been invoked yet.
    """

    def __init__(self):
        # Start from a clean slate so tests can assert on "never called".
        self.called = 0
        self.obj = None


    def call(self, obj):
        """
        Record one invocation and remember the object it was given.

        @param obj: The object handed to the observer.
        """
        self.called = self.called + 1
        self.obj = obj
class OrderedCallbackTracker:
    """
    Test helper for tracking callbacks and their order.

    Each of L{call1}, L{call2} and L{call3} appends itself to C{callList}
    when invoked, so the list reflects the order of invocation.

    @ivar callList: The bound methods of this tracker, in the order in
        which they were called.
    """

    def __init__(self):
        # Per-instance list; without it the call methods have nothing to
        # append to.
        self.callList = []


    def call1(self, object):
        """
        Record that the first callback was invoked.

        @param object: The dispatched object (ignored).
        """
        self.callList.append(self.call1)


    def call2(self, object):
        """
        Record that the second callback was invoked.

        @param object: The dispatched object (ignored).
        """
        self.callList.append(self.call2)


    def call3(self, object):
        """
        Record that the third callback was invoked.

        @param object: The dispatched object (ignored).
        """
        self.callList.append(self.call3)
class EventDispatcherTest(unittest.TestCase):
    """
    Tests for L{EventDispatcher}.
    """

    # NOTE(review): the header of this first test was not present in the
    # listing this was restored from; the method name is a placeholder --
    # confirm against the upstream file.
    def test_dispatch(self):
        """
        Observers are called for the events matching their query only, and
        are no longer called once they have been removed.
        """
        d = EventDispatcher()
        cb1 = CallbackTracker()
        cb2 = CallbackTracker()
        cb3 = CallbackTracker()

        d.addObserver("/message/body", cb1.call)
        d.addObserver("/message", cb1.call)
        d.addObserver("/presence", cb2.call)
        d.addObserver("//event/testevent", cb3.call)

        msg = Element(("ns", "message"))
        msg.addElement("body")

        pres = Element(("ns", "presence"))
        pres.addElement("presence")

        # cb1 is registered for two queries that both match the message.
        d.dispatch(msg)
        self.assertEqual(cb1.called, 2)
        self.assertEqual(cb1.obj, msg)
        self.assertEqual(cb2.called, 0)

        d.dispatch(pres)
        self.assertEqual(cb1.called, 2)
        self.assertEqual(cb2.called, 1)
        self.assertEqual(cb2.obj, pres)
        self.assertEqual(cb3.called, 0)

        d.dispatch(d, "//event/testevent")
        self.assertEqual(cb3.called, 1)
        self.assertEqual(cb3.obj, d)

        # After removal a further dispatch must not reach cb2 again.
        d.removeObserver("/presence", cb2.call)
        d.dispatch(pres)
        self.assertEqual(cb2.called, 1)


    def test_addObserverTwice(self):
        """
        Test adding two observers for the same query.

        When the event is dispatched both of the observers need to be called.
        """
        d = EventDispatcher()
        cb1 = CallbackTracker()
        cb2 = CallbackTracker()

        d.addObserver("//event/testevent", cb1.call)
        d.addObserver("//event/testevent", cb2.call)
        d.dispatch(d, "//event/testevent")

        self.assertEqual(cb1.called, 1)
        self.assertEqual(cb1.obj, d)
        self.assertEqual(cb2.called, 1)
        self.assertEqual(cb2.obj, d)


    def test_addObserverInDispatch(self):
        """
        Test for registration of an observer during dispatch.
        """
        d = EventDispatcher()
        msg = Element(("ns", "message"))
        cb = CallbackTracker()

        def onMessage(_):
            d.addObserver("/message", cb.call)

        d.addOnetimeObserver("/message", onMessage)

        # The observer added while dispatching is not called for the
        # dispatch that is still in progress.
        d.dispatch(msg)
        self.assertEqual(cb.called, 0)

        d.dispatch(msg)
        self.assertEqual(cb.called, 1)

        d.dispatch(msg)
        self.assertEqual(cb.called, 2)


    def test_addOnetimeObserverInDispatch(self):
        """
        Test for registration of a onetime observer during dispatch.
        """
        d = EventDispatcher()
        msg = Element(("ns", "message"))
        cb = CallbackTracker()

        def onMessage(_):
            d.addOnetimeObserver("/message", cb.call)

        d.addOnetimeObserver("/message", onMessage)

        d.dispatch(msg)
        self.assertEqual(cb.called, 0)

        d.dispatch(msg)
        self.assertEqual(cb.called, 1)

        # The onetime observer has fired already, so the count stays put.
        d.dispatch(msg)
        self.assertEqual(cb.called, 1)


    def testOnetimeDispatch(self):
        """
        A onetime observer is called for the first matching dispatch only.
        """
        d = EventDispatcher()
        msg = Element(("ns", "message"))
        cb = CallbackTracker()

        d.addOnetimeObserver("/message", cb.call)
        d.dispatch(msg)
        self.assertEqual(cb.called, 1)
        d.dispatch(msg)
        self.assertEqual(cb.called, 1)


    def testDispatcherResult(self):
        """
        C{dispatch} returns C{False} when no observer matched the object and
        C{True} when one did.
        """
        d = EventDispatcher()
        msg = Element(("ns", "message"))
        pres = Element(("ns", "presence"))
        cb = CallbackTracker()

        d.addObserver("/presence", cb.call)
        result = d.dispatch(msg)
        self.assertEqual(False, result)

        result = d.dispatch(pres)
        self.assertEqual(True, result)


    def testOrderedXPathDispatch(self):
        """
        Observers are called in the order determined by the optional third
        argument to C{addObserver}, not in order of registration.
        """
        d = EventDispatcher()
        cb = OrderedCallbackTracker()
        # NOTE(review): the third argument is presumably a priority, with
        # higher values called first and a default between -1 and 1 --
        # confirm against EventDispatcher.addObserver.
        d.addObserver("/message/body", cb.call2)
        d.addObserver("/message", cb.call3, -1)
        d.addObserver("/message/body", cb.call1, 1)

        msg = Element(("ns", "message"))
        msg.addElement("body")
        d.dispatch(msg)
        self.assertEqual(cb.callList, [cb.call1, cb.call2, cb.call3],
                         "Calls out of order: %s" %
                         repr([c.__name__ for c in cb.callList]))


    # Observers are put into CallbackLists that are then put into dictionaries
    # keyed by the event trigger. Upon removal of the last observer for a
    # particular event trigger, the (now empty) CallbackList and corresponding
    # event trigger should be removed from those dictionaries to prevent
    # slowdown and memory leakage.

    def test_cleanUpRemoveEventObserver(self):
        """
        Test observer clean-up after removeObserver for named events.
        """

        d = EventDispatcher()
        cb = CallbackTracker()

        d.addObserver('//event/test', cb.call)
        d.dispatch(None, '//event/test')
        self.assertEqual(1, cb.called)
        d.removeObserver('//event/test', cb.call)
        # NOTE(review): key 0 is presumably the default priority -- confirm.
        self.assertEqual(0, len(d._eventObservers.pop(0)))


    def test_cleanUpRemoveXPathObserver(self):
        """
        Test observer clean-up after removeObserver for XPath events.
        """

        d = EventDispatcher()
        cb = CallbackTracker()
        msg = Element((None, "message"))

        d.addObserver('/message', cb.call)
        d.dispatch(msg)
        self.assertEqual(1, cb.called)
        d.removeObserver('/message', cb.call)
        # NOTE(review): key 0 is presumably the default priority -- confirm.
        self.assertEqual(0, len(d._xpathObservers.pop(0)))


    def test_cleanUpOnetimeEventObserver(self):
        """
        Test observer clean-up after onetime named events.
        """

        d = EventDispatcher()
        cb = CallbackTracker()

        d.addOnetimeObserver('//event/test', cb.call)
        d.dispatch(None, '//event/test')
        self.assertEqual(1, cb.called)
        # NOTE(review): key 0 is presumably the default priority -- confirm.
        self.assertEqual(0, len(d._eventObservers.pop(0)))


    def test_cleanUpOnetimeXPathObserver(self):
        """
        Test observer clean-up after onetime XPath events.
        """

        d = EventDispatcher()
        cb = CallbackTracker()
        msg = Element((None, "message"))

        d.addOnetimeObserver('/message', cb.call)
        d.dispatch(msg)
        self.assertEqual(1, cb.called)
        # NOTE(review): key 0 is presumably the default priority -- confirm.
        self.assertEqual(0, len(d._xpathObservers.pop(0)))


    def test_observerRaisingException(self):
        """
        Test that exceptions in observers do not bubble up to dispatch.

        The exceptions raised in observers should be logged and other
        observers should be called as if nothing happened.
        """

        class OrderedCallbackList(utility.CallbackList):
            # Keep callbacks in registration order so the raising observer
            # is known to run before the tracking one.
            def __init__(self):
                self.callbacks = OrderedDict()

        class TestError(Exception):
            pass

        def raiseError(_):
            raise TestError()

        d = EventDispatcher()
        cb = CallbackTracker()

        originalCallbackList = utility.CallbackList

        try:
            utility.CallbackList = OrderedCallbackList

            d.addObserver('//event/test', raiseError)
            d.addObserver('//event/test', cb.call)
            try:
                d.dispatch(None, '//event/test')
            except TestError:
                self.fail("TestError raised. Should have been logged instead.")

            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
            self.assertEqual(1, cb.called)
        finally:
            # Always restore the module-level class, even if the test fails.
            utility.CallbackList = originalCallbackList
class XmlPipeTest(unittest.TestCase):
    """
    Tests for L{twisted.words.xish.utility.XmlPipe}.
    """

    def setUp(self):
        """
        Create a fresh pipe for every test.
        """
        self.pipe = utility.XmlPipe()


    def test_sendFromSource(self):
        """
        Send an element from the source and observe it from the sink.
        """
        called = []

        def cb(obj):
            called.append(obj)

        self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
        element = Element(('testns', 'test'))
        self.pipe.source.send(element)
        self.assertEqual([element], called)


    def test_sendFromSink(self):
        """
        Send an element from the sink and observe it from the source.
        """
        called = []

        def cb(obj):
            called.append(obj)

        self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
        element = Element(('testns', 'test'))
        self.pipe.sink.send(element)
        self.assertEqual([element], called)