Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_application.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.application} and its interaction with
6 L{twisted.persisted.sob}.
7 """
8
9 import copy, os, pickle
10 from StringIO import StringIO
11
12 from twisted.trial import unittest, util
13 from twisted.application import service, internet, app
14 from twisted.persisted import sob
15 from twisted.python import usage
16 from twisted.internet import interfaces, defer
17 from twisted.protocols import wire, basic
18 from twisted.internet import protocol, reactor
19 from twisted.application import reactors
20 from twisted.test.proto_helpers import MemoryReactor
21
22
class Dummy:
    """
    Bare placeholder object whose only attribute is C{processName}.
    """
    processName = None
25
class TestService(unittest.TestCase):
    """
    Tests for the naming, parenting and running-state behaviour of
    L{service.Service} and L{service.MultiService}.
    """

    def testName(self):
        """
        C{setName} makes the name available as the C{name} attribute.
        """
        svc = service.Service()
        svc.setName("hello")
        self.assertEqual(svc.name, "hello")

    def testParent(self):
        """
        A service given a L{service.MultiService} parent is listed by that
        parent and refers back to it.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)

    def testApplicationAsParent(self):
        """
        An application object can be used as a service parent; the child is
        attached to the application's L{service.IServiceCollection}.
        """
        child = service.Service()
        application = service.Application("")
        child.setServiceParent(application)
        collection = service.IServiceCollection(application)
        self.assertEqual(list(collection), [child])
        self.assertEqual(child.parent, collection)

    def testNamedChild(self):
        """
        A named child can be looked up on its parent with
        C{getServiceNamed}.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setName("hello")
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)
        self.assertEqual(parent.getServiceNamed("hello"), child)

    def testDoublyNamedChild(self):
        """
        Renaming a service which already has a parent raises
        C{RuntimeError}.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setName("hello")
        child.setServiceParent(parent)
        self.assertRaises(RuntimeError, child.setName, "lala")

    def testDuplicateNamedChild(self):
        """
        Adding a second child with a name already used under the same
        parent raises C{RuntimeError}.
        """
        first = service.Service()
        parent = service.MultiService()
        first.setName("hello")
        first.setServiceParent(parent)
        second = service.Service()
        second.setName("hello")
        self.assertRaises(RuntimeError, second.setServiceParent, parent)

    def testDisowning(self):
        """
        C{disownServiceParent} removes the child from its parent and clears
        the child's C{parent} attribute.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)
        child.disownServiceParent()
        self.assertEqual(list(parent), [])
        self.assertEqual(child.parent, None)

    def testRunning(self):
        """
        The C{running} attribute follows C{startService} and
        C{stopService}.
        """
        svc = service.Service()
        self.assertFalse(svc.running)
        svc.startService()
        self.assertTrue(svc.running)
        svc.stopService()
        self.assertFalse(svc.running)

    def testRunningChildren1(self):
        """
        Starting and stopping a L{service.MultiService} starts and stops
        its child as well.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertFalse(child.running)
        self.assertFalse(parent.running)
        parent.startService()
        self.assertTrue(child.running)
        self.assertTrue(parent.running)
        parent.stopService()
        self.assertFalse(child.running)
        self.assertFalse(parent.running)

    def testRunningChildren2(self):
        """
        When the start and stop hooks of a child added second are invoked,
        the child added first is observed to be running.
        """
        first = service.Service()
        def checkRunning():
            self.assertTrue(first.running)
        second = service.Service()
        second.startService = checkRunning
        second.stopService = checkRunning
        parent = service.MultiService()
        first.setServiceParent(parent)
        second.setServiceParent(parent)
        parent.startService()
        parent.stopService()

    def testAddingIntoRunning(self):
        """
        A service added to an already running parent is started, and it is
        stopped again when it is disowned.
        """
        parent = service.MultiService()
        parent.startService()
        child = service.Service()
        self.assertFalse(child.running)
        child.setServiceParent(parent)
        self.assertTrue(child.running)
        child.disownServiceParent()
        self.assertFalse(child.running)

    def testPrivileged(self):
        """
        C{privilegedStartService} on the parent invokes the same hook on
        its children.
        """
        hooked = service.Service()
        def markPrivilegedStarted():
            hooked.privilegedStarted = 1
        hooked.privilegedStartService = markPrivilegedStarted
        plain = service.Service()
        parent = service.MultiService()
        hooked.setServiceParent(parent)
        plain.setServiceParent(parent)
        parent.privilegedStartService()
        self.assertTrue(hooked.privilegedStarted)

    def testCopying(self):
        """
        A shallow copy of a running service is not running, while the
        original stays running.
        """
        original = service.Service()
        original.startService()
        duplicate = copy.copy(original)
        self.assertFalse(duplicate.running)
        self.assertTrue(original.running)
144
145
# Record the current user and group ids; platforms whose os module has no
# getuid fall back to 0 for both.
if not hasattr(os, "getuid"):
    curuid = curgid = 0
else:
    curuid, curgid = os.getuid(), os.getgid()
151
152
class TestProcess(unittest.TestCase):
    """
    Tests for the attributes exposed by L{service.Process}.
    """

    def testID(self):
        """
        Positional arguments are stored as C{uid} and C{gid}, in that
        order.
        """
        proc = service.Process(5, 6)
        self.assertEqual(proc.uid, 5)
        self.assertEqual(proc.gid, 6)

    def testDefaults(self):
        """
        Whichever of C{uid} and C{gid} is not supplied is C{None}.
        """
        onlyUid = service.Process(5)
        self.assertEqual(onlyUid.uid, 5)
        self.assertEqual(onlyUid.gid, None)

        onlyGid = service.Process(gid=5)
        self.assertEqual(onlyGid.uid, None)
        self.assertEqual(onlyGid.gid, 5)

        neither = service.Process()
        self.assertEqual(neither.uid, None)
        self.assertEqual(neither.gid, None)

    def testProcessName(self):
        """
        C{processName} starts out as C{None} and can be assigned.
        """
        proc = service.Process()
        self.assertEqual(proc.processName, None)
        proc.processName = 'hello'
        self.assertEqual(proc.processName, 'hello')
176
177
class TestInterfaces(unittest.TestCase):
    """
    Tests that the concrete service classes provide their declared
    interfaces.
    """

    def testService(self):
        """
        A L{service.Service} instance provides L{service.IService}.
        """
        instance = service.Service()
        self.assertTrue(service.IService.providedBy(instance))

    def testMultiService(self):
        """
        L{service.MultiService} instances provide both L{service.IService}
        and L{service.IServiceCollection}.
        """
        # A fresh instance is checked against each interface.
        self.assertTrue(
            service.IService.providedBy(service.MultiService()))
        self.assertTrue(
            service.IServiceCollection.providedBy(service.MultiService()))

    def testProcess(self):
        """
        A L{service.Process} instance provides L{service.IProcess}.
        """
        instance = service.Process()
        self.assertTrue(service.IProcess.providedBy(instance))
189
190
class TestApplication(unittest.TestCase):
    """
    Tests for the object returned by L{service.Application} and the
    components it can be adapted to.
    """

    def testConstructor(self):
        """
        L{service.Application} accepts a name alone, a name and a uid, or
        a name, a uid and a gid.
        """
        for extra in [(), (5,), (5, 6)]:
            service.Application("hello", *extra)

    def testProcessComponent(self):
        """
        The L{service.IProcess} component reports the uid and gid passed
        to L{service.Application}, or C{None} for those omitted.
        """
        expectations = [
            ((), None, None),
            ((5,), 5, None),
            ((5, 6), 5, 6),
        ]
        for extra, uid, gid in expectations:
            application = service.Application("hello", *extra)
            self.assertEqual(service.IProcess(application).uid, uid)
            self.assertEqual(service.IProcess(application).gid, gid)

    def testServiceComponent(self):
        """
        The L{service.IService} and L{service.IServiceCollection}
        components are the same object, named after the application and
        without a parent.
        """
        application = service.Application("hello")
        asService = service.IService(application)
        self.assertTrue(asService is service.IServiceCollection(application))
        self.assertEqual(service.IService(application).name, "hello")
        self.assertEqual(service.IService(application).parent, None)

    def testPersistableComponent(self):
        """
        The L{sob.IPersistable} component uses the C{'pickle'} style, the
        application's name, and wraps the application itself.
        """
        application = service.Application("hello")
        persistable = sob.IPersistable(application)
        self.assertEqual(persistable.style, 'pickle')
        self.assertEqual(persistable.name, 'hello')
        self.assertTrue(persistable.original is application)
221
class TestLoading(unittest.TestCase):
    """
    Tests for saving an application with L{sob.IPersistable} and loading
    it back with L{service.loadApplication}.
    """

    def test_simpleStoreAndLoad(self):
        """
        An application saved in the C{'source'} or C{'pickle'} style, or
        written by hand as a C{'python'} file, can be loaded back with its
        name intact.
        """
        a = service.Application("hello")
        p = sob.IPersistable(a)
        for style in 'source pickle'.split():
            p.setStyle(style)
            p.save()
            # The file is read back as "hello.ta" plus the first letter of
            # the style name.
            a1 = service.loadApplication("hello.ta"+style[0], style)
            self.assertEqual(service.IService(a1).name, "hello")
        f = open("hello.tac", 'w')
        try:
            f.writelines([
            "from twisted.application import service\n",
            "application = service.Application('hello')\n",
            ])
        finally:
            # Always release the handle, even if the write fails.
            f.close()
        a1 = service.loadApplication("hello.tac", 'python')
        self.assertEqual(service.IService(a1).name, "hello")
240
241
242
class TestAppSupport(unittest.TestCase):
    """
    Tests for the helper functions in L{twisted.application.app}.
    """

    def testPassphrase(self):
        """
        C{app.getPassphrase} returns C{None} when called with C{0}.
        """
        self.assertEqual(app.getPassphrase(0), None)

    def testLoadApplication(self):
        """
        Test loading an application file in different dump format.
        """
        a = service.Application("hello")
        baseconfig = {'file': None, 'source': None, 'python':None}
        for style in 'source pickle'.split():
            config = baseconfig.copy()
            # The 'pickle' style is selected through the 'file' config key;
            # the 'source' style uses a key of the same name.
            config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
            sob.IPersistable(a).setStyle(style)
            sob.IPersistable(a).save(filename='helloapplication')
            a1 = app.getApplication(config, None)
            self.assertEqual(service.IService(a1).name, "hello")
        config = baseconfig.copy()
        config['python'] = 'helloapplication'
        f = open("helloapplication", 'w')
        try:
            f.writelines([
            "from twisted.application import service\n",
            "application = service.Application('hello')\n",
            ])
        finally:
            # Always release the handle, even if the write fails.
            f.close()
        a1 = app.getApplication(config, None)
        self.assertEqual(service.IService(a1).name, "hello")

    def test_convertStyle(self):
        """
        C{app.convertStyle} converts a saved application between every
        combination of the C{'source'} and C{'pickle'} styles, and the
        converted file loads back with the original name.
        """
        appl = service.Application("lala")
        for instyle in 'source pickle'.split():
            for outstyle in 'source pickle'.split():
                sob.IPersistable(appl).setStyle(instyle)
                sob.IPersistable(appl).save(filename="converttest")
                app.convertStyle("converttest", instyle, None,
                                 "converttest.out", outstyle, 0)
                appl2 = service.loadApplication("converttest.out", outstyle)
                self.assertEqual(service.IService(appl2).name, "lala")


    def test_startApplication(self):
        """
        C{app.startApplication} leaves the application's service running.
        """
        appl = service.Application("lala")
        app.startApplication(appl, 0)
        self.assert_(service.IService(appl).running)
288
289
class Foo(basic.LineReceiver):
    """
    Client protocol which sends a single line on connection, stores the
    first line it receives on its factory and then disconnects.  When the
    connection is lost, the factory's deferred C{d} is fired with the
    stored line.
    """

    def connectionMade(self):
        """
        Send the greeting line as soon as the connection is up.
        """
        greeting = 'lalala\r\n'
        self.transport.write(greeting)

    def lineReceived(self, line):
        """
        Remember the received line on the factory and drop the connection.
        """
        self.factory.line = line
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """
        Fire the factory's deferred with whatever line was recorded.
        """
        factory = self.factory
        factory.d.callback(factory.line)
298
299
class DummyApp:
    """
    Minimal stand-in for an application object which keeps the services
    added to it in a dictionary keyed by service name.

    @ivar services: mapping of service name to service object.
    @type services: C{dict}
    """
    processName = None

    def __init__(self):
        # Create the registry up front so that addService and removeService
        # work on a freshly constructed instance.
        self.services = {}

    def addService(self, service):
        """
        Register C{service} under its C{name} attribute.
        """
        self.services[service.name] = service

    def removeService(self, service):
        """
        Remove the entry stored under C{service.name}.

        @raise KeyError: if no service is registered under that name.
        """
        del self.services[service.name]
306
307
class TimerTarget:
    """
    Collector which records, in order, every value handed to L{append}.

    @ivar l: the values recorded so far.
    @type l: C{list}
    """

    def __init__(self):
        self.l = list()

    def append(self, what):
        """
        Add C{what} to the end of the recorded values.
        """
        recorded = self.l
        recorded.append(what)
313
class TestEcho(wire.Echo):
    """
    Echo protocol which fires its C{d} deferred with C{True} once the
    connection has been lost.  The tests assign C{d} on the class before
    use.
    """

    def connectionLost(self, reason):
        """
        Signal that the connection is gone.
        """
        finished = self.d
        finished.callback(True)
317
class TestInternet2(unittest.TestCase):
    """
    Tests for the service classes defined in
    L{twisted.application.internet}.
    """

    def testTCP(self):
        """
        A L{internet.TCPClient} and a L{internet.TCPServer} under the same
        running parent can exchange a line; the test finishes once the
        server side has seen the connection go away.
        """
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.setServiceParent(s)
        # Port 0 was requested, so ask the listening port which number it
        # actually got.
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d


    def test_UDP(self):
        """
        Test L{internet.UDPServer} with a random port: starting the service
        should give it valid port, and stopService should free it so that we
        can start a server on the same port again.
        """
        if not interfaces.IReactorUDP(reactor, None):
            raise unittest.SkipTest("This reactor does not support UDP sockets")
        p = protocol.DatagramProtocol()
        t = internet.UDPServer(0, p)
        t.startService()
        num = t._port.getHost().port
        self.assertNotEquals(num, 0)
        def onStop(ignored):
            t = internet.UDPServer(num, p)
            t.startService()
            return t.stopService()
        return defer.maybeDeferred(t.stopService).addCallback(onStop)


    def testPrivileged(self):
        """
        A L{internet.TCPServer} marked as privileged has a listening port
        after C{privilegedStartService}, and a client can exchange a line
        with it.
        """
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d

    def testConnectionGettingRefused(self):
        """
        Connecting a L{internet.TCPClient} to a port whose server has been
        stopped results in C{clientConnectionFailed} being called on the
        factory.
        """
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.TCPServer(0, factory)
        t.startService()
        # Remember the port number, then stop the server before connecting.
        num = t._port.getHost().port
        t.stopService()
        d = defer.Deferred()
        factory = protocol.ClientFactory()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        return d

    def testUNIX(self):
        """
        A L{internet.UNIXClient} and a L{internet.UNIXServer} under the
        same parent can exchange a line, and can do so again after the
        parent has been stopped and restarted.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest("This reactor does not support UNIX domain sockets")
        s = service.MultiService()
        s.startService()
        # Server side: an echo protocol listening on the 'echo.skt' socket.
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        # Client side: Foo reports the echoed line through factory.d.
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        # Second round: restart the same parent and repeat the exchange.
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d

    def _cbTestUnix(self, ignored, factory, s):
        """
        Second half of L{testUNIX}: reset the deferreds and the recorded
        line, restart the parent service C{s} and expect the same exchange
        to happen again.

        @param ignored: result of the previous callback; unused.
        @param factory: the client factory used in the first round.
        @param s: the parent service holding the client and the server.
        """
        TestEcho.d = defer.Deferred()
        factory.line = None
        factory.d = defer.Deferred()
        s.startService()
        factory.d.addCallback(self.assertEqual, 'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d

    def testVolatile(self):
        """
        The C{_port} of a server and the C{_connection} of a client are
        set while the service runs, are C{None} on a copy of the running
        service, and are C{None} again after the service is stopped.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest("This reactor does not support UNIX domain sockets")
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._port, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._port, None)
        t.stopService()
        self.assertIdentical(t._port, None)
        self.failIf(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.failIfIdentical(t._connection, None)
        t1 = copy.copy(t)
        self.assertIdentical(t1._connection, None)
        t.stopService()
        self.assertIdentical(t._connection, None)
        self.failIf(t.running)

    def testStoppingServer(self):
        """
        After a L{internet.UNIXServer} has been stopped, connecting to its
        socket path results in C{clientConnectionFailed}.
        """
        if not interfaces.IReactorUNIX(reactor, None):
            raise unittest.SkipTest("This reactor does not support UNIX domain sockets")
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        t.stopService()
        self.failIf(t.running)
        factory = protocol.ClientFactory()
        d = defer.Deferred()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        reactor.connectUNIX('echo.skt', factory)
        return d

    def testPickledTimer(self):
        """
        A L{internet.TimerService} pickled while running is not running
        when it is unpickled.
        """
        target = TimerTarget()
        t0 = internet.TimerService(1, target.append, "hello")
        t0.startService()
        s = pickle.dumps(t0)
        t0.stopService()

        t = pickle.loads(s)
        self.failIf(t.running)

    def testBrokenTimer(self):
        """
        When the function run by a L{internet.TimerService} raises, the
        failure is logged exactly once.
        """
        d = defer.Deferred()
        t = internet.TimerService(1, lambda: 1 / 0)
        oldFailed = t._failed
        # Wrap the failure hook so the test learns when the error has been
        # handled by the service.
        def _failed(why):
            oldFailed(why)
            d.callback(None)
        t._failed = _failed
        t.startService()
        # Call stopService so the timer service is stopped before the test
        # ends.
        d.addCallback(lambda x : t.stopService())
        d.addCallback(lambda x : self.assertEqual(
            [ZeroDivisionError],
            [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
        return d


    def test_genericServerDeprecated(self):
        """
        Instantiating L{GenericServer} emits a deprecation warning.
        """
        internet.GenericServer()
        warnings = self.flushWarnings(
            offendingFunctions=[self.test_genericServerDeprecated])
        self.assertEqual(
            warnings[0]['message'],
            'GenericServer was deprecated in Twisted 10.1.')
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(len(warnings), 1)


    def test_genericClientDeprecated(self):
        """
        Instantiating L{GenericClient} emits a deprecation warning.
        """
        internet.GenericClient()
        warnings = self.flushWarnings(
            offendingFunctions=[self.test_genericClientDeprecated])
        self.assertEqual(
            warnings[0]['message'],
            'GenericClient was deprecated in Twisted 10.1.')
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(len(warnings), 1)


    def test_everythingThere(self):
        """
        L{twisted.application.internet} dynamically defines a set of
        L{service.Service} subclasses that in general have corresponding
        reactor.listenXXX or reactor.connectXXX calls.
        """
        trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
        # Iterate over a copy so that unsupported transports can be removed
        # from the list being built.
        for tran in trans[:]:
            if not getattr(interfaces, "IReactor" + tran)(reactor, None):
                trans.remove(tran)
        if interfaces.IReactorArbitrary(reactor, None) is not None:
            trans.insert(0, "Generic")
        for tran in trans:
            for side in 'Server Client'.split():
                if tran == "Multicast" and side == "Client":
                    continue
                self.assertTrue(hasattr(internet, tran + side))
                method = getattr(internet, tran + side).method
                prefix = {'Server': 'listen', 'Client': 'connect'}[side]
                self.assertTrue(hasattr(reactor, prefix + method) or
                        (prefix == "connect" and method == "UDP"))
                o = getattr(internet, tran + side)()
                self.assertEqual(service.IService(o), o)
    test_everythingThere.suppress = [
        util.suppress(message='GenericServer was deprecated in Twisted 10.1.',
                      category=DeprecationWarning),
        util.suppress(message='GenericClient was deprecated in Twisted 10.1.',
                      category=DeprecationWarning),
        util.suppress(message='twisted.internet.interfaces.IReactorArbitrary was '
                      'deprecated in Twisted 10.1.0: See IReactorFDSet.')]


    def test_importAll(self):
        """
        L{twisted.application.internet} dynamically defines L{service.Service}
        subclasses. This test ensures that the subclasses exposed by C{__all__}
        are valid attributes of the module.
        """
        for cls in internet.__all__:
            self.assertTrue(
                hasattr(internet, cls),
                '%s not importable from twisted.application.internet' % (cls,))


    def test_reactorParametrizationInServer(self):
        """
        L{internet._AbstractServer} supports a C{reactor} keyword argument
        that can be used to parametrize the reactor used to listen for
        connections.
        """
        reactor = MemoryReactor()

        factory = object()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClient(self):
        """
        L{internet._AbstractClient} supports a C{reactor} keyword arguments
        that can be used to parametrize the reactor used to create new client
        connections.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))


    def test_reactorParametrizationInServerMultipleStart(self):
        """
        Like L{test_reactorParametrizationInServer}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.Factory()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClientMultipleStart(self):
        """
        Like L{test_reactorParametrizationInClient}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
626
627
628
class TestTimerBasic(unittest.TestCase):
    """
    Basic tests for L{internet.TimerService}.  Every test stores its
    service as C{self.t} so that L{tearDown} can stop it.
    """

    def testTimerRuns(self):
        """
        The timer invokes its callable with the configured argument, and
        the service is not running after being stopped.
        """
        fired = defer.Deferred()
        self.t = internet.TimerService(1, fired.callback, 'hello')
        self.t.startService()
        fired.addCallback(self.assertEqual, 'hello')
        fired.addCallback(lambda ignored: self.t.stopService())
        fired.addCallback(lambda ignored: self.assertFalse(self.t.running))
        return fired

    def tearDown(self):
        """
        Stop the timer service created by the test.
        """
        return self.t.stopService()

    def testTimerRestart(self):
        """
        The same L{internet.TimerService} can be stopped and started again,
        and invokes its callable after each start.
        """
        firstRun = defer.Deferred()
        secondRun = defer.Deferred()
        # Entries are popped from the end, so the first invocation fires
        # firstRun with "foo" and the second fires secondRun with "bar".
        pending = [(secondRun, "bar"), (firstRun, "foo")]
        def trigger():
            deferred, value = pending.pop()
            deferred.callback(value)
        self.t = internet.TimerService(1, trigger)
        self.t.startService()

        def onFirstResult(result):
            self.assertEqual(result, 'foo')
            return self.t.stopService()

        def onFirstStop(ignored):
            self.assertFalse(self.t.running)
            self.t.startService()
            return secondRun

        def onSecondResult(result):
            self.assertEqual(result, 'bar')
            self.t.stopService()

        for step in (onFirstResult, onFirstStop, onSecondResult):
            firstRun.addCallback(step)
        return firstRun

    def testTimerLoops(self):
        """
        The timer keeps invoking its callable, passing the same arguments
        each time, until the service is stopped.
        """
        collected = []
        def trigger(data, number, d):
            collected.append(data)
            if len(collected) == number:
                d.callback(collected)
        done = defer.Deferred()
        self.t = internet.TimerService(0.01, trigger, "hello", 10, done)
        self.t.startService()
        done.addCallback(self.assertEqual, ['hello'] * 10)
        done.addCallback(lambda ignored: self.t.stopService())
        return done
680
681
class FakeReactor(reactors.Reactor):
    """
    A L{reactors.Reactor} whose C{install} attribute is replaced by a
    callable supplied at construction time.
    """

    def __init__(self, install, *a, **kw):
        """
        @param install: the callable to expose as this object's C{install}
            attribute; every remaining argument is forwarded to
            L{reactors.Reactor.__init__}.
        @type install: C{callable}
        """
        reactors.Reactor.__init__(self, *a, **kw)
        # The instance attribute takes the place of the inherited method.
        self.install = install
694
695
696
class PluggableReactorTestCase(unittest.TestCase):
    """
    Tests for the reactor plugin discovery and installation APIs of
    L{twisted.application.reactors}.
    """

    def setUp(self):
        """
        Replace L{reactors.getPlugins} with L{_getPlugins} so that the test
        decides which L{IReactorInstaller} plugins appear to be available.

        Tests set C{self.pluginResults} to the plugins which calls to
        C{reactors.getPlugins} should return; every call is recorded in
        C{self.pluginCalls}.
        """
        self.originalFunction = reactors.getPlugins
        self.pluginCalls = []
        self.pluginResults = []
        reactors.getPlugins = self._getPlugins


    def tearDown(self):
        """
        Put the real L{reactors.getPlugins} back in place.
        """
        reactors.getPlugins = self.originalFunction


    def _getPlugins(self, interface, package=None):
        """
        Replacement for C{getPlugins}: remember the arguments of the call
        and return a copy of C{self.pluginResults}.
        """
        call = (interface, package)
        self.pluginCalls.append(call)
        return list(self.pluginResults)


    def test_getPluginReactorTypes(self):
        """
        A reactor plugin is included in the result of
        L{reactors.getReactorTypes}.
        """
        name = 'fakereactortest'
        package = __name__ + '.fakereactor'
        description = 'description'
        plugin = reactors.Reactor(name, package, description)
        self.pluginResults = [plugin]
        reactorTypes = reactors.getReactorTypes()

        expectedCalls = [(reactors.IReactorInstaller, None)]
        self.assertEqual(self.pluginCalls, expectedCalls)

        for candidate in reactorTypes:
            if candidate.shortName == name:
                self.assertEqual(candidate.description, description)
                break
        else:
            self.fail("Reactor plugin not present in getReactorTypes() result")


    def test_reactorInstallation(self):
        """
        Calling C{install} on a L{FakeReactor} invokes the callable it was
        constructed with.
        """
        installed = []
        def install():
            installed.append(True)
        installer = FakeReactor(
            install, 'fakereactortest', __name__, 'described')
        installer.install()
        self.assertEqual(installed, [True])


    def test_installReactor(self):
        """
        L{reactors.installReactor} installs the plugin whose name it is
        given.
        """
        installed = []
        def install():
            installed.append(True)
        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]
        reactors.installReactor(name)
        self.assertEqual(installed, [True])


    def test_installNonExistentReactor(self):
        """
        L{reactors.installReactor} raises L{reactors.NoSuchReactor} when no
        plugin matches the requested name.
        """
        self.pluginResults = []
        self.assertRaises(
            reactors.NoSuchReactor, reactors.installReactor, 'somereactor')


    def test_installNotAvailableReactor(self):
        """
        An exception raised by a plugin's C{install} propagates out of
        L{reactors.installReactor}.
        """
        def install():
            raise ImportError("Missing foo bar")
        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]
        self.assertRaises(ImportError, reactors.installReactor, name)


    def test_reactorSelectionMixin(self):
        """
        The selected reactor is installed as soon as the C{--reactor}
        option is parsed: the installation is the first recorded event and
        happens exactly once.
        """
        executed = []
        INSTALL_EVENT = 'reactor installed'
        SUBCOMMAND_EVENT = 'subcommands loaded'

        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            @property
            def subCommands(self):
                executed.append(SUBCOMMAND_EVENT)
                return [('subcommand', None, lambda: self, 'test subcommand')]

        def install():
            executed.append(INSTALL_EVENT)
        plugin = FakeReactor(install, 'fakereactortest', __name__, 'described')
        self.pluginResults = [plugin]

        options = ReactorSelectionOptions()
        options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
        self.assertEqual(executed[0], INSTALL_EVENT)
        self.assertEqual(executed.count(INSTALL_EVENT), 1)
        self.assertEqual(options["reactor"], "fakereactortest")


    def test_reactorSelectionMixinNonExistent(self):
        """
        Asking the usage mixin for a reactor name which matches no plugin
        raises L{usage.UsageError} with a message naming the reactor and
        mentioning C{help-reactors}.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass
        self.pluginResults = []

        options = ReactorSelectionOptions()
        options.messageOutput = StringIO()
        argv = ['--reactor', 'fakereactortest', 'subcommand']
        e = self.assertRaises(usage.UsageError, options.parseOptions, argv)
        errorText = e.args[0]
        self.assertIn("fakereactortest", errorText)
        self.assertIn("help-reactors", errorText)


    def test_reactorSelectionMixinNotAvailable(self):
        """
        Asking the usage mixin for a reactor whose installation raises an
        error raises L{usage.UsageError} with a message containing the
        original error text and mentioning C{help-reactors}.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass
        message = "Missing foo bar"
        def install():
            raise ImportError(message)

        plugin = FakeReactor(
            install, 'fakereactortest', __name__, 'description')
        self.pluginResults = [plugin]

        options = ReactorSelectionOptions()
        options.messageOutput = StringIO()
        argv = ['--reactor', 'fakereactortest', 'subcommand']
        e = self.assertRaises(usage.UsageError, options.parseOptions, argv)
        errorText = e.args[0]
        self.assertIn(message, errorText)
        self.assertIn("help-reactors", errorText)
877         self.assertIn(message, e.args[0])
878         self.assertIn("help-reactors", e.args[0])