1 # -*- test-case-name: twisted.words.test.test_xishutil -*-
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
7 Event Dispatching and Callback utilities.
10 from twisted.python import log
11 from twisted.words.xish import xpath
13 class _MethodWrapper(object):
15 Internal class for tracking method calls.
17 def __init__(self, method, *args, **kwargs):
23 def __call__(self, *args, **kwargs):
24 nargs = self.args + args
25 nkwargs = self.kwargs.copy()
26 nkwargs.update(kwargs)
27 self.method(*nargs, **nkwargs)
33 Container for callbacks.
35 Event queries are linked to lists of callables. When a matching event
36 occurs, these callables are called in sequence. One-time callbacks
37 are removed from the list after the first time the event was triggered.
39 Arguments to callbacks are split spread across two sets. The first set,
40 callback specific, is passed to C{addCallback} and is used for all
41 subsequent event triggers. The second set is passed to C{callback} and is
42 event specific. Positional arguments in the second set come after the
43 positional arguments of the first set. Keyword arguments in the second set
44 override those in the first set.
46 @ivar callbacks: The registered callbacks as mapping from the callable to a
47 tuple of a wrapper for that callable that keeps the
48 callback specific arguments and a boolean that signifies
49 if it is to be called only once.
50 @type callbacks: C{dict}
57 def addCallback(self, onetime, method, *args, **kwargs):
61 The arguments passed are used as callback specific arguments.
63 @param onetime: If C{True}, this callback is called at most once.
64 @type onetime: C{bool}
65 @param method: The callback callable to be added.
66 @param args: Positional arguments to the callable.
68 @param kwargs: Keyword arguments to the callable.
72 if not method in self.callbacks:
73 self.callbacks[method] = (_MethodWrapper(method, *args, **kwargs),
77 def removeCallback(self, method):
81 @param method: The callable to be removed.
84 if method in self.callbacks:
85 del self.callbacks[method]
88 def callback(self, *args, **kwargs):
90 Call all registered callbacks.
92 The passed arguments are event specific and augment and override
93 the callback specific arguments as described above.
95 @note: Exceptions raised by callbacks are trapped and logged. They will
96 not propagate up to make sure other callbacks will still be
97 called, and the event dispatching always succeeds.
99 @param args: Positional arguments to the callable.
101 @param kwargs: Keyword arguments to the callable.
102 @type kwargs: C{dict}
105 for key, (methodwrapper, onetime) in self.callbacks.items():
107 methodwrapper(*args, **kwargs)
112 del self.callbacks[key]
117 Return if list of registered callbacks is empty.
122 return len(self.callbacks) == 0
126 class EventDispatcher:
128 Event dispatching service.
130 The C{EventDispatcher} allows observers to be registered for certain events
131 that are dispatched. There are two types of events: XPath events and Named
134 Every dispatch is triggered by calling L{dispatch} with a data object and,
135 for named events, the name of the event.
137 When an XPath type event is dispatched, the associated object is assumed to
138 be an L{Element<twisted.words.xish.domish.Element>} instance, which is
139 matched against all registered XPath queries. For every match, the
140 respective observer will be called with the data object.
142 A named event will simply call each registered observer for that particular
143 event name, with the data object. Unlike XPath type events, the data object
144 is not restricted to L{Element<twisted.words.xish.domish.Element>}, but can
147 When registering observers, the event that is to be observed is specified
148 using an L{xpath.XPathQuery} instance or a string. In the latter case, the
149 string can also contain the string representation of an XPath expression.
150 To distinguish these from named events, each named event should start with
151 a special prefix that is stored in C{self.prefix}. It defaults to
154 Observers registered using L{addObserver} are persistent: after the
155 observer has been triggered by a dispatch, it remains registered for a
156 possible next dispatch. If instead L{addOnetimeObserver} was used to
157 observe an event, the observer is removed from the list of observers after
158 the first observed event.
160 Observers can also be prioritized, by providing an optional C{priority}
161 parameter to the L{addObserver} and L{addOnetimeObserver} methods. Higher
162 priority observers are then called before lower priority observers.
164 Finally, observers can be unregistered by using L{removeObserver}.
167 def __init__(self, eventprefix="//event/"):
168 self.prefix = eventprefix
169 self._eventObservers = {}
170 self._xpathObservers = {}
171 self._dispatchDepth = 0 # Flag indicating levels of dispatching
173 self._updateQueue = [] # Queued updates for observer ops
176 def _getEventAndObservers(self, event):
177 if isinstance(event, xpath.XPathQuery):
179 observers = self._xpathObservers
181 if self.prefix == event[:len(self.prefix)]:
183 observers = self._eventObservers
186 event = xpath.internQuery(event)
187 observers = self._xpathObservers
189 return event, observers
192 def addOnetimeObserver(self, event, observerfn, priority=0, *args, **kwargs):
194 Register a one-time observer for an event.
196 Like L{addObserver}, but is only triggered at most once. See there
197 for a description of the parameters.
199 self._addObserver(True, event, observerfn, priority, *args, **kwargs)
202 def addObserver(self, event, observerfn, priority=0, *args, **kwargs):
204 Register an observer for an event.
206 Each observer will be registered with a certain priority. Higher
207 priority observers get called before lower priority observers.
209 @param event: Name or XPath query for the event to be monitored.
210 @type event: C{str} or L{xpath.XPathQuery}.
211 @param observerfn: Function to be called when the specified event
212 has been triggered. This callable takes
213 one parameter: the data object that triggered
214 the event. When specified, the C{*args} and
215 C{**kwargs} parameters to addObserver are being used
216 as additional parameters to the registered observer
218 @param priority: (Optional) priority of this observer in relation to
219 other observer that match the same event. Defaults to
221 @type priority: C{int}
223 self._addObserver(False, event, observerfn, priority, *args, **kwargs)
226 def _addObserver(self, onetime, event, observerfn, priority, *args, **kwargs):
227 # If this is happening in the middle of the dispatch, queue
228 # it up for processing after the dispatch completes
229 if self._dispatchDepth > 0:
230 self._updateQueue.append(lambda:self._addObserver(onetime, event, observerfn, priority, *args, **kwargs))
233 event, observers = self._getEventAndObservers(event)
235 if priority not in observers:
237 observers[priority] = {event: cbl}
239 priorityObservers = observers[priority]
240 if event not in priorityObservers:
242 observers[priority][event] = cbl
244 cbl = priorityObservers[event]
246 cbl.addCallback(onetime, observerfn, *args, **kwargs)
249 def removeObserver(self, event, observerfn):
251 Remove callable as observer for an event.
253 The observer callable is removed for all priority levels for the
256 @param event: Event for which the observer callable was registered.
257 @type event: C{str} or L{xpath.XPathQuery}
258 @param observerfn: Observer callable to be unregistered.
261 # If this is happening in the middle of the dispatch, queue
262 # it up for processing after the dispatch completes
263 if self._dispatchDepth > 0:
264 self._updateQueue.append(lambda:self.removeObserver(event, observerfn))
267 event, observers = self._getEventAndObservers(event)
270 for priority, priorityObservers in observers.iteritems():
271 for query, callbacklist in priorityObservers.iteritems():
273 callbacklist.removeCallback(observerfn)
274 if callbacklist.isEmpty():
275 emptyLists.append((priority, query))
277 for priority, query in emptyLists:
278 del observers[priority][query]
281 def dispatch(self, obj, event=None):
285 When C{event} is C{None}, an XPath type event is triggered, and
286 C{obj} is assumed to be an instance of
287 L{Element<twisted.words.xish.domish.Element>}. Otherwise, C{event}
288 holds the name of the named event being triggered. In the latter case,
289 C{obj} can be anything.
291 @param obj: The object to be dispatched.
292 @param event: Optional event name.
298 self._dispatchDepth += 1
302 observers = self._eventObservers
303 match = lambda query, obj: query == event
306 observers = self._xpathObservers
307 match = lambda query, obj: query.matches(obj)
309 priorities = observers.keys()
314 for priority in priorities:
315 for query, callbacklist in observers[priority].iteritems():
316 if match(query, obj):
317 callbacklist.callback(obj)
319 if callbacklist.isEmpty():
320 emptyLists.append((priority, query))
322 for priority, query in emptyLists:
323 del observers[priority][query]
325 self._dispatchDepth -= 1
327 # If this is a dispatch within a dispatch, don't
328 # do anything with the updateQueue -- it needs to
329 # wait until we've back all the way out of the stack
330 if self._dispatchDepth == 0:
331 # Deal with pending update operations
332 for f in self._updateQueue:
334 self._updateQueue = []
340 class XmlPipe(object):
344 Connects two objects that communicate stanzas through an XML stream like
345 interface. Each of the ends of the pipe (sink and source) can be used to
346 send XML stanzas to the other side, or add observers to process XML stanzas
347 that were sent from the other side.
349 XML pipes are usually used in place of regular XML streams that are
350 transported over TCP. This is the reason for the use of the names source
351 and sink for both ends of the pipe. The source side corresponds with the
352 entity that initiated the TCP connection, whereas the sink corresponds with
353 the entity that accepts that connection. In this object, though, the source
354 and sink are treated equally.
357 L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
358 and source objects are assumed to represent an eternal connected and
359 initialized XML stream. As such, events corresponding to connection,
360 disconnection, initialization and stream errors are not dispatched or
364 @ivar source: Source XML stream.
365 @ivar sink: Sink XML stream.
369 self.source = EventDispatcher()
370 self.sink = EventDispatcher()
371 self.source.send = lambda obj: self.sink.dispatch(obj)
372 self.sink.send = lambda obj: self.source.dispatch(obj)