Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / task.py
1 # -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Scheduling utility methods and classes.
7
8 @author: Jp Calderone
9 """
10
11 __metaclass__ = type
12
13 import time
14
15 from zope.interface import implements
16
17 from twisted.python import reflect
18 from twisted.python.failure import Failure
19
20 from twisted.internet import base, defer
21 from twisted.internet.interfaces import IReactorTime
22
23
24 class LoopingCall:
25     """Call a function repeatedly.
26
27     If C{f} returns a deferred, rescheduling will not take place until the
28     deferred has fired. The result value is ignored.
29
30     @ivar f: The function to call.
31     @ivar a: A tuple of arguments to pass the function.
32     @ivar kw: A dictionary of keyword arguments to pass to the function.
33     @ivar clock: A provider of
34         L{twisted.internet.interfaces.IReactorTime}.  The default is
35         L{twisted.internet.reactor}. Feel free to set this to
36         something else, but it probably ought to be set *before*
37         calling L{start}.
38
39     @type running: C{bool}
40     @ivar running: A flag which is C{True} while C{f} is scheduled to be called
41         (or is currently being called). It is set to C{True} when L{start} is
42         called and set to C{False} when L{stop} is called or if C{f} raises an
43         exception. In either case, it will be C{False} by the time the
44         C{Deferred} returned by L{start} fires its callback or errback.
45
46     @type _expectNextCallAt: C{float}
47     @ivar _expectNextCallAt: The time at which this instance most recently
48         scheduled itself to run.
49
50     @type _realLastTime: C{float}
51     @ivar _realLastTime: When counting skips, the time at which the skip
52         counter was last invoked.
53
54     @type _runAtStart: C{bool}
55     @ivar _runAtStart: A flag indicating whether the 'now' argument was passed
56         to L{LoopingCall.start}.
57     """
58
59     call = None
60     running = False
61     deferred = None
62     interval = None
63     _expectNextCallAt = 0.0
64     _runAtStart = False
65     starttime = None
66
67     def __init__(self, f, *a, **kw):
68         self.f = f
69         self.a = a
70         self.kw = kw
71         from twisted.internet import reactor
72         self.clock = reactor
73
74
75     def withCount(cls, countCallable):
76         """
77         An alternate constructor for L{LoopingCall} that makes available the
78         number of calls which should have occurred since it was last invoked.
79
80         Note that this number is an C{int} value; It represents the discrete
81         number of calls that should have been made.  For example, if you are
82         using a looping call to display an animation with discrete frames, this
83         number would be the number of frames to advance.
84
85         The count is normally 1, but can be higher. For example, if the reactor
86         is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
87         returned from a previous call is not fired before an interval has
88         elapsed, or if the callable itself blocks for longer than an interval,
89         preventing I{itself} from being called.
90
91         @param countCallable: A callable that will be invoked each time the
92             resulting LoopingCall is run, with an integer specifying the number
93             of calls that should have been invoked.
94
95         @type countCallable: 1-argument callable which takes an C{int}
96
97         @return: An instance of L{LoopingCall} with call counting enabled,
98             which provides the count as the first positional argument.
99
100         @rtype: L{LoopingCall}
101
102         @since: 9.0
103         """
104
105         def counter():
106             now = self.clock.seconds()
107             lastTime = self._realLastTime
108             if lastTime is None:
109                 lastTime = self.starttime
110                 if self._runAtStart:
111                     lastTime -= self.interval
112             self._realLastTime = now
113             lastInterval = self._intervalOf(lastTime)
114             thisInterval = self._intervalOf(now)
115             count = thisInterval - lastInterval
116             return countCallable(count)
117
118         self = cls(counter)
119
120         self._realLastTime = None
121
122         return self
123
124     withCount = classmethod(withCount)
125
126
127     def _intervalOf(self, t):
128         """
129         Determine the number of intervals passed as of the given point in
130         time.
131
132         @param t: The specified time (from the start of the L{LoopingCall}) to
133             be measured in intervals
134
135         @return: The C{int} number of intervals which have passed as of the
136             given point in time.
137         """
138         elapsedTime = t - self.starttime
139         intervalNum = int(elapsedTime / self.interval)
140         return intervalNum
141
142
143     def start(self, interval, now=True):
144         """
145         Start running function every interval seconds.
146
147         @param interval: The number of seconds between calls.  May be
148         less than one.  Precision will depend on the underlying
149         platform, the available hardware, and the load on the system.
150
151         @param now: If True, run this call right now.  Otherwise, wait
152         until the interval has elapsed before beginning.
153
154         @return: A Deferred whose callback will be invoked with
155         C{self} when C{self.stop} is called, or whose errback will be
156         invoked when the function raises an exception or returned a
157         deferred that has its errback invoked.
158         """
159         assert not self.running, ("Tried to start an already running "
160                                   "LoopingCall.")
161         if interval < 0:
162             raise ValueError, "interval must be >= 0"
163         self.running = True
164         d = self.deferred = defer.Deferred()
165         self.starttime = self.clock.seconds()
166         self._expectNextCallAt = self.starttime
167         self.interval = interval
168         self._runAtStart = now
169         if now:
170             self()
171         else:
172             self._reschedule()
173         return d
174
175     def stop(self):
176         """Stop running function.
177         """
178         assert self.running, ("Tried to stop a LoopingCall that was "
179                               "not running.")
180         self.running = False
181         if self.call is not None:
182             self.call.cancel()
183             self.call = None
184             d, self.deferred = self.deferred, None
185             d.callback(self)
186
187     def reset(self):
188         """
189         Skip the next iteration and reset the timer.
190
191         @since: 11.1
192         """
193         assert self.running, ("Tried to reset a LoopingCall that was "
194                               "not running.")
195         if self.call is not None:
196             self.call.cancel()
197             self.call = None
198             self._expectNextCallAt = self.clock.seconds()
199             self._reschedule()
200
201     def __call__(self):
202         def cb(result):
203             if self.running:
204                 self._reschedule()
205             else:
206                 d, self.deferred = self.deferred, None
207                 d.callback(self)
208
209         def eb(failure):
210             self.running = False
211             d, self.deferred = self.deferred, None
212             d.errback(failure)
213
214         self.call = None
215         d = defer.maybeDeferred(self.f, *self.a, **self.kw)
216         d.addCallback(cb)
217         d.addErrback(eb)
218
219
220     def _reschedule(self):
221         """
222         Schedule the next iteration of this looping call.
223         """
224         if self.interval == 0:
225             self.call = self.clock.callLater(0, self)
226             return
227
228         currentTime = self.clock.seconds()
229         # Find how long is left until the interval comes around again.
230         untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
231         # Make sure it is in the future, in case more than one interval worth
232         # of time passed since the previous call was made.
233         nextTime = max(
234             self._expectNextCallAt + self.interval, currentTime + untilNextTime)
235         # If the interval falls on the current time exactly, skip it and
236         # schedule the call for the next interval.
237         if nextTime == currentTime:
238             nextTime += self.interval
239         self._expectNextCallAt = nextTime
240         self.call = self.clock.callLater(nextTime - currentTime, self)
241
242
243     def __repr__(self):
244         if hasattr(self.f, 'func_name'):
245             func = self.f.func_name
246             if hasattr(self.f, 'im_class'):
247                 func = self.f.im_class.__name__ + '.' + func
248         else:
249             func = reflect.safe_repr(self.f)
250
251         return 'LoopingCall<%r>(%s, *%s, **%s)' % (
252             self.interval, func, reflect.safe_repr(self.a),
253             reflect.safe_repr(self.kw))
254
255
256
257 class SchedulerError(Exception):
258     """
259     The operation could not be completed because the scheduler or one of its
260     tasks was in an invalid state.  This exception should not be raised
261     directly, but is a superclass of various scheduler-state-related
262     exceptions.
263     """
264
265
266
267 class SchedulerStopped(SchedulerError):
268     """
269     The operation could not complete because the scheduler was stopped in
270     progress or was already stopped.
271     """
272
273
274
275 class TaskFinished(SchedulerError):
276     """
277     The operation could not complete because the task was already completed,
278     stopped, encountered an error or otherwise permanently stopped running.
279     """
280
281
282
283 class TaskDone(TaskFinished):
284     """
285     The operation could not complete because the task was already completed.
286     """
287
288
289
290 class TaskStopped(TaskFinished):
291     """
292     The operation could not complete because the task was stopped.
293     """
294
295
296
297 class TaskFailed(TaskFinished):
298     """
299     The operation could not complete because the task died with an unhandled
300     error.
301     """
302
303
304
305 class NotPaused(SchedulerError):
306     """
307     This exception is raised when a task is resumed which was not previously
308     paused.
309     """
310
311
312
313 class _Timer(object):
314     MAX_SLICE = 0.01
315     def __init__(self):
316         self.end = time.time() + self.MAX_SLICE
317
318
319     def __call__(self):
320         return time.time() >= self.end
321
322
323
324 _EPSILON = 0.00000001
325 def _defaultScheduler(x):
326     from twisted.internet import reactor
327     return reactor.callLater(_EPSILON, x)
328
329
330 class CooperativeTask(object):
331     """
332     A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be
333     paused, resumed, and stopped.  It can also have its completion (or
334     termination) monitored.
335
336     @see: L{CooperativeTask.cooperate}
337
338     @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is
339         asked to do work.
340
341     @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask}
342         participates in, which is used to re-insert it upon resume.
343
344     @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task
345         completes, fails, or finishes.
346
347     @type _deferreds: L{list}
348
349     @type _cooperator: L{Cooperator}
350
351     @ivar _pauseCount: the number of times that this L{CooperativeTask} has
352         been paused; if 0, it is running.
353
354     @type _pauseCount: L{int}
355
356     @ivar _completionState: The completion-state of this L{CooperativeTask}.
357         C{None} if the task is not yet completed, an instance of L{TaskStopped}
358         if C{stop} was called to stop this task early, of L{TaskFailed} if the
359         application code in the iterator raised an exception which caused it to
360         terminate, and of L{TaskDone} if it terminated normally via raising
361         L{StopIteration}.
362
363     @type _completionState: L{TaskFinished}
364     """
365
366     def __init__(self, iterator, cooperator):
367         """
368         A private constructor: to create a new L{CooperativeTask}, see
369         L{Cooperator.cooperate}.
370         """
371         self._iterator = iterator
372         self._cooperator = cooperator
373         self._deferreds = []
374         self._pauseCount = 0
375         self._completionState = None
376         self._completionResult = None
377         cooperator._addTask(self)
378
379
380     def whenDone(self):
381         """
382         Get a L{defer.Deferred} notification of when this task is complete.
383
384         @return: a L{defer.Deferred} that fires with the C{iterator} that this
385             L{CooperativeTask} was created with when the iterator has been
386             exhausted (i.e. its C{next} method has raised L{StopIteration}), or
387             fails with the exception raised by C{next} if it raises some other
388             exception.
389
390         @rtype: L{defer.Deferred}
391         """
392         d = defer.Deferred()
393         if self._completionState is None:
394             self._deferreds.append(d)
395         else:
396             d.callback(self._completionResult)
397         return d
398
399
400     def pause(self):
401         """
402         Pause this L{CooperativeTask}.  Stop doing work until
403         L{CooperativeTask.resume} is called.  If C{pause} is called more than
404         once, C{resume} must be called an equal number of times to resume this
405         task.
406
407         @raise TaskFinished: if this task has already finished or completed.
408         """
409         self._checkFinish()
410         self._pauseCount += 1
411         if self._pauseCount == 1:
412             self._cooperator._removeTask(self)
413
414
415     def resume(self):
416         """
417         Resume processing of a paused L{CooperativeTask}.
418
419         @raise NotPaused: if this L{CooperativeTask} is not paused.
420         """
421         if self._pauseCount == 0:
422             raise NotPaused()
423         self._pauseCount -= 1
424         if self._pauseCount == 0 and self._completionState is None:
425             self._cooperator._addTask(self)
426
427
428     def _completeWith(self, completionState, deferredResult):
429         """
430         @param completionState: a L{TaskFinished} exception or a subclass
431             thereof, indicating what exception should be raised when subsequent
432             operations are performed.
433
434         @param deferredResult: the result to fire all the deferreds with.
435         """
436         self._completionState = completionState
437         self._completionResult = deferredResult
438         if not self._pauseCount:
439             self._cooperator._removeTask(self)
440
441         # The Deferreds need to be invoked after all this is completed, because
442         # a Deferred may want to manipulate other tasks in a Cooperator.  For
443         # example, if you call "stop()" on a cooperator in a callback on a
444         # Deferred returned from whenDone(), this CooperativeTask must be gone
445         # from the Cooperator by that point so that _completeWith is not
446         # invoked reentrantly; that would cause these Deferreds to blow up with
447         # an AlreadyCalledError, or the _removeTask to fail with a ValueError.
448         for d in self._deferreds:
449             d.callback(deferredResult)
450
451
452     def stop(self):
453         """
454         Stop further processing of this task.
455
456         @raise TaskFinished: if this L{CooperativeTask} has previously
457             completed, via C{stop}, completion, or failure.
458         """
459         self._checkFinish()
460         self._completeWith(TaskStopped(), Failure(TaskStopped()))
461
462
463     def _checkFinish(self):
464         """
465         If this task has been stopped, raise the appropriate subclass of
466         L{TaskFinished}.
467         """
468         if self._completionState is not None:
469             raise self._completionState
470
471
472     def _oneWorkUnit(self):
473         """
474         Perform one unit of work for this task, retrieving one item from its
475         iterator, stopping if there are no further items in the iterator, and
476         pausing if the result was a L{defer.Deferred}.
477         """
478         try:
479             result = self._iterator.next()
480         except StopIteration:
481             self._completeWith(TaskDone(), self._iterator)
482         except:
483             self._completeWith(TaskFailed(), Failure())
484         else:
485             if isinstance(result, defer.Deferred):
486                 self.pause()
487                 def failLater(f):
488                     self._completeWith(TaskFailed(), f)
489                 result.addCallbacks(lambda result: self.resume(),
490                                     failLater)
491
492
493
494 class Cooperator(object):
495     """
496     Cooperative task scheduler.
497     """
498
499     def __init__(self,
500                  terminationPredicateFactory=_Timer,
501                  scheduler=_defaultScheduler,
502                  started=True):
503         """
504         Create a scheduler-like object to which iterators may be added.
505
506         @param terminationPredicateFactory: A no-argument callable which will
507         be invoked at the beginning of each step and should return a
508         no-argument callable which will return True when the step should be
509         terminated.  The default factory is time-based and allows iterators to
510         run for 1/100th of a second at a time.
511
512         @param scheduler: A one-argument callable which takes a no-argument
513         callable and should invoke it at some future point.  This will be used
514         to schedule each step of this Cooperator.
515
516         @param started: A boolean which indicates whether iterators should be
517         stepped as soon as they are added, or if they will be queued up until
518         L{Cooperator.start} is called.
519         """
520         self._tasks = []
521         self._metarator = iter(())
522         self._terminationPredicateFactory = terminationPredicateFactory
523         self._scheduler = scheduler
524         self._delayedCall = None
525         self._stopped = False
526         self._started = started
527
528
529     def coiterate(self, iterator, doneDeferred=None):
530         """
531         Add an iterator to the list of iterators this L{Cooperator} is
532         currently running.
533
534         @param doneDeferred: If specified, this will be the Deferred used as
535             the completion deferred.  It is suggested that you use the default,
536             which creates a new Deferred for you.
537
538         @return: a Deferred that will fire when the iterator finishes.
539         """
540         if doneDeferred is None:
541             doneDeferred = defer.Deferred()
542         CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
543         return doneDeferred
544
545
546     def cooperate(self, iterator):
547         """
548         Start running the given iterator as a long-running cooperative task, by
549         calling next() on it as a periodic timed event.
550
551         @param iterator: the iterator to invoke.
552
553         @return: a L{CooperativeTask} object representing this task.
554         """
555         return CooperativeTask(iterator, self)
556
557
558     def _addTask(self, task):
559         """
560         Add a L{CooperativeTask} object to this L{Cooperator}.
561         """
562         if self._stopped:
563             self._tasks.append(task) # XXX silly, I know, but _completeWith
564                                      # does the inverse
565             task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
566         else:
567             self._tasks.append(task)
568             self._reschedule()
569
570
571     def _removeTask(self, task):
572         """
573         Remove a L{CooperativeTask} from this L{Cooperator}.
574         """
575         self._tasks.remove(task)
576         # If no work left to do, cancel the delayed call:
577         if not self._tasks and self._delayedCall:
578             self._delayedCall.cancel()
579             self._delayedCall = None
580
581
582     def _tasksWhileNotStopped(self):
583         """
584         Yield all L{CooperativeTask} objects in a loop as long as this
585         L{Cooperator}'s termination condition has not been met.
586         """
587         terminator = self._terminationPredicateFactory()
588         while self._tasks:
589             for t in self._metarator:
590                 yield t
591                 if terminator():
592                     return
593             self._metarator = iter(self._tasks)
594
595
596     def _tick(self):
597         """
598         Run one scheduler tick.
599         """
600         self._delayedCall = None
601         for taskObj in self._tasksWhileNotStopped():
602             taskObj._oneWorkUnit()
603         self._reschedule()
604
605
606     _mustScheduleOnStart = False
607     def _reschedule(self):
608         if not self._started:
609             self._mustScheduleOnStart = True
610             return
611         if self._delayedCall is None and self._tasks:
612             self._delayedCall = self._scheduler(self._tick)
613
614
615     def start(self):
616         """
617         Begin scheduling steps.
618         """
619         self._stopped = False
620         self._started = True
621         if self._mustScheduleOnStart:
622             del self._mustScheduleOnStart
623             self._reschedule()
624
625
626     def stop(self):
627         """
628         Stop scheduling steps.  Errback the completion Deferreds of all
629         iterators which have been added and forget about them.
630         """
631         self._stopped = True
632         for taskObj in self._tasks:
633             taskObj._completeWith(SchedulerStopped(),
634                                   Failure(SchedulerStopped()))
635         self._tasks = []
636         if self._delayedCall is not None:
637             self._delayedCall.cancel()
638             self._delayedCall = None
639
640
641
642 _theCooperator = Cooperator()
643
644 def coiterate(iterator):
645     """
646     Cooperatively iterate over the given iterator, dividing runtime between it
647     and all other iterators which have been passed to this function and not yet
648     exhausted.
649     """
650     return _theCooperator.coiterate(iterator)
651
652
653
654 def cooperate(iterator):
655     """
656     Start running the given iterator as a long-running cooperative task, by
657     calling next() on it as a periodic timed event.
658
659     @param iterator: the iterator to invoke.
660
661     @return: a L{CooperativeTask} object representing this task.
662     """
663     return _theCooperator.cooperate(iterator)
664
665
666
667 class Clock:
668     """
669     Provide a deterministic, easily-controlled implementation of
670     L{IReactorTime.callLater}.  This is commonly useful for writing
671     deterministic unit tests for code which schedules events using this API.
672     """
673     implements(IReactorTime)
674
675     rightNow = 0.0
676
677     def __init__(self):
678         self.calls = []
679
680
681     def seconds(self):
682         """
683         Pretend to be time.time().  This is used internally when an operation
684         such as L{IDelayedCall.reset} needs to determine a a time value
685         relative to the current time.
686
687         @rtype: C{float}
688         @return: The time which should be considered the current time.
689         """
690         return self.rightNow
691
692
693     def _sortCalls(self):
694         """
695         Sort the pending calls according to the time they are scheduled.
696         """
697         self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
698
699
700     def callLater(self, when, what, *a, **kw):
701         """
702         See L{twisted.internet.interfaces.IReactorTime.callLater}.
703         """
704         dc = base.DelayedCall(self.seconds() + when,
705                                what, a, kw,
706                                self.calls.remove,
707                                lambda c: None,
708                                self.seconds)
709         self.calls.append(dc)
710         self._sortCalls()
711         return dc
712
713
714     def getDelayedCalls(self):
715         """
716         See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
717         """
718         return self.calls
719
720
721     def advance(self, amount):
722         """
723         Move time on this clock forward by the given amount and run whatever
724         pending calls should be run.
725
726         @type amount: C{float}
727         @param amount: The number of seconds which to advance this clock's
728         time.
729         """
730         self.rightNow += amount
731         self._sortCalls()
732         while self.calls and self.calls[0].getTime() <= self.seconds():
733             call = self.calls.pop(0)
734             call.called = 1
735             call.func(*call.args, **call.kw)
736             self._sortCalls()
737
738
739     def pump(self, timings):
740         """
741         Advance incrementally by the given set of times.
742
743         @type timings: iterable of C{float}
744         """
745         for amount in timings:
746             self.advance(amount)
747
748
749
750 def deferLater(clock, delay, callable, *args, **kw):
751     """
752     Call the given function after a certain period of time has passed.
753
754     @type clock: L{IReactorTime} provider
755     @param clock: The object which will be used to schedule the delayed
756         call.
757
758     @type delay: C{float} or C{int}
759     @param delay: The number of seconds to wait before calling the function.
760
761     @param callable: The object to call after the delay.
762
763     @param *args: The positional arguments to pass to C{callable}.
764
765     @param **kw: The keyword arguments to pass to C{callable}.
766
767     @rtype: L{defer.Deferred}
768
769     @return: A deferred that fires with the result of the callable when the
770         specified time has elapsed.
771     """
772     def deferLaterCancel(deferred):
773         delayedCall.cancel()
774     d = defer.Deferred(deferLaterCancel)
775     d.addCallback(lambda ignored: callable(*args, **kw))
776     delayedCall = clock.callLater(delay, d.callback, None)
777     return d
778
779
780
781 __all__ = [
782     'LoopingCall',
783
784     'Clock',
785
786     'SchedulerStopped', 'Cooperator', 'coiterate',
787
788     'deferLater',
789     ]