Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / internet / defer.py
1 # -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Support for results that aren't immediately available.
7
8 Maintainer: Glyph Lefkowitz
9
10 @var _NO_RESULT: The result used to represent the fact that there is no
11     result. B{Never ever ever use this as an actual result for a Deferred}.  You
12     have been warned.
13
14 @var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
15     chain.  Always accompanied by a Deferred instance in the args tuple pointing
16     at the Deferred which is chained to the Deferred which has this marker.
17 """
18
19 import traceback
20 import types
21 import warnings
22 from sys import exc_info
23
24 # Twisted imports
25 from twisted.python import log, failure, lockfile
26 from twisted.python.util import unsignedID, mergeFunctionMetadata
27
28
29
30 class AlreadyCalledError(Exception):
31     pass
32
33
34 class CancelledError(Exception):
35     """
36     This error is raised by default when a L{Deferred} is cancelled.
37     """
38
39
40 class TimeoutError(Exception):
41     """
42     This exception is deprecated.  It is used only by the deprecated
43     L{Deferred.setTimeout} method.
44     """
45
46
47
48 def logError(err):
49     log.err(err)
50     return err
51
52
53
54 def succeed(result):
55     """
56     Return a L{Deferred} that has already had C{.callback(result)} called.
57
58     This is useful when you're writing synchronous code to an
59     asynchronous interface: i.e., some code is calling you expecting a
60     L{Deferred} result, but you don't actually need to do anything
61     asynchronous. Just return C{defer.succeed(theResult)}.
62
63     See L{fail} for a version of this function that uses a failing
64     L{Deferred} rather than a successful one.
65
66     @param result: The result to give to the Deferred's 'callback'
67            method.
68
69     @rtype: L{Deferred}
70     """
71     d = Deferred()
72     d.callback(result)
73     return d
74
75
76
77 def fail(result=None):
78     """
79     Return a L{Deferred} that has already had C{.errback(result)} called.
80
81     See L{succeed}'s docstring for rationale.
82
83     @param result: The same argument that L{Deferred.errback} takes.
84
85     @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
86         current exception state.
87
88     @rtype: L{Deferred}
89     """
90     d = Deferred()
91     d.errback(result)
92     return d
93
94
95
96 def execute(callable, *args, **kw):
97     """
98     Create a L{Deferred} from a callable and arguments.
99
100     Call the given function with the given arguments.  Return a L{Deferred}
101     which has been fired with its callback as the result of that invocation
102     or its C{errback} with a L{Failure} for the exception thrown.
103     """
104     try:
105         result = callable(*args, **kw)
106     except:
107         return fail()
108     else:
109         return succeed(result)
110
111
112
113 def maybeDeferred(f, *args, **kw):
114     """
115     Invoke a function that may or may not return a L{Deferred}.
116
117     Call the given function with the given arguments.  If the returned
118     object is a L{Deferred}, return it.  If the returned object is a L{Failure},
119     wrap it with L{fail} and return it.  Otherwise, wrap it in L{succeed} and
120     return it.  If an exception is raised, convert it to a L{Failure}, wrap it
121     in L{fail}, and then return it.
122
123     @type f: Any callable
124     @param f: The callable to invoke
125
126     @param args: The arguments to pass to C{f}
127     @param kw: The keyword arguments to pass to C{f}
128
129     @rtype: L{Deferred}
130     @return: The result of the function call, wrapped in a L{Deferred} if
131     necessary.
132     """
133     try:
134         result = f(*args, **kw)
135     except:
136         return fail(failure.Failure(captureVars=Deferred.debug))
137
138     if isinstance(result, Deferred):
139         return result
140     elif isinstance(result, failure.Failure):
141         return fail(result)
142     else:
143         return succeed(result)
144
145
146
147 def timeout(deferred):
148     deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
149
150
151
152 def passthru(arg):
153     return arg
154
155
156
157 def setDebugging(on):
158     """
159     Enable or disable L{Deferred} debugging.
160
161     When debugging is on, the call stacks from creation and invocation are
162     recorded, and added to any L{AlreadyCalledErrors} we raise.
163     """
164     Deferred.debug=bool(on)
165
166
167
168 def getDebugging():
169     """
170     Determine whether L{Deferred} debugging is enabled.
171     """
172     return Deferred.debug
173
174
175 # See module docstring.
176 _NO_RESULT = object()
177 _CONTINUE = object()
178
179
180
181 class Deferred:
182     """
183     This is a callback which will be put off until later.
184
185     Why do we want this? Well, in cases where a function in a threaded
186     program would block until it gets a result, for Twisted it should
187     not block. Instead, it should return a L{Deferred}.
188
189     This can be implemented for protocols that run over the network by
190     writing an asynchronous protocol for L{twisted.internet}. For methods
191     that come from outside packages that are not under our control, we use
192     threads (see for example L{twisted.enterprise.adbapi}).
193
194     For more information about Deferreds, see doc/core/howto/defer.html or
195     U{http://twistedmatrix.com/documents/current/core/howto/defer.html}
196
197     When creating a Deferred, you may provide a canceller function, which
198     will be called by d.cancel() to let you do any clean-up necessary if the
199     user decides not to wait for the deferred to complete.
200
201     @ivar called: A flag which is C{False} until either C{callback} or
202         C{errback} is called and afterwards always C{True}.
203     @type called: C{bool}
204
205     @ivar paused: A counter of how many unmatched C{pause} calls have been made
206         on this instance.
207     @type paused: C{int}
208
209     @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
210         which is C{True} if the Deferred has no canceller and has been
211         cancelled, C{False} otherwise.  If C{True}, it can be expected that
212         C{callback} or C{errback} will eventually be called and the result
213         should be silently discarded.
214     @type _suppressAlreadyCalled: C{bool}
215
216     @ivar _runningCallbacks: A flag which is C{True} while this instance is
217         executing its callback chain, used to stop recursive execution of
218         L{_runCallbacks}
219     @type _runningCallbacks: C{bool}
220
221     @ivar _chainedTo: If this Deferred is waiting for the result of another
222         Deferred, this is a reference to the other Deferred.  Otherwise, C{None}.
223     """
224
225     called = False
226     paused = 0
227     _debugInfo = None
228     _suppressAlreadyCalled = False
229
230     # Are we currently running a user-installed callback?  Meant to prevent
231     # recursive running of callbacks when a reentrant call to add a callback is
232     # used.
233     _runningCallbacks = False
234
235     # Keep this class attribute for now, for compatibility with code that
236     # sets it directly.
237     debug = False
238
239     _chainedTo = None
240
241     def __init__(self, canceller=None):
242         """
243         Initialize a L{Deferred}.
244
245         @param canceller: a callable used to stop the pending operation
246             scheduled by this L{Deferred} when L{Deferred.cancel} is
247             invoked. The canceller will be passed the deferred whose
248             cancelation is requested (i.e., self).
249
250             If a canceller is not given, or does not invoke its argument's
251             C{callback} or C{errback} method, L{Deferred.cancel} will
252             invoke L{Deferred.errback} with a L{CancelledError}.
253
254             Note that if a canceller is not given, C{callback} or
255             C{errback} may still be invoked exactly once, even though
256             defer.py will have already invoked C{errback}, as described
257             above.  This allows clients of code which returns a L{Deferred}
258             to cancel it without requiring the L{Deferred} instantiator to
259             provide any specific implementation support for cancellation.
260             New in 10.1.
261
262         @type canceller: a 1-argument callable which takes a L{Deferred}. The
263             return result is ignored.
264         """
265         self.callbacks = []
266         self._canceller = canceller
267         if self.debug:
268             self._debugInfo = DebugInfo()
269             self._debugInfo.creator = traceback.format_stack()[:-1]
270
271
272     def addCallbacks(self, callback, errback=None,
273                      callbackArgs=None, callbackKeywords=None,
274                      errbackArgs=None, errbackKeywords=None):
275         """
276         Add a pair of callbacks (success and error) to this L{Deferred}.
277
278         These will be executed when the 'master' callback is run.
279
280         @return: C{self}.
281         @rtype: a L{Deferred}
282         """
283         assert callable(callback)
284         assert errback == None or callable(errback)
285         cbs = ((callback, callbackArgs, callbackKeywords),
286                (errback or (passthru), errbackArgs, errbackKeywords))
287         self.callbacks.append(cbs)
288
289         if self.called:
290             self._runCallbacks()
291         return self
292
293
294     def addCallback(self, callback, *args, **kw):
295         """
296         Convenience method for adding just a callback.
297
298         See L{addCallbacks}.
299         """
300         return self.addCallbacks(callback, callbackArgs=args,
301                                  callbackKeywords=kw)
302
303
304     def addErrback(self, errback, *args, **kw):
305         """
306         Convenience method for adding just an errback.
307
308         See L{addCallbacks}.
309         """
310         return self.addCallbacks(passthru, errback,
311                                  errbackArgs=args,
312                                  errbackKeywords=kw)
313
314
315     def addBoth(self, callback, *args, **kw):
316         """
317         Convenience method for adding a single callable as both a callback
318         and an errback.
319
320         See L{addCallbacks}.
321         """
322         return self.addCallbacks(callback, callback,
323                                  callbackArgs=args, errbackArgs=args,
324                                  callbackKeywords=kw, errbackKeywords=kw)
325
326
327     def chainDeferred(self, d):
328         """
329         Chain another L{Deferred} to this L{Deferred}.
330
331         This method adds callbacks to this L{Deferred} to call C{d}'s callback
332         or errback, as appropriate. It is merely a shorthand way of performing
333         the following::
334
335             self.addCallbacks(d.callback, d.errback)
336
337         When you chain a deferred d2 to another deferred d1 with
338         d1.chainDeferred(d2), you are making d2 participate in the callback
339         chain of d1. Thus any event that fires d1 will also fire d2.
340         However, the converse is B{not} true; if d2 is fired d1 will not be
341         affected.
342
343         Note that unlike the case where chaining is caused by a L{Deferred}
344         being returned from a callback, it is possible to cause the call
345         stack size limit to be exceeded by chaining many L{Deferred}s
346         together with C{chainDeferred}.
347
348         @return: C{self}.
349         @rtype: a L{Deferred}
350         """
351         d._chainedTo = self
352         return self.addCallbacks(d.callback, d.errback)
353
354
355     def callback(self, result):
356         """
357         Run all success callbacks that have been added to this L{Deferred}.
358
359         Each callback will have its result passed as the first argument to
360         the next; this way, the callbacks act as a 'processing chain'.  If
361         the success-callback returns a L{Failure} or raises an L{Exception},
362         processing will continue on the *error* callback chain.  If a
363         callback (or errback) returns another L{Deferred}, this L{Deferred}
364         will be chained to it (and further callbacks will not run until that
365         L{Deferred} has a result).
366         """
367         assert not isinstance(result, Deferred)
368         self._startRunCallbacks(result)
369
370
371     def errback(self, fail=None):
372         """
373         Run all error callbacks that have been added to this L{Deferred}.
374
375         Each callback will have its result passed as the first
376         argument to the next; this way, the callbacks act as a
377         'processing chain'. Also, if the error-callback returns a non-Failure
378         or doesn't raise an L{Exception}, processing will continue on the
379         *success*-callback chain.
380
381         If the argument that's passed to me is not a L{failure.Failure} instance,
382         it will be embedded in one. If no argument is passed, a
383         L{failure.Failure} instance will be created based on the current
384         traceback stack.
385
386         Passing a string as `fail' is deprecated, and will be punished with
387         a warning message.
388
389         @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
390             no current exception state.
391         """
392         if fail is None:
393             fail = failure.Failure(captureVars=self.debug)
394         elif not isinstance(fail, failure.Failure):
395             fail = failure.Failure(fail)
396
397         self._startRunCallbacks(fail)
398
399
400     def pause(self):
401         """
402         Stop processing on a L{Deferred} until L{unpause}() is called.
403         """
404         self.paused = self.paused + 1
405
406
407     def unpause(self):
408         """
409         Process all callbacks made since L{pause}() was called.
410         """
411         self.paused = self.paused - 1
412         if self.paused:
413             return
414         if self.called:
415             self._runCallbacks()
416
417
418     def cancel(self):
419         """
420         Cancel this L{Deferred}.
421
422         If the L{Deferred} has not yet had its C{errback} or C{callback} method
423         invoked, call the canceller function provided to the constructor. If
424         that function does not invoke C{callback} or C{errback}, or if no
425         canceller function was provided, errback with L{CancelledError}.
426
427         If this L{Deferred} is waiting on another L{Deferred}, forward the
428         cancellation to the other L{Deferred}.
429         """
430         if not self.called:
431             canceller = self._canceller
432             if canceller:
433                 canceller(self)
434             else:
435                 # Arrange to eat the callback that will eventually be fired
436                 # since there was no real canceller.
437                 self._suppressAlreadyCalled = True
438             if not self.called:
439                 # There was no canceller, or the canceller didn't call
440                 # callback or errback.
441                 self.errback(failure.Failure(CancelledError()))
442         elif isinstance(self.result, Deferred):
443             # Waiting for another deferred -- cancel it instead.
444             self.result.cancel()
445
446
447     def _startRunCallbacks(self, result):
448         if self.called:
449             if self._suppressAlreadyCalled:
450                 self._suppressAlreadyCalled = False
451                 return
452             if self.debug:
453                 if self._debugInfo is None:
454                     self._debugInfo = DebugInfo()
455                 extra = "\n" + self._debugInfo._getDebugTracebacks()
456                 raise AlreadyCalledError(extra)
457             raise AlreadyCalledError
458         if self.debug:
459             if self._debugInfo is None:
460                 self._debugInfo = DebugInfo()
461             self._debugInfo.invoker = traceback.format_stack()[:-2]
462         self.called = True
463         self.result = result
464         self._runCallbacks()
465
466
467     def _continuation(self):
468         """
469         Build a tuple of callback and errback with L{_continue} to be used by
470         L{_addContinue} and L{_removeContinue} on another Deferred.
471         """
472         return ((_CONTINUE, (self,), None),
473                 (_CONTINUE, (self,), None))
474
475
476     def _runCallbacks(self):
477         """
478         Run the chain of callbacks once a result is available.
479
480         This consists of a simple loop over all of the callbacks, calling each
481         with the current result and making the current result equal to the
482         return value (or raised exception) of that call.
483
484         If C{self._runningCallbacks} is true, this loop won't run at all, since
485         it is already running above us on the call stack.  If C{self.paused} is
486         true, the loop also won't run, because that's what it means to be
487         paused.
488
489         The loop will terminate before processing all of the callbacks if a
490         C{Deferred} without a result is encountered.
491
492         If a C{Deferred} I{with} a result is encountered, that result is taken
493         and the loop proceeds.
494
495         @note: The implementation is complicated slightly by the fact that
496             chaining (associating two Deferreds with each other such that one
497             will wait for the result of the other, as happens when a Deferred is
498             returned from a callback on another Deferred) is supported
499             iteratively rather than recursively, to avoid running out of stack
500             frames when processing long chains.
501         """
502         if self._runningCallbacks:
503             # Don't recursively run callbacks
504             return
505
506         # Keep track of all the Deferreds encountered while propagating results
507         # up a chain.  The way a Deferred gets onto this stack is by having
508         # added its _continuation() to the callbacks list of a second Deferred
509         # and then that second Deferred being fired.  ie, if ever had _chainedTo
510         # set to something other than None, you might end up on this stack.
511         chain = [self]
512
513         while chain:
514             current = chain[-1]
515
516             if current.paused:
517                 # This Deferred isn't going to produce a result at all.  All the
518                 # Deferreds up the chain waiting on it will just have to...
519                 # wait.
520                 return
521
522             finished = True
523             current._chainedTo = None
524             while current.callbacks:
525                 item = current.callbacks.pop(0)
526                 callback, args, kw = item[
527                     isinstance(current.result, failure.Failure)]
528                 args = args or ()
529                 kw = kw or {}
530
531                 # Avoid recursion if we can.
532                 if callback is _CONTINUE:
533                     # Give the waiting Deferred our current result and then
534                     # forget about that result ourselves.
535                     chainee = args[0]
536                     chainee.result = current.result
537                     current.result = None
538                     # Making sure to update _debugInfo
539                     if current._debugInfo is not None:
540                         current._debugInfo.failResult = None
541                     chainee.paused -= 1
542                     chain.append(chainee)
543                     # Delay cleaning this Deferred and popping it from the chain
544                     # until after we've dealt with chainee.
545                     finished = False
546                     break
547
548                 try:
549                     current._runningCallbacks = True
550                     try:
551                         current.result = callback(current.result, *args, **kw)
552                     finally:
553                         current._runningCallbacks = False
554                 except:
555                     # Including full frame information in the Failure is quite
556                     # expensive, so we avoid it unless self.debug is set.
557                     current.result = failure.Failure(captureVars=self.debug)
558                 else:
559                     if isinstance(current.result, Deferred):
560                         # The result is another Deferred.  If it has a result,
561                         # we can take it and keep going.
562                         resultResult = getattr(current.result, 'result', _NO_RESULT)
563                         if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
564                             # Nope, it didn't.  Pause and chain.
565                             current.pause()
566                             current._chainedTo = current.result
567                             # Note: current.result has no result, so it's not
568                             # running its callbacks right now.  Therefore we can
569                             # append to the callbacks list directly instead of
570                             # using addCallbacks.
571                             current.result.callbacks.append(current._continuation())
572                             break
573                         else:
574                             # Yep, it did.  Steal it.
575                             current.result.result = None
576                             # Make sure _debugInfo's failure state is updated.
577                             if current.result._debugInfo is not None:
578                                 current.result._debugInfo.failResult = None
579                             current.result = resultResult
580
581             if finished:
582                 # As much of the callback chain - perhaps all of it - as can be
583                 # processed right now has been.  The current Deferred is waiting on
584                 # another Deferred or for more callbacks.  Before finishing with it,
585                 # make sure its _debugInfo is in the proper state.
586                 if isinstance(current.result, failure.Failure):
587                     # Stash the Failure in the _debugInfo for unhandled error
588                     # reporting.
589                     current.result.cleanFailure()
590                     if current._debugInfo is None:
591                         current._debugInfo = DebugInfo()
592                     current._debugInfo.failResult = current.result
593                 else:
594                     # Clear out any Failure in the _debugInfo, since the result
595                     # is no longer a Failure.
596                     if current._debugInfo is not None:
597                         current._debugInfo.failResult = None
598
599                 # This Deferred is done, pop it from the chain and move back up
600                 # to the Deferred which supplied us with our result.
601                 chain.pop()
602
603
604     def __str__(self):
605         """
606         Return a string representation of this C{Deferred}.
607         """
608         cname = self.__class__.__name__
609         result = getattr(self, 'result', _NO_RESULT)
610         myID = hex(unsignedID(self))
611         if self._chainedTo is not None:
612             result = ' waiting on Deferred at %s' % (hex(unsignedID(self._chainedTo)),)
613         elif result is _NO_RESULT:
614             result = ''
615         else:
616             result = ' current result: %r' % (result,)
617         return "<%s at %s%s>" % (cname, myID, result)
618     __repr__ = __str__
619
620
621
622 class DebugInfo:
623     """
624     Deferred debug helper.
625     """
626
627     failResult = None
628
629     def _getDebugTracebacks(self):
630         info = ''
631         if hasattr(self, "creator"):
632             info += " C: Deferred was created:\n C:"
633             info += "".join(self.creator).rstrip().replace("\n","\n C:")
634             info += "\n"
635         if hasattr(self, "invoker"):
636             info += " I: First Invoker was:\n I:"
637             info += "".join(self.invoker).rstrip().replace("\n","\n I:")
638             info += "\n"
639         return info
640
641
642     def __del__(self):
643         """
644         Print tracebacks and die.
645
646         If the *last* (and I do mean *last*) callback leaves me in an error
647         state, print a traceback (if said errback is a L{Failure}).
648         """
649         if self.failResult is not None:
650             log.msg("Unhandled error in Deferred:", isError=True)
651             debugInfo = self._getDebugTracebacks()
652             if debugInfo != '':
653                 log.msg("(debug: " + debugInfo + ")", isError=True)
654             log.err(self.failResult)
655
656
657
658 class FirstError(Exception):
659     """
660     First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
661
662     @ivar subFailure: The L{Failure} that occurred.
663     @type subFailure: L{Failure}
664
665     @ivar index: The index of the L{Deferred} in the L{DeferredList} where
666         it happened.
667     @type index: C{int}
668     """
669     def __init__(self, failure, index):
670         Exception.__init__(self, failure, index)
671         self.subFailure = failure
672         self.index = index
673
674
675     def __repr__(self):
676         """
677         The I{repr} of L{FirstError} instances includes the repr of the
678         wrapped failure's exception and the index of the L{FirstError}.
679         """
680         return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
681
682
683     def __str__(self):
684         """
685         The I{str} of L{FirstError} instances includes the I{str} of the
686         entire wrapped failure (including its traceback and exception) and
687         the index of the L{FirstError}.
688         """
689         return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
690
691
692     def __cmp__(self, other):
693         """
694         Comparison between L{FirstError} and other L{FirstError} instances
695         is defined as the comparison of the index and sub-failure of each
696         instance.  L{FirstError} instances don't compare equal to anything
697         that isn't a L{FirstError} instance.
698
699         @since: 8.2
700         """
701         if isinstance(other, FirstError):
702             return cmp(
703                 (self.index, self.subFailure),
704                 (other.index, other.subFailure))
705         return -1
706
707
708
709 class DeferredList(Deferred):
710     """
711     L{DeferredList} is a tool for collecting the results of several Deferreds.
712
713     This tracks a list of L{Deferred}s for their results, and makes a single
714     callback when they have all completed.  By default, the ultimate result is a
715     list of (success, result) tuples, 'success' being a boolean.
716     L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and
717     errbacks can be added to it in the same way.
718
719     L{DeferredList} is implemented by adding callbacks and errbacks to each
720     L{Deferred} in the list passed to it.  This means callbacks and errbacks
721     added to the Deferreds before they are passed to L{DeferredList} will change
722     the result that L{DeferredList} sees (i.e., L{DeferredList} is not special).
723     Callbacks and errbacks can also be added to the Deferreds after they are
724     passed to L{DeferredList} and L{DeferredList} may change the result that
725     they see.
726
727     See the documentation for the C{__init__} arguments for more information.
728     """
729
730     fireOnOneCallback = False
731     fireOnOneErrback = False
732
733     def __init__(self, deferredList, fireOnOneCallback=False,
734                  fireOnOneErrback=False, consumeErrors=False):
735         """
736         Initialize a DeferredList.
737
738         @param deferredList: The list of deferreds to track.
739         @type deferredList:  C{list} of L{Deferred}s
740
741         @param fireOnOneCallback: (keyword param) a flag indicating that this
742             L{DeferredList} will fire when the first L{Deferred} in
743             C{deferredList} fires with a non-failure result without waiting for
744             any of the other Deferreds.  When this flag is set, the DeferredList
745             will fire with a two-tuple: the first element is the result of the
746             Deferred which fired; the second element is the index in
747             C{deferredList} of that Deferred.
748         @type fireOnOneCallback: C{bool}
749
750         @param fireOnOneErrback: (keyword param) a flag indicating that this
751             L{DeferredList} will fire when the first L{Deferred} in
752             C{deferredList} fires with a failure result without waiting for any
753             of the other Deferreds.  When this flag is set, if a Deferred in the
754             list errbacks, the DeferredList will errback with a L{FirstError}
755             failure wrapping the failure of that Deferred.
756         @type fireOnOneErrback: C{bool}
757
758         @param consumeErrors: (keyword param) a flag indicating that failures in
759             any of the included L{Deferreds} should not be propagated to
760             errbacks added to the individual L{Deferreds} after this
761             L{DeferredList} is constructed.  After constructing the
762             L{DeferredList}, any errors in the individual L{Deferred}s will be
763             converted to a callback result of C{None}.  This is useful to
764             prevent spurious 'Unhandled error in Deferred' messages from being
765             logged.  This does not prevent C{fireOnOneErrback} from working.
766         @type consumeErrors: C{bool}
767         """
768         self.resultList = [None] * len(deferredList)
769         Deferred.__init__(self)
770         if len(deferredList) == 0 and not fireOnOneCallback:
771             self.callback(self.resultList)
772
773         # These flags need to be set *before* attaching callbacks to the
774         # deferreds, because the callbacks use these flags, and will run
775         # synchronously if any of the deferreds are already fired.
776         self.fireOnOneCallback = fireOnOneCallback
777         self.fireOnOneErrback = fireOnOneErrback
778         self.consumeErrors = consumeErrors
779         self.finishedCount = 0
780
781         index = 0
782         for deferred in deferredList:
783             deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
784                                   callbackArgs=(index,SUCCESS),
785                                   errbackArgs=(index,FAILURE))
786             index = index + 1
787
788
789     def _cbDeferred(self, result, index, succeeded):
790         """
791         (internal) Callback for when one of my deferreds fires.
792         """
793         self.resultList[index] = (succeeded, result)
794
795         self.finishedCount += 1
796         if not self.called:
797             if succeeded == SUCCESS and self.fireOnOneCallback:
798                 self.callback((result, index))
799             elif succeeded == FAILURE and self.fireOnOneErrback:
800                 self.errback(failure.Failure(FirstError(result, index)))
801             elif self.finishedCount == len(self.resultList):
802                 self.callback(self.resultList)
803
804         if succeeded == FAILURE and self.consumeErrors:
805             result = None
806
807         return result
808
809
810
811 def _parseDListResult(l, fireOnOneErrback=False):
812     if __debug__:
813         for success, value in l:
814             assert success
815     return [x[1] for x in l]
816
817
818
819 def gatherResults(deferredList, consumeErrors=False):
820     """
821     Returns, via a L{Deferred}, a list with the results of the given
822     L{Deferred}s - in effect, a "join" of multiple deferred operations.
823
824     The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s
825     have fired, or when any one of them has failed.
826
827     This differs from L{DeferredList} in that you don't need to parse
828     the result for success/failure.
829
830     @type deferredList:  C{list} of L{Deferred}s
831
832     @param consumeErrors: (keyword param) a flag, defaulting to False,
833         indicating that failures in any of the given L{Deferreds} should not be
834         propagated to errbacks added to the individual L{Deferreds} after this
835         L{gatherResults} invocation.  Any such errors in the individual
836         L{Deferred}s will be converted to a callback result of C{None}.  This
837         is useful to prevent spurious 'Unhandled error in Deferred' messages
838         from being logged.  This parameter is available since 11.1.0.
839     @type consumeErrors: C{bool}
840     """
841     d = DeferredList(deferredList, fireOnOneErrback=True,
842                                    consumeErrors=consumeErrors)
843     d.addCallback(_parseDListResult)
844     return d
845
846
847
848 # Constants for use with DeferredList
849
850 SUCCESS = True
851 FAILURE = False
852
853
854
855 ## deferredGenerator
856
857 class waitForDeferred:
858     """
859     See L{deferredGenerator}.
860     """
861
862     def __init__(self, d):
863         if not isinstance(d, Deferred):
864             raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
865         self.d = d
866
867
868     def getResult(self):
869         if isinstance(self.result, failure.Failure):
870             self.result.raiseException()
871         return self.result
872
873
874
875 def _deferGenerator(g, deferred):
876     """
877     See L{deferredGenerator}.
878     """
879     result = None
880
881     # This function is complicated by the need to prevent unbounded recursion
882     # arising from repeatedly yielding immediately ready deferreds.  This while
883     # loop and the waiting variable solve that by manually unfolding the
884     # recursion.
885
886     waiting = [True, # defgen is waiting for result?
887                None] # result
888
889     while 1:
890         try:
891             result = g.next()
892         except StopIteration:
893             deferred.callback(result)
894             return deferred
895         except:
896             deferred.errback()
897             return deferred
898
899         # Deferred.callback(Deferred) raises an error; we catch this case
900         # early here and give a nicer error message to the user in case
901         # they yield a Deferred.
902         if isinstance(result, Deferred):
903             return fail(TypeError("Yield waitForDeferred(d), not d!"))
904
905         if isinstance(result, waitForDeferred):
906             # a waitForDeferred was yielded, get the result.
907             # Pass result in so it don't get changed going around the loop
908             # This isn't a problem for waiting, as it's only reused if
909             # gotResult has already been executed.
910             def gotResult(r, result=result):
911                 result.result = r
912                 if waiting[0]:
913                     waiting[0] = False
914                     waiting[1] = r
915                 else:
916                     _deferGenerator(g, deferred)
917             result.d.addBoth(gotResult)
918             if waiting[0]:
919                 # Haven't called back yet, set flag so that we get reinvoked
920                 # and return from the loop
921                 waiting[0] = False
922                 return deferred
923             # Reset waiting to initial values for next loop
924             waiting[0] = True
925             waiting[1] = None
926
927             result = None
928
929
930
931 def deferredGenerator(f):
932     """
933     deferredGenerator and waitForDeferred help you write L{Deferred}-using code
934     that looks like a regular sequential function. If your code has a minimum
935     requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
936     which can accomplish the same thing in a more concise manner.
937
938     There are two important functions involved: L{waitForDeferred}, and
939     L{deferredGenerator}.  They are used together, like this::
940
941         @deferredGenerator
942         def thingummy():
943             thing = waitForDeferred(makeSomeRequestResultingInDeferred())
944             yield thing
945             thing = thing.getResult()
946             print thing #the result! hoorj!
947
948     L{waitForDeferred} returns something that you should immediately yield; when
949     your generator is resumed, calling C{thing.getResult()} will either give you
950     the result of the L{Deferred} if it was a success, or raise an exception if it
951     was a failure.  Calling C{getResult} is B{absolutely mandatory}.  If you do
952     not call it, I{your program will not work}.
953
954     L{deferredGenerator} takes one of these waitForDeferred-using generator
955     functions and converts it into a function that returns a L{Deferred}. The
956     result of the L{Deferred} will be the last value that your generator yielded
957     unless the last value is a L{waitForDeferred} instance, in which case the
958     result will be C{None}.  If the function raises an unhandled exception, the
959     L{Deferred} will errback instead.  Remember that C{return result} won't work;
960     use C{yield result; return} in place of that.
961
962     Note that not yielding anything from your generator will make the L{Deferred}
963     result in C{None}. Yielding a L{Deferred} from your generator is also an error
964     condition; always yield C{waitForDeferred(d)} instead.
965
966     The L{Deferred} returned from your deferred generator may also errback if your
967     generator raised an exception.  For example::
968
969         @deferredGenerator
970         def thingummy():
971             thing = waitForDeferred(makeSomeRequestResultingInDeferred())
972             yield thing
973             thing = thing.getResult()
974             if thing == 'I love Twisted':
975                 # will become the result of the Deferred
976                 yield 'TWISTED IS GREAT!'
977                 return
978             else:
979                 # will trigger an errback
980                 raise Exception('DESTROY ALL LIFE')
981
982     Put succinctly, these functions connect deferred-using code with this 'fake
983     blocking' style in both directions: L{waitForDeferred} converts from a
984     L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
985     'blocking' style to a L{Deferred}.
986     """
987
988     def unwindGenerator(*args, **kwargs):
989         return _deferGenerator(f(*args, **kwargs), Deferred())
990     return mergeFunctionMetadata(f, unwindGenerator)
991
992
993 ## inlineCallbacks
994
995 # BaseException is only in Py 2.5.
996 try:
997     BaseException
998 except NameError:
999     BaseException=Exception
1000
1001
1002
1003 class _DefGen_Return(BaseException):
1004     def __init__(self, value):
1005         self.value = value
1006
1007
1008
1009 def returnValue(val):
1010     """
1011     Return val from a L{inlineCallbacks} generator.
1012
1013     Note: this is currently implemented by raising an exception
1014     derived from L{BaseException}.  You might want to change any
1015     'except:' clauses to an 'except Exception:' clause so as not to
1016     catch this exception.
1017
1018     Also: while this function currently will work when called from
1019     within arbitrary functions called from within the generator, do
1020     not rely upon this behavior.
1021     """
1022     raise _DefGen_Return(val)
1023
1024
1025
1026 def _inlineCallbacks(result, g, deferred):
1027     """
1028     See L{inlineCallbacks}.
1029     """
1030     # This function is complicated by the need to prevent unbounded recursion
1031     # arising from repeatedly yielding immediately ready deferreds.  This while
1032     # loop and the waiting variable solve that by manually unfolding the
1033     # recursion.
1034
1035     waiting = [True, # waiting for result?
1036                None] # result
1037
1038     while 1:
1039         try:
1040             # Send the last result back as the result of the yield expression.
1041             isFailure = isinstance(result, failure.Failure)
1042             if isFailure:
1043                 result = result.throwExceptionIntoGenerator(g)
1044             else:
1045                 result = g.send(result)
1046         except StopIteration:
1047             # fell off the end, or "return" statement
1048             deferred.callback(None)
1049             return deferred
1050         except _DefGen_Return, e:
1051             # returnValue() was called; time to give a result to the original
1052             # Deferred.  First though, let's try to identify the potentially
1053             # confusing situation which results when returnValue() is
1054             # accidentally invoked from a different function, one that wasn't
1055             # decorated with @inlineCallbacks.
1056
1057             # The traceback starts in this frame (the one for
1058             # _inlineCallbacks); the next one down should be the application
1059             # code.
1060             appCodeTrace = exc_info()[2].tb_next
1061             if isFailure:
1062                 # If we invoked this generator frame by throwing an exception
1063                 # into it, then throwExceptionIntoGenerator will consume an
1064                 # additional stack frame itself, so we need to skip that too.
1065                 appCodeTrace = appCodeTrace.tb_next
1066             # Now that we've identified the frame being exited by the
1067             # exception, let's figure out if returnValue was called from it
1068             # directly.  returnValue itself consumes a stack frame, so the
1069             # application code will have a tb_next, but it will *not* have a
1070             # second tb_next.
1071             if appCodeTrace.tb_next.tb_next:
1072                 # If returnValue was invoked non-local to the frame which it is
1073                 # exiting, identify the frame that ultimately invoked
1074                 # returnValue so that we can warn the user, as this behavior is
1075                 # confusing.
1076                 ultimateTrace = appCodeTrace
1077                 while ultimateTrace.tb_next.tb_next:
1078                     ultimateTrace = ultimateTrace.tb_next
1079                 filename = ultimateTrace.tb_frame.f_code.co_filename
1080                 lineno = ultimateTrace.tb_lineno
1081                 warnings.warn_explicit(
1082                     "returnValue() in %r causing %r to exit: "
1083                     "returnValue should only be invoked by functions decorated "
1084                     "with inlineCallbacks" % (
1085                         ultimateTrace.tb_frame.f_code.co_name,
1086                         appCodeTrace.tb_frame.f_code.co_name),
1087                     DeprecationWarning, filename, lineno)
1088             deferred.callback(e.value)
1089             return deferred
1090         except:
1091             deferred.errback()
1092             return deferred
1093
1094         if isinstance(result, Deferred):
1095             # a deferred was yielded, get the result.
1096             def gotResult(r):
1097                 if waiting[0]:
1098                     waiting[0] = False
1099                     waiting[1] = r
1100                 else:
1101                     _inlineCallbacks(r, g, deferred)
1102
1103             result.addBoth(gotResult)
1104             if waiting[0]:
1105                 # Haven't called back yet, set flag so that we get reinvoked
1106                 # and return from the loop
1107                 waiting[0] = False
1108                 return deferred
1109
1110             result = waiting[1]
1111             # Reset waiting to initial values for next loop.  gotResult uses
1112             # waiting, but this isn't a problem because gotResult is only
1113             # executed once, and if it hasn't been executed yet, the return
1114             # branch above would have been taken.
1115
1116
1117             waiting[0] = True
1118             waiting[1] = None
1119
1120
1121     return deferred
1122
1123
1124
1125 def inlineCallbacks(f):
1126     """
1127     WARNING: this function will not work in Python 2.4 and earlier!
1128
1129     inlineCallbacks helps you write Deferred-using code that looks like a
1130     regular sequential function. This function uses features of Python 2.5
1131     generators.  If you need to be compatible with Python 2.4 or before, use
1132     the L{deferredGenerator} function instead, which accomplishes the same
1133     thing, but with somewhat more boilerplate.  For example::
1134
1135         @inlineCallBacks
1136         def thingummy():
1137             thing = yield makeSomeRequestResultingInDeferred()
1138             print thing #the result! hoorj!
1139
1140     When you call anything that results in a L{Deferred}, you can simply yield it;
1141     your generator will automatically be resumed when the Deferred's result is
1142     available. The generator will be sent the result of the L{Deferred} with the
1143     'send' method on generators, or if the result was a failure, 'throw'.
1144
1145     Things that are not L{Deferred}s may also be yielded, and your generator
1146     will be resumed with the same object sent back. This means C{yield}
1147     performs an operation roughly equivalent to L{maybeDeferred}.
1148
1149     Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
1150     will result in the return value of the generator (or will fail with a
1151     failure object if your generator raises an unhandled exception). Note that
1152     you can't use C{return result} to return a value; use C{returnValue(result)}
1153     instead. Falling off the end of the generator, or simply using C{return}
1154     will cause the L{Deferred} to have a result of C{None}.
1155
1156     Be aware that L{returnValue} will not accept a L{Deferred} as a parameter.
1157     If you believe the thing you'd like to return could be a L{Deferred}, do
1158     this::
1159
1160         result = yield result
1161         returnValue(result)
1162
1163     The L{Deferred} returned from your deferred generator may errback if your
1164     generator raised an exception::
1165
1166         @inlineCallbacks
1167         def thingummy():
1168             thing = yield makeSomeRequestResultingInDeferred()
1169             if thing == 'I love Twisted':
1170                 # will become the result of the Deferred
1171                 returnValue('TWISTED IS GREAT!')
1172             else:
1173                 # will trigger an errback
1174                 raise Exception('DESTROY ALL LIFE')
1175     """
1176     def unwindGenerator(*args, **kwargs):
1177         try:
1178             gen = f(*args, **kwargs)
1179         except _DefGen_Return:
1180             raise TypeError(
1181                 "inlineCallbacks requires %r to produce a generator; instead"
1182                 "caught returnValue being used in a non-generator" % (f,))
1183         if not isinstance(gen, types.GeneratorType):
1184             raise TypeError(
1185                 "inlineCallbacks requires %r to produce a generator; "
1186                 "instead got %r" % (f, gen))
1187         return _inlineCallbacks(None, gen, Deferred())
1188     return mergeFunctionMetadata(f, unwindGenerator)
1189
1190
1191 ## DeferredLock/DeferredQueue
1192
1193 class _ConcurrencyPrimitive(object):
1194     def __init__(self):
1195         self.waiting = []
1196
1197
1198     def _releaseAndReturn(self, r):
1199         self.release()
1200         return r
1201
1202
1203     def run(*args, **kwargs):
1204         """
1205         Acquire, run, release.
1206
1207         This function takes a callable as its first argument and any
1208         number of other positional and keyword arguments.  When the
1209         lock or semaphore is acquired, the callable will be invoked
1210         with those arguments.
1211
1212         The callable may return a L{Deferred}; if it does, the lock or
1213         semaphore won't be released until that L{Deferred} fires.
1214
1215         @return: L{Deferred} of function result.
1216         """
1217         if len(args) < 2:
1218             if not args:
1219                 raise TypeError("run() takes at least 2 arguments, none given.")
1220             raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
1221                 args[0].__class__.__name__,))
1222         self, f = args[:2]
1223         args = args[2:]
1224
1225         def execute(ignoredResult):
1226             d = maybeDeferred(f, *args, **kwargs)
1227             d.addBoth(self._releaseAndReturn)
1228             return d
1229
1230         d = self.acquire()
1231         d.addCallback(execute)
1232         return d
1233
1234
1235
1236 class DeferredLock(_ConcurrencyPrimitive):
1237     """
1238     A lock for event driven systems.
1239
1240     @ivar locked: C{True} when this Lock has been acquired, false at all other
1241         times.  Do not change this value, but it is useful to examine for the
1242         equivalent of a "non-blocking" acquisition.
1243     """
1244
1245     locked = False
1246
1247
1248     def _cancelAcquire(self, d):
1249         """
1250         Remove a deferred d from our waiting list, as the deferred has been
1251         canceled.
1252
1253         Note: We do not need to wrap this in a try/except to catch d not
1254         being in self.waiting because this canceller will not be called if
1255         d has fired. release() pops a deferred out of self.waiting and
1256         calls it, so the canceller will no longer be called.
1257
1258         @param d: The deferred that has been canceled.
1259         """
1260         self.waiting.remove(d)
1261
1262
1263     def acquire(self):
1264         """
1265         Attempt to acquire the lock.  Returns a L{Deferred} that fires on
1266         lock acquisition with the L{DeferredLock} as the value.  If the lock
1267         is locked, then the Deferred is placed at the end of a waiting list.
1268
1269         @return: a L{Deferred} which fires on lock acquisition.
1270         @rtype: a L{Deferred}
1271         """
1272         d = Deferred(canceller=self._cancelAcquire)
1273         if self.locked:
1274             self.waiting.append(d)
1275         else:
1276             self.locked = True
1277             d.callback(self)
1278         return d
1279
1280
1281     def release(self):
1282         """
1283         Release the lock.  If there is a waiting list, then the first
1284         L{Deferred} in that waiting list will be called back.
1285
1286         Should be called by whomever did the L{acquire}() when the shared
1287         resource is free.
1288         """
1289         assert self.locked, "Tried to release an unlocked lock"
1290         self.locked = False
1291         if self.waiting:
1292             # someone is waiting to acquire lock
1293             self.locked = True
1294             d = self.waiting.pop(0)
1295             d.callback(self)
1296
1297
1298
1299 class DeferredSemaphore(_ConcurrencyPrimitive):
1300     """
1301     A semaphore for event driven systems.
1302
1303     @ivar tokens: At most this many users may acquire this semaphore at
1304         once.
1305     @type tokens: C{int}
1306
1307     @ivar limit: The difference between C{tokens} and the number of users
1308         which have currently acquired this semaphore.
1309     @type limit: C{int}
1310     """
1311
1312     def __init__(self, tokens):
1313         _ConcurrencyPrimitive.__init__(self)
1314         if tokens < 1:
1315             raise ValueError("DeferredSemaphore requires tokens >= 1")
1316         self.tokens = tokens
1317         self.limit = tokens
1318
1319
1320     def _cancelAcquire(self, d):
1321         """
1322         Remove a deferred d from our waiting list, as the deferred has been
1323         canceled.
1324
1325         Note: We do not need to wrap this in a try/except to catch d not
1326         being in self.waiting because this canceller will not be called if
1327         d has fired. release() pops a deferred out of self.waiting and
1328         calls it, so the canceller will no longer be called.
1329
1330         @param d: The deferred that has been canceled.
1331         """
1332         self.waiting.remove(d)
1333
1334
1335     def acquire(self):
1336         """
1337         Attempt to acquire the token.
1338
1339         @return: a L{Deferred} which fires on token acquisition.
1340         """
1341         assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
1342         d = Deferred(canceller=self._cancelAcquire)
1343         if not self.tokens:
1344             self.waiting.append(d)
1345         else:
1346             self.tokens = self.tokens - 1
1347             d.callback(self)
1348         return d
1349
1350
1351     def release(self):
1352         """
1353         Release the token.
1354
1355         Should be called by whoever did the L{acquire}() when the shared
1356         resource is free.
1357         """
1358         assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
1359         self.tokens = self.tokens + 1
1360         if self.waiting:
1361             # someone is waiting to acquire token
1362             self.tokens = self.tokens - 1
1363             d = self.waiting.pop(0)
1364             d.callback(self)
1365
1366
1367
1368 class QueueOverflow(Exception):
1369     pass
1370
1371
1372
1373 class QueueUnderflow(Exception):
1374     pass
1375
1376
1377
1378 class DeferredQueue(object):
1379     """
1380     An event driven queue.
1381
1382     Objects may be added as usual to this queue.  When an attempt is
1383     made to retrieve an object when the queue is empty, a L{Deferred} is
1384     returned which will fire when an object becomes available.
1385
1386     @ivar size: The maximum number of objects to allow into the queue
1387     at a time.  When an attempt to add a new object would exceed this
1388     limit, L{QueueOverflow} is raised synchronously.  C{None} for no limit.
1389
1390     @ivar backlog: The maximum number of L{Deferred} gets to allow at
1391     one time.  When an attempt is made to get an object which would
1392     exceed this limit, L{QueueUnderflow} is raised synchronously.  C{None}
1393     for no limit.
1394     """
1395
1396     def __init__(self, size=None, backlog=None):
1397         self.waiting = []
1398         self.pending = []
1399         self.size = size
1400         self.backlog = backlog
1401
1402
1403     def _cancelGet(self, d):
1404         """
1405         Remove a deferred d from our waiting list, as the deferred has been
1406         canceled.
1407
1408         Note: We do not need to wrap this in a try/except to catch d not
1409         being in self.waiting because this canceller will not be called if
1410         d has fired. put() pops a deferred out of self.waiting and calls
1411         it, so the canceller will no longer be called.
1412
1413         @param d: The deferred that has been canceled.
1414         """
1415         self.waiting.remove(d)
1416
1417
1418     def put(self, obj):
1419         """
1420         Add an object to this queue.
1421
1422         @raise QueueOverflow: Too many objects are in this queue.
1423         """
1424         if self.waiting:
1425             self.waiting.pop(0).callback(obj)
1426         elif self.size is None or len(self.pending) < self.size:
1427             self.pending.append(obj)
1428         else:
1429             raise QueueOverflow()
1430
1431
1432     def get(self):
1433         """
1434         Attempt to retrieve and remove an object from the queue.
1435
1436         @return: a L{Deferred} which fires with the next object available in
1437         the queue.
1438
1439         @raise QueueUnderflow: Too many (more than C{backlog})
1440         L{Deferred}s are already waiting for an object from this queue.
1441         """
1442         if self.pending:
1443             return succeed(self.pending.pop(0))
1444         elif self.backlog is None or len(self.waiting) < self.backlog:
1445             d = Deferred(canceller=self._cancelGet)
1446             self.waiting.append(d)
1447             return d
1448         else:
1449             raise QueueUnderflow()
1450
1451
1452
1453 class AlreadyTryingToLockError(Exception):
1454     """
1455     Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
1456     single L{DeferredFilesystemLock}.
1457     """
1458
1459
1460
1461 class DeferredFilesystemLock(lockfile.FilesystemLock):
1462     """
1463     A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
1464     acquired.
1465
1466     @ivar _scheduler: The object in charge of scheduling retries. In this
1467         implementation this is parameterized for testing.
1468
1469     @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1470
1471     @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
1472         the next retry for aquiring the lock.
1473
1474     @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
1475         argument.  This is in charge of timing out our attempt to acquire the
1476         lock.
1477     """
1478     _interval = 1
1479     _tryLockCall = None
1480     _timeoutCall = None
1481
1482
1483     def __init__(self, name, scheduler=None):
1484         """
1485         @param name: The name of the lock to acquire
1486         @param scheduler: An object which provides L{IReactorTime}
1487         """
1488         lockfile.FilesystemLock.__init__(self, name)
1489
1490         if scheduler is None:
1491             from twisted.internet import reactor
1492             scheduler = reactor
1493
1494         self._scheduler = scheduler
1495
1496
1497     def deferUntilLocked(self, timeout=None):
1498         """
1499         Wait until we acquire this lock.  This method is not safe for
1500         concurrent use.
1501
1502         @type timeout: C{float} or C{int}
1503         @param timeout: the number of seconds after which to time out if the
1504             lock has not been acquired.
1505
1506         @return: a L{Deferred} which will callback when the lock is acquired, or
1507             errback with a L{TimeoutError} after timing out or an
1508             L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1509             been called and not successfully locked the file.
1510         """
1511         if self._tryLockCall is not None:
1512             return fail(
1513                 AlreadyTryingToLockError(
1514                     "deferUntilLocked isn't safe for concurrent use."))
1515
1516         d = Deferred()
1517
1518         def _cancelLock():
1519             self._tryLockCall.cancel()
1520             self._tryLockCall = None
1521             self._timeoutCall = None
1522
1523             if self.lock():
1524                 d.callback(None)
1525             else:
1526                 d.errback(failure.Failure(
1527                         TimeoutError("Timed out aquiring lock: %s after %fs" % (
1528                                 self.name,
1529                                 timeout))))
1530
1531         def _tryLock():
1532             if self.lock():
1533                 if self._timeoutCall is not None:
1534                     self._timeoutCall.cancel()
1535                     self._timeoutCall = None
1536
1537                 self._tryLockCall = None
1538
1539                 d.callback(None)
1540             else:
1541                 if timeout is not None and self._timeoutCall is None:
1542                     self._timeoutCall = self._scheduler.callLater(
1543                         timeout, _cancelLock)
1544
1545                 self._tryLockCall = self._scheduler.callLater(
1546                     self._interval, _tryLock)
1547
1548         _tryLock()
1549
1550         return d
1551
1552
1553
1554 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1555            "AlreadyCalledError", "TimeoutError", "gatherResults",
1556            "maybeDeferred",
1557            "waitForDeferred", "deferredGenerator", "inlineCallbacks",
1558            "returnValue",
1559            "DeferredLock", "DeferredSemaphore", "DeferredQueue",
1560            "DeferredFilesystemLock", "AlreadyTryingToLockError",
1561           ]