1 # -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Support for results that aren't immediately available.
8 Maintainer: Glyph Lefkowitz
10 @var _NO_RESULT: The result used to represent the fact that there is no
11 result. B{Never ever ever use this as an actual result for a Deferred}. You
14 @var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
15 chain. Always accompanied by a Deferred instance in the args tuple pointing
16 at the Deferred which is chained to the Deferred which has this marker.
22 from sys import exc_info
25 from twisted.python import log, failure, lockfile
26 from twisted.python.util import unsignedID, mergeFunctionMetadata
30 class AlreadyCalledError(Exception):
class CancelledError(Exception):
    """
    This error is raised by default when a L{Deferred} is cancelled.

    L{Deferred.cancel} wraps an instance of this exception in a
    L{failure.Failure} and passes it to C{errback} when there was no
    canceller, or when the canceller did not call C{callback} or C{errback}
    itself.
    """
class TimeoutError(Exception):
    """
    This exception is deprecated.  It is used only by the deprecated
    L{Deferred.setTimeout} method.

    The module-level L{timeout} helper errbacks a L{Deferred} with a
    L{failure.Failure} wrapping an instance of this exception.
    """
56 Return a L{Deferred} that has already had C{.callback(result)} called.
58 This is useful when you're writing synchronous code to an
59 asynchronous interface: i.e., some code is calling you expecting a
60 L{Deferred} result, but you don't actually need to do anything
61 asynchronous. Just return C{defer.succeed(theResult)}.
63 See L{fail} for a version of this function that uses a failing
64 L{Deferred} rather than a successful one.
66 @param result: The result to give to the Deferred's 'callback'
77 def fail(result=None):
79 Return a L{Deferred} that has already had C{.errback(result)} called.
81 See L{succeed}'s docstring for rationale.
83 @param result: The same argument that L{Deferred.errback} takes.
85 @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
86 current exception state.
96 def execute(callable, *args, **kw):
98 Create a L{Deferred} from a callable and arguments.
100 Call the given function with the given arguments. Return a L{Deferred}
101 which has been fired with its callback as the result of that invocation
102 or its C{errback} with a L{Failure} for the exception thrown.
105 result = callable(*args, **kw)
109 return succeed(result)
113 def maybeDeferred(f, *args, **kw):
115 Invoke a function that may or may not return a L{Deferred}.
117 Call the given function with the given arguments. If the returned
118 object is a L{Deferred}, return it. If the returned object is a L{Failure},
119 wrap it with L{fail} and return it. Otherwise, wrap it in L{succeed} and
120 return it. If an exception is raised, convert it to a L{Failure}, wrap it
121 in L{fail}, and then return it.
123 @type f: Any callable
124 @param f: The callable to invoke
126 @param args: The arguments to pass to C{f}
127 @param kw: The keyword arguments to pass to C{f}
130 @return: The result of the function call, wrapped in a L{Deferred} if
134 result = f(*args, **kw)
136 return fail(failure.Failure(captureVars=Deferred.debug))
138 if isinstance(result, Deferred):
140 elif isinstance(result, failure.Failure):
143 return succeed(result)
def timeout(deferred):
    """
    Fail C{deferred} because it took too long.

    @param deferred: The L{Deferred} to errback.  Its C{errback} method is
        called with a L{failure.Failure} wrapping a L{TimeoutError} whose
        message is C{"Callback timed out"}.
    """
    timedOut = TimeoutError("Callback timed out")
    deferred.errback(failure.Failure(timedOut))
def setDebugging(on):
    """
    Enable or disable L{Deferred} debugging.

    While debugging is on, the call stacks from creation and invocation of
    each L{Deferred} are recorded, and added to any L{AlreadyCalledError}
    we raise.

    @param on: Any object; its truth value becomes the new setting of
        C{Deferred.debug}.
    """
    enabled = bool(on)
    Deferred.debug = enabled
170 Determine whether L{Deferred} debugging is enabled.
172 return Deferred.debug
175 # See module docstring.
176 _NO_RESULT = object()
183 This is a callback which will be put off until later.
185 Why do we want this? Well, in cases where a function in a threaded
186 program would block until it gets a result, for Twisted it should
187 not block. Instead, it should return a L{Deferred}.
189 This can be implemented for protocols that run over the network by
190 writing an asynchronous protocol for L{twisted.internet}. For methods
191 that come from outside packages that are not under our control, we use
192 threads (see for example L{twisted.enterprise.adbapi}).
194 For more information about Deferreds, see doc/core/howto/defer.html or
195 U{http://twistedmatrix.com/documents/current/core/howto/defer.html}
197 When creating a Deferred, you may provide a canceller function, which
198 will be called by d.cancel() to let you do any clean-up necessary if the
199 user decides not to wait for the deferred to complete.
201 @ivar called: A flag which is C{False} until either C{callback} or
202 C{errback} is called and afterwards always C{True}.
203 @type called: C{bool}
205 @ivar paused: A counter of how many unmatched C{pause} calls have been made
209 @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
210 which is C{True} if the Deferred has no canceller and has been
211 cancelled, C{False} otherwise. If C{True}, it can be expected that
212 C{callback} or C{errback} will eventually be called and the result
213 should be silently discarded.
214 @type _suppressAlreadyCalled: C{bool}
216 @ivar _runningCallbacks: A flag which is C{True} while this instance is
217 executing its callback chain, used to stop recursive execution of
219 @type _runningCallbacks: C{bool}
221 @ivar _chainedTo: If this Deferred is waiting for the result of another
222 Deferred, this is a reference to the other Deferred. Otherwise, C{None}.
228 _suppressAlreadyCalled = False
230 # Are we currently running a user-installed callback? Meant to prevent
231 # recursive running of callbacks when a reentrant call to add a callback is
233 _runningCallbacks = False
235 # Keep this class attribute for now, for compatibility with code that
241 def __init__(self, canceller=None):
243 Initialize a L{Deferred}.
245 @param canceller: a callable used to stop the pending operation
246 scheduled by this L{Deferred} when L{Deferred.cancel} is
247 invoked. The canceller will be passed the deferred whose
248 cancellation is requested (i.e., self).
250 If a canceller is not given, or does not invoke its argument's
251 C{callback} or C{errback} method, L{Deferred.cancel} will
252 invoke L{Deferred.errback} with a L{CancelledError}.
254 Note that if a canceller is not given, C{callback} or
255 C{errback} may still be invoked exactly once, even though
256 defer.py will have already invoked C{errback}, as described
257 above. This allows clients of code which returns a L{Deferred}
258 to cancel it without requiring the L{Deferred} instantiator to
259 provide any specific implementation support for cancellation.
262 @type canceller: a 1-argument callable which takes a L{Deferred}. The
263 return result is ignored.
266 self._canceller = canceller
268 self._debugInfo = DebugInfo()
269 self._debugInfo.creator = traceback.format_stack()[:-1]
272 def addCallbacks(self, callback, errback=None,
273 callbackArgs=None, callbackKeywords=None,
274 errbackArgs=None, errbackKeywords=None):
276 Add a pair of callbacks (success and error) to this L{Deferred}.
278 These will be executed when the 'master' callback is run.
281 @rtype: a L{Deferred}
283 assert callable(callback)
284 assert errback == None or callable(errback)
285 cbs = ((callback, callbackArgs, callbackKeywords),
286 (errback or (passthru), errbackArgs, errbackKeywords))
287 self.callbacks.append(cbs)
294 def addCallback(self, callback, *args, **kw):
296 Convenience method for adding just a callback.
300 return self.addCallbacks(callback, callbackArgs=args,
304 def addErrback(self, errback, *args, **kw):
306 Convenience method for adding just an errback.
310 return self.addCallbacks(passthru, errback,
def addBoth(self, callback, *args, **kw):
    """
    Register one callable to run for both success and failure results.

    This is a convenience wrapper around L{addCallbacks}: C{callback} is
    installed as the callback and as the errback, and receives the same
    extra positional and keyword arguments in either role.

    @param callback: The callable to install on both sides of the chain.
    @param args: Extra positional arguments passed to C{callback}.
    @param kw: Extra keyword arguments passed to C{callback}.

    @return: Whatever L{addCallbacks} returns.
    """
    return self.addCallbacks(
        callback, callback,
        callbackArgs=args, callbackKeywords=kw,
        errbackArgs=args, errbackKeywords=kw)
327 def chainDeferred(self, d):
329 Chain another L{Deferred} to this L{Deferred}.
331 This method adds callbacks to this L{Deferred} to call C{d}'s callback
332 or errback, as appropriate. It is merely a shorthand way of performing
335 self.addCallbacks(d.callback, d.errback)
337 When you chain a deferred d2 to another deferred d1 with
338 d1.chainDeferred(d2), you are making d2 participate in the callback
339 chain of d1. Thus any event that fires d1 will also fire d2.
340 However, the converse is B{not} true; if d2 is fired d1 will not be
343 Note that unlike the case where chaining is caused by a L{Deferred}
344 being returned from a callback, it is possible to cause the call
345 stack size limit to be exceeded by chaining many L{Deferred}s
346 together with C{chainDeferred}.
349 @rtype: a L{Deferred}
352 return self.addCallbacks(d.callback, d.errback)
def callback(self, result):
    """
    Fire this L{Deferred} with a success result.

    All success callbacks that have been added are run.  Each callback
    receives the previous callback's return value as its first argument,
    so the callbacks form a 'processing chain'.  If a success callback
    returns a L{Failure} or raises an L{Exception}, processing switches to
    the *error* callback chain.  If a callback (or errback) returns another
    L{Deferred}, this L{Deferred} is chained to it, and no further
    callbacks run until that L{Deferred} has a result.

    @param result: The value handed to the first callback.  It must not be
        a L{Deferred}; this is checked with an C{assert}.
    """
    resultIsDeferred = isinstance(result, Deferred)
    assert not resultIsDeferred
    self._startRunCallbacks(result)
371 def errback(self, fail=None):
373 Run all error callbacks that have been added to this L{Deferred}.
375 Each callback will have its result passed as the first
376 argument to the next; this way, the callbacks act as a
377 'processing chain'. Also, if the error-callback returns a non-Failure
378 or doesn't raise an L{Exception}, processing will continue on the
379 *success*-callback chain.
381 If the argument that's passed to me is not a L{failure.Failure} instance,
382 it will be embedded in one. If no argument is passed, a
383 L{failure.Failure} instance will be created based on the current
386 Passing a string as `fail' is deprecated, and will be punished with
389 @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
390 no current exception state.
393 fail = failure.Failure(captureVars=self.debug)
394 elif not isinstance(fail, failure.Failure):
395 fail = failure.Failure(fail)
397 self._startRunCallbacks(fail)
402 Stop processing on a L{Deferred} until L{unpause}() is called.
404 self.paused = self.paused + 1
409 Process all callbacks made since L{pause}() was called.
411 self.paused = self.paused - 1
420 Cancel this L{Deferred}.
422 If the L{Deferred} has not yet had its C{errback} or C{callback} method
423 invoked, call the canceller function provided to the constructor. If
424 that function does not invoke C{callback} or C{errback}, or if no
425 canceller function was provided, errback with L{CancelledError}.
427 If this L{Deferred} is waiting on another L{Deferred}, forward the
428 cancellation to the other L{Deferred}.
431 canceller = self._canceller
435 # Arrange to eat the callback that will eventually be fired
436 # since there was no real canceller.
437 self._suppressAlreadyCalled = True
439 # There was no canceller, or the canceller didn't call
440 # callback or errback.
441 self.errback(failure.Failure(CancelledError()))
442 elif isinstance(self.result, Deferred):
443 # Waiting for another deferred -- cancel it instead.
447 def _startRunCallbacks(self, result):
449 if self._suppressAlreadyCalled:
450 self._suppressAlreadyCalled = False
453 if self._debugInfo is None:
454 self._debugInfo = DebugInfo()
455 extra = "\n" + self._debugInfo._getDebugTracebacks()
456 raise AlreadyCalledError(extra)
457 raise AlreadyCalledError
459 if self._debugInfo is None:
460 self._debugInfo = DebugInfo()
461 self._debugInfo.invoker = traceback.format_stack()[:-2]
def _continuation(self):
    """
    Build the (callback, errback) pair that marks this L{Deferred} as
    waiting on another one.

    Both entries carry the C{_CONTINUE} marker instead of a callable, with
    this L{Deferred} as the only positional argument and no keyword
    arguments.  The pair is meant to be appended to the C{callbacks} list
    of the L{Deferred} being waited on.

    @return: A 2-tuple of C{(_CONTINUE, (self,), None)} entries.
    """
    onSuccess = (_CONTINUE, (self,), None)
    onFailure = (_CONTINUE, (self,), None)
    return (onSuccess, onFailure)
476 def _runCallbacks(self):
478 Run the chain of callbacks once a result is available.
480 This consists of a simple loop over all of the callbacks, calling each
481 with the current result and making the current result equal to the
482 return value (or raised exception) of that call.
484 If C{self._runningCallbacks} is true, this loop won't run at all, since
485 it is already running above us on the call stack. If C{self.paused} is
486 true, the loop also won't run, because that's what it means to be
489 The loop will terminate before processing all of the callbacks if a
490 C{Deferred} without a result is encountered.
492 If a C{Deferred} I{with} a result is encountered, that result is taken
493 and the loop proceeds.
495 @note: The implementation is complicated slightly by the fact that
496 chaining (associating two Deferreds with each other such that one
497 will wait for the result of the other, as happens when a Deferred is
498 returned from a callback on another Deferred) is supported
499 iteratively rather than recursively, to avoid running out of stack
500 frames when processing long chains.
502 if self._runningCallbacks:
503 # Don't recursively run callbacks
506 # Keep track of all the Deferreds encountered while propagating results
507 # up a chain. The way a Deferred gets onto this stack is by having
508 # added its _continuation() to the callbacks list of a second Deferred
509 # and then that second Deferred being fired. i.e., if a Deferred ever had
510 # _chainedTo set to something other than None, it might end up on this stack.
517 # This Deferred isn't going to produce a result at all. All the
518 # Deferreds up the chain waiting on it will just have to...
523 current._chainedTo = None
524 while current.callbacks:
525 item = current.callbacks.pop(0)
526 callback, args, kw = item[
527 isinstance(current.result, failure.Failure)]
531 # Avoid recursion if we can.
532 if callback is _CONTINUE:
533 # Give the waiting Deferred our current result and then
534 # forget about that result ourselves.
536 chainee.result = current.result
537 current.result = None
538 # Making sure to update _debugInfo
539 if current._debugInfo is not None:
540 current._debugInfo.failResult = None
542 chain.append(chainee)
543 # Delay cleaning this Deferred and popping it from the chain
544 # until after we've dealt with chainee.
549 current._runningCallbacks = True
551 current.result = callback(current.result, *args, **kw)
553 current._runningCallbacks = False
555 # Including full frame information in the Failure is quite
556 # expensive, so we avoid it unless self.debug is set.
557 current.result = failure.Failure(captureVars=self.debug)
559 if isinstance(current.result, Deferred):
560 # The result is another Deferred. If it has a result,
561 # we can take it and keep going.
562 resultResult = getattr(current.result, 'result', _NO_RESULT)
563 if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
564 # Nope, it didn't. Pause and chain.
566 current._chainedTo = current.result
567 # Note: current.result has no result, so it's not
568 # running its callbacks right now. Therefore we can
569 # append to the callbacks list directly instead of
570 # using addCallbacks.
571 current.result.callbacks.append(current._continuation())
574 # Yep, it did. Steal it.
575 current.result.result = None
576 # Make sure _debugInfo's failure state is updated.
577 if current.result._debugInfo is not None:
578 current.result._debugInfo.failResult = None
579 current.result = resultResult
582 # As much of the callback chain - perhaps all of it - as can be
583 # processed right now has been. The current Deferred is waiting on
584 # another Deferred or for more callbacks. Before finishing with it,
585 # make sure its _debugInfo is in the proper state.
586 if isinstance(current.result, failure.Failure):
587 # Stash the Failure in the _debugInfo for unhandled error
589 current.result.cleanFailure()
590 if current._debugInfo is None:
591 current._debugInfo = DebugInfo()
592 current._debugInfo.failResult = current.result
594 # Clear out any Failure in the _debugInfo, since the result
595 # is no longer a Failure.
596 if current._debugInfo is not None:
597 current._debugInfo.failResult = None
599 # This Deferred is done, pop it from the chain and move back up
600 # to the Deferred which supplied us with our result.
606 Return a string representation of this C{Deferred}.
608 cname = self.__class__.__name__
609 result = getattr(self, 'result', _NO_RESULT)
610 myID = hex(unsignedID(self))
611 if self._chainedTo is not None:
612 result = ' waiting on Deferred at %s' % (hex(unsignedID(self._chainedTo)),)
613 elif result is _NO_RESULT:
616 result = ' current result: %r' % (result,)
617 return "<%s at %s%s>" % (cname, myID, result)
624 Deferred debug helper.
629 def _getDebugTracebacks(self):
631 if hasattr(self, "creator"):
632 info += " C: Deferred was created:\n C:"
633 info += "".join(self.creator).rstrip().replace("\n","\n C:")
635 if hasattr(self, "invoker"):
636 info += " I: First Invoker was:\n I:"
637 info += "".join(self.invoker).rstrip().replace("\n","\n I:")
644 Print tracebacks and die.
646 If the *last* (and I do mean *last*) callback leaves me in an error
647 state, print a traceback (if said errback is a L{Failure}).
649 if self.failResult is not None:
650 log.msg("Unhandled error in Deferred:", isError=True)
651 debugInfo = self._getDebugTracebacks()
653 log.msg("(debug: " + debugInfo + ")", isError=True)
654 log.err(self.failResult)
658 class FirstError(Exception):
660 First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
662 @ivar subFailure: The L{Failure} that occurred.
663 @type subFailure: L{Failure}
665 @ivar index: The index of the L{Deferred} in the L{DeferredList} where
669 def __init__(self, failure, index):
670 Exception.__init__(self, failure, index)
671 self.subFailure = failure
677 The I{repr} of L{FirstError} instances includes the repr of the
678 wrapped failure's exception and the index of the L{FirstError}.
680 return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
685 The I{str} of L{FirstError} instances includes the I{str} of the
686 entire wrapped failure (including its traceback and exception) and
687 the index of the L{FirstError}.
689 return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
692 def __cmp__(self, other):
694 Comparison between L{FirstError} and other L{FirstError} instances
695 is defined as the comparison of the index and sub-failure of each
696 instance. L{FirstError} instances don't compare equal to anything
697 that isn't a L{FirstError} instance.
701 if isinstance(other, FirstError):
703 (self.index, self.subFailure),
704 (other.index, other.subFailure))
709 class DeferredList(Deferred):
711 L{DeferredList} is a tool for collecting the results of several Deferreds.
713 This tracks a list of L{Deferred}s for their results, and makes a single
714 callback when they have all completed. By default, the ultimate result is a
715 list of (success, result) tuples, 'success' being a boolean.
716 L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and
717 errbacks can be added to it in the same way.
719 L{DeferredList} is implemented by adding callbacks and errbacks to each
720 L{Deferred} in the list passed to it. This means callbacks and errbacks
721 added to the Deferreds before they are passed to L{DeferredList} will change
722 the result that L{DeferredList} sees (i.e., L{DeferredList} is not special).
723 Callbacks and errbacks can also be added to the Deferreds after they are
724 passed to L{DeferredList} and L{DeferredList} may change the result that
727 See the documentation for the C{__init__} arguments for more information.
730 fireOnOneCallback = False
731 fireOnOneErrback = False
733 def __init__(self, deferredList, fireOnOneCallback=False,
734 fireOnOneErrback=False, consumeErrors=False):
736 Initialize a DeferredList.
738 @param deferredList: The list of deferreds to track.
739 @type deferredList: C{list} of L{Deferred}s
741 @param fireOnOneCallback: (keyword param) a flag indicating that this
742 L{DeferredList} will fire when the first L{Deferred} in
743 C{deferredList} fires with a non-failure result without waiting for
744 any of the other Deferreds. When this flag is set, the DeferredList
745 will fire with a two-tuple: the first element is the result of the
746 Deferred which fired; the second element is the index in
747 C{deferredList} of that Deferred.
748 @type fireOnOneCallback: C{bool}
750 @param fireOnOneErrback: (keyword param) a flag indicating that this
751 L{DeferredList} will fire when the first L{Deferred} in
752 C{deferredList} fires with a failure result without waiting for any
753 of the other Deferreds. When this flag is set, if a Deferred in the
754 list errbacks, the DeferredList will errback with a L{FirstError}
755 failure wrapping the failure of that Deferred.
756 @type fireOnOneErrback: C{bool}
758 @param consumeErrors: (keyword param) a flag indicating that failures in
759 any of the included L{Deferred}s should not be propagated to
760 errbacks added to the individual L{Deferred}s after this
761 L{DeferredList} is constructed. After constructing the
762 L{DeferredList}, any errors in the individual L{Deferred}s will be
763 converted to a callback result of C{None}. This is useful to
764 prevent spurious 'Unhandled error in Deferred' messages from being
765 logged. This does not prevent C{fireOnOneErrback} from working.
766 @type consumeErrors: C{bool}
768 self.resultList = [None] * len(deferredList)
769 Deferred.__init__(self)
770 if len(deferredList) == 0 and not fireOnOneCallback:
771 self.callback(self.resultList)
773 # These flags need to be set *before* attaching callbacks to the
774 # deferreds, because the callbacks use these flags, and will run
775 # synchronously if any of the deferreds are already fired.
776 self.fireOnOneCallback = fireOnOneCallback
777 self.fireOnOneErrback = fireOnOneErrback
778 self.consumeErrors = consumeErrors
779 self.finishedCount = 0
782 for deferred in deferredList:
783 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
784 callbackArgs=(index,SUCCESS),
785 errbackArgs=(index,FAILURE))
789 def _cbDeferred(self, result, index, succeeded):
791 (internal) Callback for when one of my deferreds fires.
793 self.resultList[index] = (succeeded, result)
795 self.finishedCount += 1
797 if succeeded == SUCCESS and self.fireOnOneCallback:
798 self.callback((result, index))
799 elif succeeded == FAILURE and self.fireOnOneErrback:
800 self.errback(failure.Failure(FirstError(result, index)))
801 elif self.finishedCount == len(self.resultList):
802 self.callback(self.resultList)
804 if succeeded == FAILURE and self.consumeErrors:
811 def _parseDListResult(l, fireOnOneErrback=False):
813 for success, value in l:
815 return [x[1] for x in l]
819 def gatherResults(deferredList, consumeErrors=False):
821 Returns, via a L{Deferred}, a list with the results of the given
822 L{Deferred}s - in effect, a "join" of multiple deferred operations.
824 The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s
825 have fired, or when any one of them has failed.
827 This differs from L{DeferredList} in that you don't need to parse
828 the result for success/failure.
830 @type deferredList: C{list} of L{Deferred}s
832 @param consumeErrors: (keyword param) a flag, defaulting to False,
833 indicating that failures in any of the given L{Deferred}s should not be
834 propagated to errbacks added to the individual L{Deferred}s after this
835 L{gatherResults} invocation. Any such errors in the individual
836 L{Deferred}s will be converted to a callback result of C{None}. This
837 is useful to prevent spurious 'Unhandled error in Deferred' messages
838 from being logged. This parameter is available since 11.1.0.
839 @type consumeErrors: C{bool}
841 d = DeferredList(deferredList, fireOnOneErrback=True,
842 consumeErrors=consumeErrors)
843 d.addCallback(_parseDListResult)
848 # Constants for use with DeferredList
857 class waitForDeferred:
859 See L{deferredGenerator}.
862 def __init__(self, d):
863 if not isinstance(d, Deferred):
864 raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
869 if isinstance(self.result, failure.Failure):
870 self.result.raiseException()
875 def _deferGenerator(g, deferred):
877 See L{deferredGenerator}.
881 # This function is complicated by the need to prevent unbounded recursion
882 # arising from repeatedly yielding immediately ready deferreds. This while
883 # loop and the waiting variable solve that by manually unfolding the
886 waiting = [True, # defgen is waiting for result?
892 except StopIteration:
893 deferred.callback(result)
899 # Deferred.callback(Deferred) raises an error; we catch this case
900 # early here and give a nicer error message to the user in case
901 # they yield a Deferred.
902 if isinstance(result, Deferred):
903 return fail(TypeError("Yield waitForDeferred(d), not d!"))
905 if isinstance(result, waitForDeferred):
906 # a waitForDeferred was yielded, get the result.
907 # Pass result in so it doesn't get changed going around the loop.
908 # This isn't a problem for waiting, as it's only reused if
909 # gotResult has already been executed.
910 def gotResult(r, result=result):
916 _deferGenerator(g, deferred)
917 result.d.addBoth(gotResult)
919 # Haven't called back yet, set flag so that we get reinvoked
920 # and return from the loop
923 # Reset waiting to initial values for next loop
def deferredGenerator(f):
    """
    Turn a L{waitForDeferred}-using generator function into a function that
    returns a L{Deferred}.

    L{deferredGenerator} and L{waitForDeferred} help you write
    L{Deferred}-using code that looks like a regular sequential function.
    If your code has a minimum requirement of Python 2.5, consider the use
    of L{inlineCallbacks} instead, which can accomplish the same thing in a
    more concise manner.

    The two are used together, like this::

        @deferredGenerator
        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            print thing # the result!

    L{waitForDeferred} returns something that you should immediately yield;
    when your generator is resumed, calling C{thing.getResult()} will either
    give you the result of the L{Deferred} if it was a success, or raise an
    exception if it was a failure.  Calling C{getResult} is B{absolutely
    mandatory}.  If you do not call it, I{your program will not work}.

    The result of the returned L{Deferred} will be the last value that your
    generator yielded, unless that last value is a L{waitForDeferred}
    instance, in which case the result will be C{None}.  Not yielding
    anything at all also results in C{None}.  Remember that C{return result}
    won't work; use C{yield result; return} in place of that.  Yielding a
    bare L{Deferred} is an error condition; always yield
    C{waitForDeferred(d)} instead.

    If the generator raises an unhandled exception, the returned
    L{Deferred} will errback instead.  For example::

        @deferredGenerator
        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                yield 'TWISTED IS GREAT!'
                return
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')

    Put succinctly, these functions connect deferred-using code with this
    'fake blocking' style in both directions: L{waitForDeferred} converts
    from a L{Deferred} to the 'blocking' style, and L{deferredGenerator}
    converts from the 'blocking' style to a L{Deferred}.

    @param f: The generator function to wrap.

    @return: The wrapper function, after C{mergeFunctionMetadata} has been
        applied to it with C{f} as the source of the metadata.
    """
    def unwindGenerator(*args, **kwargs):
        # Create the generator first, then the Deferred that reports its
        # outcome, and hand both to the driver loop.
        generator = f(*args, **kwargs)
        return _deferGenerator(generator, Deferred())

    decorated = mergeFunctionMetadata(f, unwindGenerator)
    return decorated
995 # BaseException is only available in Python 2.5 and later.
999 BaseException=Exception
1003 class _DefGen_Return(BaseException):
1004 def __init__(self, value):
def returnValue(val):
    """
    Return C{val} from an L{inlineCallbacks} generator.

    Note: this is currently implemented by raising an exception derived
    from L{BaseException}.  You might want to change any bare C{except:}
    clauses to C{except Exception:} so as not to catch this exception.

    Also: while this function currently works when called from arbitrary
    functions called from within the generator, do not rely upon this
    behavior.

    @param val: The value the generator's L{Deferred} should fire with.
    """
    exitSignal = _DefGen_Return(val)
    raise exitSignal
1026 def _inlineCallbacks(result, g, deferred):
1028 See L{inlineCallbacks}.
1030 # This function is complicated by the need to prevent unbounded recursion
1031 # arising from repeatedly yielding immediately ready deferreds. This while
1032 # loop and the waiting variable solve that by manually unfolding the
1035 waiting = [True, # waiting for result?
1040 # Send the last result back as the result of the yield expression.
1041 isFailure = isinstance(result, failure.Failure)
1043 result = result.throwExceptionIntoGenerator(g)
1045 result = g.send(result)
1046 except StopIteration:
1047 # fell off the end, or "return" statement
1048 deferred.callback(None)
1050 except _DefGen_Return, e:
1051 # returnValue() was called; time to give a result to the original
1052 # Deferred. First though, let's try to identify the potentially
1053 # confusing situation which results when returnValue() is
1054 # accidentally invoked from a different function, one that wasn't
1055 # decorated with @inlineCallbacks.
1057 # The traceback starts in this frame (the one for
1058 # _inlineCallbacks); the next one down should be the application
1060 appCodeTrace = exc_info()[2].tb_next
1062 # If we invoked this generator frame by throwing an exception
1063 # into it, then throwExceptionIntoGenerator will consume an
1064 # additional stack frame itself, so we need to skip that too.
1065 appCodeTrace = appCodeTrace.tb_next
1066 # Now that we've identified the frame being exited by the
1067 # exception, let's figure out if returnValue was called from it
1068 # directly. returnValue itself consumes a stack frame, so the
1069 # application code will have a tb_next, but it will *not* have a
1071 if appCodeTrace.tb_next.tb_next:
1072 # If returnValue was invoked non-local to the frame which it is
1073 # exiting, identify the frame that ultimately invoked
1074 # returnValue so that we can warn the user, as this behavior is
1076 ultimateTrace = appCodeTrace
1077 while ultimateTrace.tb_next.tb_next:
1078 ultimateTrace = ultimateTrace.tb_next
1079 filename = ultimateTrace.tb_frame.f_code.co_filename
1080 lineno = ultimateTrace.tb_lineno
1081 warnings.warn_explicit(
1082 "returnValue() in %r causing %r to exit: "
1083 "returnValue should only be invoked by functions decorated "
1084 "with inlineCallbacks" % (
1085 ultimateTrace.tb_frame.f_code.co_name,
1086 appCodeTrace.tb_frame.f_code.co_name),
1087 DeprecationWarning, filename, lineno)
1088 deferred.callback(e.value)
1094 if isinstance(result, Deferred):
1095 # a deferred was yielded, get the result.
1101 _inlineCallbacks(r, g, deferred)
1103 result.addBoth(gotResult)
1105 # Haven't called back yet, set flag so that we get reinvoked
1106 # and return from the loop
1111 # Reset waiting to initial values for next loop. gotResult uses
1112 # waiting, but this isn't a problem because gotResult is only
1113 # executed once, and if it hasn't been executed yet, the return
1114 # branch above would have been taken.
1125 def inlineCallbacks(f):
1127 WARNING: this function will not work in Python 2.4 and earlier!
1129 inlineCallbacks helps you write Deferred-using code that looks like a
1130 regular sequential function. This function uses features of Python 2.5
1131 generators. If you need to be compatible with Python 2.4 or before, use
1132 the L{deferredGenerator} function instead, which accomplishes the same
1133 thing, but with somewhat more boilerplate. For example::
1137 thing = yield makeSomeRequestResultingInDeferred()
1138 print thing #the result! hooray!
1140 When you call anything that results in a L{Deferred}, you can simply yield it;
1141 your generator will automatically be resumed when the Deferred's result is
1142 available. The generator will be sent the result of the L{Deferred} with the
1143 'send' method on generators, or if the result was a failure, 'throw'.
1145 Things that are not L{Deferred}s may also be yielded, and your generator
1146 will be resumed with the same object sent back. This means C{yield}
1147 performs an operation roughly equivalent to L{maybeDeferred}.
1149 Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
1150 will result in the return value of the generator (or will fail with a
1151 failure object if your generator raises an unhandled exception). Note that
1152 you can't use C{return result} to return a value; use C{returnValue(result)}
1153 instead. Falling off the end of the generator, or simply using C{return}
1154 will cause the L{Deferred} to have a result of C{None}.
1156 Be aware that L{returnValue} will not accept a L{Deferred} as a parameter.
1157 If you believe the thing you'd like to return could be a L{Deferred}, do
1160 result = yield result
1163 The L{Deferred} returned from your deferred generator may errback if your
1164 generator raised an exception::
1168 thing = yield makeSomeRequestResultingInDeferred()
1169 if thing == 'I love Twisted':
1170 # will become the result of the Deferred
1171 returnValue('TWISTED IS GREAT!')
1173 # will trigger an errback
1174 raise Exception('DESTROY ALL LIFE')
1176 def unwindGenerator(*args, **kwargs):
1178 gen = f(*args, **kwargs)
1179 except _DefGen_Return:
1181 "inlineCallbacks requires %r to produce a generator; instead"
1182 "caught returnValue being used in a non-generator" % (f,))
1183 if not isinstance(gen, types.GeneratorType):
1185 "inlineCallbacks requires %r to produce a generator; "
1186 "instead got %r" % (f, gen))
1187 return _inlineCallbacks(None, gen, Deferred())
1188 return mergeFunctionMetadata(f, unwindGenerator)
1191 ## DeferredLock/DeferredQueue
1193 class _ConcurrencyPrimitive(object):
1198 def _releaseAndReturn(self, r):
1203 def run(*args, **kwargs):
1205 Acquire, run, release.
1207 This function takes a callable as its first argument and any
1208 number of other positional and keyword arguments. When the
1209 lock or semaphore is acquired, the callable will be invoked
1210 with those arguments.
1212 The callable may return a L{Deferred}; if it does, the lock or
1213 semaphore won't be released until that L{Deferred} fires.
1215 @return: L{Deferred} of function result.
1219 raise TypeError("run() takes at least 2 arguments, none given.")
1220 raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
1221 args[0].__class__.__name__,))
1225 def execute(ignoredResult):
1226 d = maybeDeferred(f, *args, **kwargs)
1227 d.addBoth(self._releaseAndReturn)
1231 d.addCallback(execute)
1236 class DeferredLock(_ConcurrencyPrimitive):
1238 A lock for event driven systems.
1240 @ivar locked: C{True} when this Lock has been acquired, false at all other
1241 times. Do not change this value, but it is useful to examine for the
1242 equivalent of a "non-blocking" acquisition.
1248 def _cancelAcquire(self, d):
1250 Remove a deferred d from our waiting list, as the deferred has been
1253 Note: We do not need to wrap this in a try/except to catch d not
1254 being in self.waiting because this canceller will not be called if
1255 d has fired. release() pops a deferred out of self.waiting and
1256 calls it, so the canceller will no longer be called.
1258 @param d: The deferred that has been canceled.
1260 self.waiting.remove(d)
1265 Attempt to acquire the lock. Returns a L{Deferred} that fires on
1266 lock acquisition with the L{DeferredLock} as the value. If the lock
1267 is locked, then the Deferred is placed at the end of a waiting list.
1269 @return: a L{Deferred} which fires on lock acquisition.
1270 @rtype: a L{Deferred}
1272 d = Deferred(canceller=self._cancelAcquire)
1274 self.waiting.append(d)
1283 Release the lock. If there is a waiting list, then the first
1284 L{Deferred} in that waiting list will be called back.
1286 Should be called by whoever did the L{acquire}() when the shared
1289 assert self.locked, "Tried to release an unlocked lock"
1292 # someone is waiting to acquire lock
1294 d = self.waiting.pop(0)
1299 class DeferredSemaphore(_ConcurrencyPrimitive):
1301 A semaphore for event driven systems.
1303 @ivar limit: At most this many users may acquire this semaphore at
1305 @type tokens: C{int}
1307 @ivar tokens: The difference between C{limit} and the number of users
1308 which have currently acquired this semaphore.
1312 def __init__(self, tokens):
1313 _ConcurrencyPrimitive.__init__(self)
1315 raise ValueError("DeferredSemaphore requires tokens >= 1")
1316 self.tokens = tokens
1320 def _cancelAcquire(self, d):
1322 Remove a deferred d from our waiting list, as the deferred has been
1325 Note: We do not need to wrap this in a try/except to catch d not
1326 being in self.waiting because this canceller will not be called if
1327 d has fired. release() pops a deferred out of self.waiting and
1328 calls it, so the canceller will no longer be called.
1330 @param d: The deferred that has been canceled.
1332 self.waiting.remove(d)
1337 Attempt to acquire the token.
1339 @return: a L{Deferred} which fires on token acquisition.
1341 assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative"
1342 d = Deferred(canceller=self._cancelAcquire)
1344 self.waiting.append(d)
1346 self.tokens = self.tokens - 1
1355 Should be called by whoever did the L{acquire}() when the shared
1358 assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
1359 self.tokens = self.tokens + 1
1361 # someone is waiting to acquire token
1362 self.tokens = self.tokens - 1
1363 d = self.waiting.pop(0)
1368 class QueueOverflow(Exception):
1373 class QueueUnderflow(Exception):
1378 class DeferredQueue(object):
1380 An event driven queue.
1382 Objects may be added as usual to this queue. When an attempt is
1383 made to retrieve an object when the queue is empty, a L{Deferred} is
1384 returned which will fire when an object becomes available.
1386 @ivar size: The maximum number of objects to allow into the queue
1387 at a time. When an attempt to add a new object would exceed this
1388 limit, L{QueueOverflow} is raised synchronously. C{None} for no limit.
1390 @ivar backlog: The maximum number of L{Deferred} gets to allow at
1391 one time. When an attempt is made to get an object which would
1392 exceed this limit, L{QueueUnderflow} is raised synchronously. C{None}
1396 def __init__(self, size=None, backlog=None):
1400 self.backlog = backlog
1403 def _cancelGet(self, d):
1405 Remove a deferred d from our waiting list, as the deferred has been
1408 Note: We do not need to wrap this in a try/except to catch d not
1409 being in self.waiting because this canceller will not be called if
1410 d has fired. put() pops a deferred out of self.waiting and calls
1411 it, so the canceller will no longer be called.
1413 @param d: The deferred that has been canceled.
1415 self.waiting.remove(d)
1420 Add an object to this queue.
1422 @raise QueueOverflow: Too many objects are in this queue.
1425 self.waiting.pop(0).callback(obj)
1426 elif self.size is None or len(self.pending) < self.size:
1427 self.pending.append(obj)
1429 raise QueueOverflow()
1434 Attempt to retrieve and remove an object from the queue.
1436 @return: a L{Deferred} which fires with the next object available in
1439 @raise QueueUnderflow: Too many (more than C{backlog})
1440 L{Deferred}s are already waiting for an object from this queue.
1443 return succeed(self.pending.pop(0))
1444 elif self.backlog is None or len(self.waiting) < self.backlog:
1445 d = Deferred(canceller=self._cancelGet)
1446 self.waiting.append(d)
1449 raise QueueUnderflow()
1453 class AlreadyTryingToLockError(Exception):
1455 Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
1456 single L{DeferredFilesystemLock}.
1461 class DeferredFilesystemLock(lockfile.FilesystemLock):
1463 A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
1466 @ivar _scheduler: The object in charge of scheduling retries. In this
1467 implementation this is parameterized for testing.
1469 @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1471 @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
1472 the next retry for acquiring the lock.
1474 @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
1475 argument. This is in charge of timing out our attempt to acquire the
1483 def __init__(self, name, scheduler=None):
1485 @param name: The name of the lock to acquire
1486 @param scheduler: An object which provides L{IReactorTime}
1488 lockfile.FilesystemLock.__init__(self, name)
1490 if scheduler is None:
1491 from twisted.internet import reactor
1494 self._scheduler = scheduler
1497 def deferUntilLocked(self, timeout=None):
1499 Wait until we acquire this lock. This method is not safe for
1502 @type timeout: C{float} or C{int}
1503 @param timeout: the number of seconds after which to time out if the
1504 lock has not been acquired.
1506 @return: a L{Deferred} which will callback when the lock is acquired, or
1507 errback with a L{TimeoutError} after timing out or an
1508 L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1509 been called and not successfully locked the file.
1511 if self._tryLockCall is not None:
1513 AlreadyTryingToLockError(
1514 "deferUntilLocked isn't safe for concurrent use."))
1519 self._tryLockCall.cancel()
1520 self._tryLockCall = None
1521 self._timeoutCall = None
1526 d.errback(failure.Failure(
1527 TimeoutError("Timed out aquiring lock: %s after %fs" % (
1533 if self._timeoutCall is not None:
1534 self._timeoutCall.cancel()
1535 self._timeoutCall = None
1537 self._tryLockCall = None
1541 if timeout is not None and self._timeoutCall is None:
1542 self._timeoutCall = self._scheduler.callLater(
1543 timeout, _cancelLock)
1545 self._tryLockCall = self._scheduler.callLater(
1546 self._interval, _tryLock)
1554 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1555 "AlreadyCalledError", "TimeoutError", "gatherResults",
1557 "waitForDeferred", "deferredGenerator", "inlineCallbacks",
1559 "DeferredLock", "DeferredSemaphore", "DeferredQueue",
1560 "DeferredFilesystemLock", "AlreadyTryingToLockError",