Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_cooperator.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 This module contains tests for L{twisted.internet.task.Cooperator} and
6 related functionality.
7 """
8
9 from twisted.internet import reactor, defer, task
10 from twisted.trial import unittest
11
12
13
14 class FakeDelayedCall(object):
15     """
16     Fake delayed call which lets us simulate the scheduler.
17     """
18     def __init__(self, func):
19         """
20         A function to run, later.
21         """
22         self.func = func
23         self.cancelled = False
24
25
26     def cancel(self):
27         """
28         Don't run my function later.
29         """
30         self.cancelled = True
31
32
33
34 class FakeScheduler(object):
35     """
36     A fake scheduler for testing against.
37     """
38     def __init__(self):
39         """
40         Create a fake scheduler with a list of work to do.
41         """
42         self.work = []
43
44
45     def __call__(self, thunk):
46         """
47         Schedule a unit of work to be done later.
48         """
49         unit = FakeDelayedCall(thunk)
50         self.work.append(unit)
51         return unit
52
53
54     def pump(self):
55         """
56         Do all of the work that is currently available to be done.
57         """
58         work, self.work = self.work, []
59         for unit in work:
60             if not unit.cancelled:
61                 unit.func()
62
63
64
65 class TestCooperator(unittest.TestCase):
66     RESULT = 'done'
67
68     def ebIter(self, err):
69         err.trap(task.SchedulerStopped)
70         return self.RESULT
71
72
73     def cbIter(self, ign):
74         self.fail()
75
76
77     def testStoppedRejectsNewTasks(self):
78         """
79         Test that Cooperators refuse new tasks when they have been stopped.
80         """
81         def testwith(stuff):
82             c = task.Cooperator()
83             c.stop()
84             d = c.coiterate(iter(()), stuff)
85             d.addCallback(self.cbIter)
86             d.addErrback(self.ebIter)
87             return d.addCallback(lambda result:
88                                  self.assertEqual(result, self.RESULT))
89         return testwith(None).addCallback(lambda ign: testwith(defer.Deferred()))
90
91
92     def testStopRunning(self):
93         """
94         Test that a running iterator will not run to completion when the
95         cooperator is stopped.
96         """
97         c = task.Cooperator()
98         def myiter():
99             for myiter.value in range(3):
100                 yield myiter.value
101         myiter.value = -1
102         d = c.coiterate(myiter())
103         d.addCallback(self.cbIter)
104         d.addErrback(self.ebIter)
105         c.stop()
106         def doasserts(result):
107             self.assertEqual(result, self.RESULT)
108             self.assertEqual(myiter.value, -1)
109         d.addCallback(doasserts)
110         return d
111
112
113     def testStopOutstanding(self):
114         """
115         An iterator run with L{Cooperator.coiterate} paused on a L{Deferred}
116         yielded by that iterator will fire its own L{Deferred} (the one
117         returned by C{coiterate}) when L{Cooperator.stop} is called.
118         """
119         testControlD = defer.Deferred()
120         outstandingD = defer.Deferred()
121         def myiter():
122             reactor.callLater(0, testControlD.callback, None)
123             yield outstandingD
124             self.fail()
125         c = task.Cooperator()
126         d = c.coiterate(myiter())
127         def stopAndGo(ign):
128             c.stop()
129             outstandingD.callback('arglebargle')
130
131         testControlD.addCallback(stopAndGo)
132         d.addCallback(self.cbIter)
133         d.addErrback(self.ebIter)
134
135         return d.addCallback(
136             lambda result: self.assertEqual(result, self.RESULT))
137
138
139     def testUnexpectedError(self):
140         c = task.Cooperator()
141         def myiter():
142             if 0:
143                 yield None
144             else:
145                 raise RuntimeError()
146         d = c.coiterate(myiter())
147         return self.assertFailure(d, RuntimeError)
148
149
150     def testUnexpectedErrorActuallyLater(self):
151         def myiter():
152             D = defer.Deferred()
153             reactor.callLater(0, D.errback, RuntimeError())
154             yield D
155
156         c = task.Cooperator()
157         d = c.coiterate(myiter())
158         return self.assertFailure(d, RuntimeError)
159
160
161     def testUnexpectedErrorNotActuallyLater(self):
162         def myiter():
163             yield defer.fail(RuntimeError())
164
165         c = task.Cooperator()
166         d = c.coiterate(myiter())
167         return self.assertFailure(d, RuntimeError)
168
169
170     def testCooperation(self):
171         L = []
172         def myiter(things):
173             for th in things:
174                 L.append(th)
175                 yield None
176
177         groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)]
178
179         c = task.Cooperator()
180         tasks = []
181         for stuff in groupsOfThings:
182             tasks.append(c.coiterate(myiter(stuff)))
183
184         return defer.DeferredList(tasks).addCallback(
185             lambda ign: self.assertEqual(tuple(L), sum(zip(*groupsOfThings), ())))
186
187
188     def testResourceExhaustion(self):
189         output = []
190         def myiter():
191             for i in range(100):
192                 output.append(i)
193                 if i == 9:
194                     _TPF.stopped = True
195                 yield i
196
197         class _TPF:
198             stopped = False
199             def __call__(self):
200                 return self.stopped
201
202         c = task.Cooperator(terminationPredicateFactory=_TPF)
203         c.coiterate(myiter()).addErrback(self.ebIter)
204         c._delayedCall.cancel()
205         # testing a private method because only the test case will ever care
206         # about this, so we have to carefully clean up after ourselves.
207         c._tick()
208         c.stop()
209         self.failUnless(_TPF.stopped)
210         self.assertEqual(output, range(10))
211
212
213     def testCallbackReCoiterate(self):
214         """
215         If a callback to a deferred returned by coiterate calls coiterate on
216         the same Cooperator, we should make sure to only do the minimal amount
217         of scheduling work.  (This test was added to demonstrate a specific bug
218         that was found while writing the scheduler.)
219         """
220         calls = []
221
222         class FakeCall:
223             def __init__(self, func):
224                 self.func = func
225
226             def __repr__(self):
227                 return '<FakeCall %r>' % (self.func,)
228
229         def sched(f):
230             self.failIf(calls, repr(calls))
231             calls.append(FakeCall(f))
232             return calls[-1]
233
234         c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda: lambda: True)
235         d = c.coiterate(iter(()))
236
237         done = []
238         def anotherTask(ign):
239             c.coiterate(iter(())).addBoth(done.append)
240
241         d.addCallback(anotherTask)
242
243         work = 0
244         while not done:
245             work += 1
246             while calls:
247                 calls.pop(0).func()
248                 work += 1
249             if work > 50:
250                 self.fail("Cooperator took too long")
251
252
253     def test_removingLastTaskStopsScheduledCall(self):
254         """
255         If the last task in a Cooperator is removed, the scheduled call for
256         the next tick is cancelled, since it is no longer necessary.
257
258         This behavior is useful for tests that want to assert they have left
259         no reactor state behind when they're done.
260         """
261         calls = [None]
262         def sched(f):
263             calls[0] = FakeDelayedCall(f)
264             return calls[0]
265         coop = task.Cooperator(scheduler=sched)
266
267         # Add two task; this should schedule the tick:
268         task1 = coop.cooperate(iter([1, 2]))
269         task2 = coop.cooperate(iter([1, 2]))
270         self.assertEqual(calls[0].func, coop._tick)
271
272         # Remove first task; scheduled call should still be going:
273         task1.stop()
274         self.assertEqual(calls[0].cancelled, False)
275         self.assertEqual(coop._delayedCall, calls[0])
276
277         # Remove second task; scheduled call should be cancelled:
278         task2.stop()
279         self.assertEqual(calls[0].cancelled, True)
280         self.assertEqual(coop._delayedCall, None)
281
282         # Add another task; scheduled call will be recreated:
283         task3 = coop.cooperate(iter([1, 2]))
284         self.assertEqual(calls[0].cancelled, False)
285         self.assertEqual(coop._delayedCall, calls[0])
286
287
288
289 class UnhandledException(Exception):
290     """
291     An exception that should go unhandled.
292     """
293
294
295
296 class AliasTests(unittest.TestCase):
297     """
298     Integration test to verify that the global singleton aliases do what
299     they're supposed to.
300     """
301
302     def test_cooperate(self):
303         """
304         L{twisted.internet.task.cooperate} ought to run the generator that it is
305         """
306         d = defer.Deferred()
307         def doit():
308             yield 1
309             yield 2
310             yield 3
311             d.callback("yay")
312         it = doit()
313         theTask = task.cooperate(it)
314         self.assertIn(theTask, task._theCooperator._tasks)
315         return d
316
317
318
319 class RunStateTests(unittest.TestCase):
320     """
321     Tests to verify the behavior of L{CooperativeTask.pause},
322     L{CooperativeTask.resume}, L{CooperativeTask.stop}, exhausting the
323     underlying iterator, and their interactions with each other.
324     """
325
326     def setUp(self):
327         """
328         Create a cooperator with a fake scheduler and a termination predicate
329         that ensures only one unit of work will take place per tick.
330         """
331         self._doDeferNext = False
332         self._doStopNext = False
333         self._doDieNext = False
334         self.work = []
335         self.scheduler = FakeScheduler()
336         self.cooperator = task.Cooperator(
337             scheduler=self.scheduler,
338             # Always stop after one iteration of work (return a function which
339             # returns a function which always returns True)
340             terminationPredicateFactory=lambda: lambda: True)
341         self.task = self.cooperator.cooperate(self.worker())
342         self.cooperator.start()
343
344
345     def worker(self):
346         """
347         This is a sample generator which yields Deferreds when we are testing
348         deferral and an ascending integer count otherwise.
349         """
350         i = 0
351         while True:
352             i += 1
353             if self._doDeferNext:
354                 self._doDeferNext = False
355                 d = defer.Deferred()
356                 self.work.append(d)
357                 yield d
358             elif self._doStopNext:
359                 return
360             elif self._doDieNext:
361                 raise UnhandledException()
362             else:
363                 self.work.append(i)
364                 yield i
365
366
367     def tearDown(self):
368         """
369         Drop references to interesting parts of the fixture to allow Deferred
370         errors to be noticed when things start failing.
371         """
372         del self.task
373         del self.scheduler
374
375
376     def deferNext(self):
377         """
378         Defer the next result from my worker iterator.
379         """
380         self._doDeferNext = True
381
382
383     def stopNext(self):
384         """
385         Make the next result from my worker iterator be completion (raising
386         StopIteration).
387         """
388         self._doStopNext = True
389
390
391     def dieNext(self):
392         """
393         Make the next result from my worker iterator be raising an
394         L{UnhandledException}.
395         """
396         def ignoreUnhandled(failure):
397             failure.trap(UnhandledException)
398             return None
399         self._doDieNext = True
400
401
402     def test_pauseResume(self):
403         """
404         Cooperators should stop running their tasks when they're paused, and
405         start again when they're resumed.
406         """
407         # first, sanity check
408         self.scheduler.pump()
409         self.assertEqual(self.work, [1])
410         self.scheduler.pump()
411         self.assertEqual(self.work, [1, 2])
412
413         # OK, now for real
414         self.task.pause()
415         self.scheduler.pump()
416         self.assertEqual(self.work, [1, 2])
417         self.task.resume()
418         # Resuming itself shoult not do any work
419         self.assertEqual(self.work, [1, 2])
420         self.scheduler.pump()
421         # But when the scheduler rolls around again...
422         self.assertEqual(self.work, [1, 2, 3])
423
424
425     def test_resumeNotPaused(self):
426         """
427         L{CooperativeTask.resume} should raise a L{TaskNotPaused} exception if
428         it was not paused; e.g. if L{CooperativeTask.pause} was not invoked
429         more times than L{CooperativeTask.resume} on that object.
430         """
431         self.assertRaises(task.NotPaused, self.task.resume)
432         self.task.pause()
433         self.task.resume()
434         self.assertRaises(task.NotPaused, self.task.resume)
435
436
437     def test_pauseTwice(self):
438         """
439         Pauses on tasks should behave like a stack. If a task is paused twice,
440         it needs to be resumed twice.
441         """
442         # pause once
443         self.task.pause()
444         self.scheduler.pump()
445         self.assertEqual(self.work, [])
446         # pause twice
447         self.task.pause()
448         self.scheduler.pump()
449         self.assertEqual(self.work, [])
450         # resume once (it shouldn't)
451         self.task.resume()
452         self.scheduler.pump()
453         self.assertEqual(self.work, [])
454         # resume twice (now it should go)
455         self.task.resume()
456         self.scheduler.pump()
457         self.assertEqual(self.work, [1])
458
459
460     def test_pauseWhileDeferred(self):
461         """
462         C{pause()}ing a task while it is waiting on an outstanding
463         L{defer.Deferred} should put the task into a state where the
464         outstanding L{defer.Deferred} must be called back I{and} the task is
465         C{resume}d before it will continue processing.
466         """
467         self.deferNext()
468         self.scheduler.pump()
469         self.assertEqual(len(self.work), 1)
470         self.failUnless(isinstance(self.work[0], defer.Deferred))
471         self.scheduler.pump()
472         self.assertEqual(len(self.work), 1)
473         self.task.pause()
474         self.scheduler.pump()
475         self.assertEqual(len(self.work), 1)
476         self.task.resume()
477         self.scheduler.pump()
478         self.assertEqual(len(self.work), 1)
479         self.work[0].callback("STUFF!")
480         self.scheduler.pump()
481         self.assertEqual(len(self.work), 2)
482         self.assertEqual(self.work[1], 2)
483
484
485     def test_whenDone(self):
486         """
487         L{CooperativeTask.whenDone} returns a Deferred which fires when the
488         Cooperator's iterator is exhausted.  It returns a new Deferred each
489         time it is called; callbacks added to other invocations will not modify
490         the value that subsequent invocations will fire with.
491         """
492
493         deferred1 = self.task.whenDone()
494         deferred2 = self.task.whenDone()
495         results1 = []
496         results2 = []
497         final1 = []
498         final2 = []
499
500         def callbackOne(result):
501             results1.append(result)
502             return 1
503
504         def callbackTwo(result):
505             results2.append(result)
506             return 2
507
508         deferred1.addCallback(callbackOne)
509         deferred2.addCallback(callbackTwo)
510
511         deferred1.addCallback(final1.append)
512         deferred2.addCallback(final2.append)
513
514         # exhaust the task iterator
515         # callbacks fire
516         self.stopNext()
517         self.scheduler.pump()
518
519         self.assertEqual(len(results1), 1)
520         self.assertEqual(len(results2), 1)
521
522         self.assertIdentical(results1[0], self.task._iterator)
523         self.assertIdentical(results2[0], self.task._iterator)
524
525         self.assertEqual(final1, [1])
526         self.assertEqual(final2, [2])
527
528
529     def test_whenDoneError(self):
530         """
531         L{CooperativeTask.whenDone} returns a L{defer.Deferred} that will fail
532         when the iterable's C{next} method raises an exception, with that
533         exception.
534         """
535         deferred1 = self.task.whenDone()
536         results = []
537         deferred1.addErrback(results.append)
538         self.dieNext()
539         self.scheduler.pump()
540         self.assertEqual(len(results), 1)
541         self.assertEqual(results[0].check(UnhandledException), UnhandledException)
542
543
544     def test_whenDoneStop(self):
545         """
546         L{CooperativeTask.whenDone} returns a L{defer.Deferred} that fails with
547         L{TaskStopped} when the C{stop} method is called on that
548         L{CooperativeTask}.
549         """
550         deferred1 = self.task.whenDone()
551         errors = []
552         deferred1.addErrback(errors.append)
553         self.task.stop()
554         self.assertEqual(len(errors), 1)
555         self.assertEqual(errors[0].check(task.TaskStopped), task.TaskStopped)
556
557
558     def test_whenDoneAlreadyDone(self):
559         """
560         L{CooperativeTask.whenDone} will return a L{defer.Deferred} that will
561         succeed immediately if its iterator has already completed.
562         """
563         self.stopNext()
564         self.scheduler.pump()
565         results = []
566         self.task.whenDone().addCallback(results.append)
567         self.assertEqual(results, [self.task._iterator])
568
569
570     def test_stopStops(self):
571         """
572         C{stop()}ping a task should cause it to be removed from the run just as
573         C{pause()}ing, with the distinction that C{resume()} will raise a
574         L{TaskStopped} exception.
575         """
576         self.task.stop()
577         self.scheduler.pump()
578         self.assertEqual(len(self.work), 0)
579         self.assertRaises(task.TaskStopped, self.task.stop)
580         self.assertRaises(task.TaskStopped, self.task.pause)
581         # Sanity check - it's still not scheduled, is it?
582         self.scheduler.pump()
583         self.assertEqual(self.work, [])
584
585
586     def test_pauseStopResume(self):
587         """
588         C{resume()}ing a paused, stopped task should be a no-op; it should not
589         raise an exception, because it's paused, but neither should it actually
590         do more work from the task.
591         """
592         self.task.pause()
593         self.task.stop()
594         self.task.resume()
595         self.scheduler.pump()
596         self.assertEqual(self.work, [])
597
598
599     def test_stopDeferred(self):
600         """
601         As a corrolary of the interaction of C{pause()} and C{unpause()},
602         C{stop()}ping a task which is waiting on a L{Deferred} should cause the
603         task to gracefully shut down, meaning that it should not be unpaused
604         when the deferred fires.
605         """
606         self.deferNext()
607         self.scheduler.pump()
608         d = self.work.pop()
609         self.assertEqual(self.task._pauseCount, 1)
610         results = []
611         d.addBoth(results.append)
612         self.scheduler.pump()
613         self.task.stop()
614         self.scheduler.pump()
615         d.callback(7)
616         self.scheduler.pump()
617         # Let's make sure that Deferred doesn't come out fried with an
618         # unhandled error that will be logged.  The value is None, rather than
619         # our test value, 7, because this Deferred is returned to and consumed
620         # by the cooperator code.  Its callback therefore has no contract.
621         self.assertEqual(results, [None])
622         # But more importantly, no further work should have happened.
623         self.assertEqual(self.work, [])
624
625
626     def test_stopExhausted(self):
627         """
628         C{stop()}ping a L{CooperativeTask} whose iterator has been exhausted
629         should raise L{TaskDone}.
630         """
631         self.stopNext()
632         self.scheduler.pump()
633         self.assertRaises(task.TaskDone, self.task.stop)
634
635
636     def test_stopErrored(self):
637         """
638         C{stop()}ping a L{CooperativeTask} whose iterator has encountered an
639         error should raise L{TaskFailed}.
640         """
641         self.dieNext()
642         self.scheduler.pump()
643         self.assertRaises(task.TaskFailed, self.task.stop)
644
645
646     def test_stopCooperatorReentrancy(self):
647         """
648         If a callback of a L{Deferred} from L{CooperativeTask.whenDone} calls
649         C{Cooperator.stop} on its L{CooperativeTask._cooperator}, the
650         L{Cooperator} will stop, but the L{CooperativeTask} whose callback is
651         calling C{stop} should already be considered 'stopped' by the time the
652         callback is running, and therefore removed from the
653         L{CoooperativeTask}.
654         """
655         callbackPhases = []
656         def stopit(result):
657             callbackPhases.append(result)
658             self.cooperator.stop()
659             # "done" here is a sanity check to make sure that we get all the
660             # way through the callback; i.e. stop() shouldn't be raising an
661             # exception due to the stopped-ness of our main task.
662             callbackPhases.append("done")
663         self.task.whenDone().addCallback(stopit)
664         self.stopNext()
665         self.scheduler.pump()
666         self.assertEqual(callbackPhases, [self.task._iterator, "done"])
667
668
669