1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.application} and its interaction with
6 L{twisted.persisted.sob}.
9 import copy, os, pickle
10 from StringIO import StringIO
12 from twisted.trial import unittest, util
13 from twisted.application import service, internet, app
14 from twisted.persisted import sob
15 from twisted.python import usage
16 from twisted.internet import interfaces, defer
17 from twisted.protocols import wire, basic
18 from twisted.internet import protocol, reactor
19 from twisted.application import reactors
20 from twisted.test.proto_helpers import MemoryReactor
26 class TestService(unittest.TestCase):
31 self.assertEqual(s.name, "hello")
35 p = service.MultiService()
37 self.assertEqual(list(p), [s])
38 self.assertEqual(s.parent, p)
40 def testApplicationAsParent(self):
42 p = service.Application("")
44 self.assertEqual(list(service.IServiceCollection(p)), [s])
45 self.assertEqual(s.parent, service.IServiceCollection(p))
47 def testNamedChild(self):
49 p = service.MultiService()
52 self.assertEqual(list(p), [s])
53 self.assertEqual(s.parent, p)
54 self.assertEqual(p.getServiceNamed("hello"), s)
56 def testDoublyNamedChild(self):
58 p = service.MultiService()
61 self.failUnlessRaises(RuntimeError, s.setName, "lala")
63 def testDuplicateNamedChild(self):
65 p = service.MultiService()
70 self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
72 def testDisowning(self):
74 p = service.MultiService()
76 self.assertEqual(list(p), [s])
77 self.assertEqual(s.parent, p)
78 s.disownServiceParent()
79 self.assertEqual(list(p), [])
80 self.assertEqual(s.parent, None)
82 def testRunning(self):
84 self.assert_(not s.running)
86 self.assert_(s.running)
88 self.assert_(not s.running)
90 def testRunningChildren1(self):
92 p = service.MultiService()
94 self.assert_(not s.running)
95 self.assert_(not p.running)
97 self.assert_(s.running)
98 self.assert_(p.running)
100 self.assert_(not s.running)
101 self.assert_(not p.running)
103 def testRunningChildren2(self):
104 s = service.Service()
106 self.assert_(s.running)
107 t = service.Service()
108 t.stopService = checkRunning
109 t.startService = checkRunning
110 p = service.MultiService()
111 s.setServiceParent(p)
112 t.setServiceParent(p)
116 def testAddingIntoRunning(self):
117 p = service.MultiService()
119 s = service.Service()
120 self.assert_(not s.running)
121 s.setServiceParent(p)
122 self.assert_(s.running)
123 s.disownServiceParent()
124 self.assert_(not s.running)
126 def testPrivileged(self):
127 s = service.Service()
129 s.privilegedStarted = 1
130 s.privilegedStartService = pss
131 s1 = service.Service()
132 p = service.MultiService()
133 s.setServiceParent(p)
134 s1.setServiceParent(p)
135 p.privilegedStartService()
136 self.assert_(s.privilegedStarted)
138 def testCopying(self):
139 s = service.Service()
142 self.assert_(not s1.running)
143 self.assert_(s.running)
146 if hasattr(os, "getuid"):
153 class TestProcess(unittest.TestCase):
156 p = service.Process(5, 6)
157 self.assertEqual(p.uid, 5)
158 self.assertEqual(p.gid, 6)
160 def testDefaults(self):
161 p = service.Process(5)
162 self.assertEqual(p.uid, 5)
163 self.assertEqual(p.gid, None)
164 p = service.Process(gid=5)
165 self.assertEqual(p.uid, None)
166 self.assertEqual(p.gid, 5)
167 p = service.Process()
168 self.assertEqual(p.uid, None)
169 self.assertEqual(p.gid, None)
171 def testProcessName(self):
172 p = service.Process()
173 self.assertEqual(p.processName, None)
174 p.processName = 'hello'
175 self.assertEqual(p.processName, 'hello')
178 class TestInterfaces(unittest.TestCase):
180 def testService(self):
181 self.assert_(service.IService.providedBy(service.Service()))
183 def testMultiService(self):
184 self.assert_(service.IService.providedBy(service.MultiService()))
185 self.assert_(service.IServiceCollection.providedBy(service.MultiService()))
187 def testProcess(self):
188 self.assert_(service.IProcess.providedBy(service.Process()))
191 class TestApplication(unittest.TestCase):
193 def testConstructor(self):
194 service.Application("hello")
195 service.Application("hello", 5)
196 service.Application("hello", 5, 6)
198 def testProcessComponent(self):
199 a = service.Application("hello")
200 self.assertEqual(service.IProcess(a).uid, None)
201 self.assertEqual(service.IProcess(a).gid, None)
202 a = service.Application("hello", 5)
203 self.assertEqual(service.IProcess(a).uid, 5)
204 self.assertEqual(service.IProcess(a).gid, None)
205 a = service.Application("hello", 5, 6)
206 self.assertEqual(service.IProcess(a).uid, 5)
207 self.assertEqual(service.IProcess(a).gid, 6)
209 def testServiceComponent(self):
210 a = service.Application("hello")
211 self.assert_(service.IService(a) is service.IServiceCollection(a))
212 self.assertEqual(service.IService(a).name, "hello")
213 self.assertEqual(service.IService(a).parent, None)
215 def testPersistableComponent(self):
216 a = service.Application("hello")
217 p = sob.IPersistable(a)
218 self.assertEqual(p.style, 'pickle')
219 self.assertEqual(p.name, 'hello')
220 self.assert_(p.original is a)
222 class TestLoading(unittest.TestCase):
224 def test_simpleStoreAndLoad(self):
225 a = service.Application("hello")
226 p = sob.IPersistable(a)
227 for style in 'source pickle'.split():
230 a1 = service.loadApplication("hello.ta"+style[0], style)
231 self.assertEqual(service.IService(a1).name, "hello")
232 f = open("hello.tac", 'w')
234 "from twisted.application import service\n",
235 "application = service.Application('hello')\n",
238 a1 = service.loadApplication("hello.tac", 'python')
239 self.assertEqual(service.IService(a1).name, "hello")
243 class TestAppSupport(unittest.TestCase):
245 def testPassphrase(self):
246 self.assertEqual(app.getPassphrase(0), None)
248 def testLoadApplication(self):
250 Test loading an application file in different dump format.
252 a = service.Application("hello")
253 baseconfig = {'file': None, 'source': None, 'python':None}
254 for style in 'source pickle'.split():
255 config = baseconfig.copy()
256 config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
257 sob.IPersistable(a).setStyle(style)
258 sob.IPersistable(a).save(filename='helloapplication')
259 a1 = app.getApplication(config, None)
260 self.assertEqual(service.IService(a1).name, "hello")
261 config = baseconfig.copy()
262 config['python'] = 'helloapplication'
263 f = open("helloapplication", 'w')
265 "from twisted.application import service\n",
266 "application = service.Application('hello')\n",
269 a1 = app.getApplication(config, None)
270 self.assertEqual(service.IService(a1).name, "hello")
272 def test_convertStyle(self):
273 appl = service.Application("lala")
274 for instyle in 'source pickle'.split():
275 for outstyle in 'source pickle'.split():
276 sob.IPersistable(appl).setStyle(instyle)
277 sob.IPersistable(appl).save(filename="converttest")
278 app.convertStyle("converttest", instyle, None,
279 "converttest.out", outstyle, 0)
280 appl2 = service.loadApplication("converttest.out", outstyle)
281 self.assertEqual(service.IService(appl2).name, "lala")
284 def test_startApplication(self):
285 appl = service.Application("lala")
286 app.startApplication(appl, 0)
287 self.assert_(service.IService(appl).running)
290 class Foo(basic.LineReceiver):
291 def connectionMade(self):
292 self.transport.write('lalala\r\n')
293 def lineReceived(self, line):
294 self.factory.line = line
295 self.transport.loseConnection()
296 def connectionLost(self, reason):
297 self.factory.d.callback(self.factory.line)
302 def addService(self, service):
303 self.services[service.name] = service
304 def removeService(self, service):
305 del self.services[service.name]
311 def append(self, what):
314 class TestEcho(wire.Echo):
315 def connectionLost(self, reason):
316 self.d.callback(True)
318 class TestInternet2(unittest.TestCase):
321 s = service.MultiService()
323 factory = protocol.ServerFactory()
324 factory.protocol = TestEcho
325 TestEcho.d = defer.Deferred()
326 t = internet.TCPServer(0, factory)
327 t.setServiceParent(s)
328 num = t._port.getHost().port
329 factory = protocol.ClientFactory()
330 factory.d = defer.Deferred()
331 factory.protocol = Foo
333 internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
334 factory.d.addCallback(self.assertEqual, 'lalala')
335 factory.d.addCallback(lambda x : s.stopService())
336 factory.d.addCallback(lambda x : TestEcho.d)
342 Test L{internet.UDPServer} with a random port: starting the service
343 should give it valid port, and stopService should free it so that we
344 can start a server on the same port again.
346 if not interfaces.IReactorUDP(reactor, None):
347 raise unittest.SkipTest("This reactor does not support UDP sockets")
348 p = protocol.DatagramProtocol()
349 t = internet.UDPServer(0, p)
351 num = t._port.getHost().port
352 self.assertNotEquals(num, 0)
354 t = internet.UDPServer(num, p)
356 return t.stopService()
357 return defer.maybeDeferred(t.stopService).addCallback(onStop)
360 def testPrivileged(self):
361 factory = protocol.ServerFactory()
362 factory.protocol = TestEcho
363 TestEcho.d = defer.Deferred()
364 t = internet.TCPServer(0, factory)
366 t.privilegedStartService()
367 num = t._port.getHost().port
368 factory = protocol.ClientFactory()
369 factory.d = defer.Deferred()
370 factory.protocol = Foo
372 c = internet.TCPClient('127.0.0.1', num, factory)
374 factory.d.addCallback(self.assertEqual, 'lalala')
375 factory.d.addCallback(lambda x : c.stopService())
376 factory.d.addCallback(lambda x : t.stopService())
377 factory.d.addCallback(lambda x : TestEcho.d)
380 def testConnectionGettingRefused(self):
381 factory = protocol.ServerFactory()
382 factory.protocol = wire.Echo
383 t = internet.TCPServer(0, factory)
385 num = t._port.getHost().port
388 factory = protocol.ClientFactory()
389 factory.clientConnectionFailed = lambda *args: d.callback(None)
390 c = internet.TCPClient('127.0.0.1', num, factory)
395 # FIXME: This test is far too dense. It needs comments.
396 # -- spiv, 2004-11-07
397 if not interfaces.IReactorUNIX(reactor, None):
398 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
399 s = service.MultiService()
401 factory = protocol.ServerFactory()
402 factory.protocol = TestEcho
403 TestEcho.d = defer.Deferred()
404 t = internet.UNIXServer('echo.skt', factory)
405 t.setServiceParent(s)
406 factory = protocol.ClientFactory()
407 factory.protocol = Foo
408 factory.d = defer.Deferred()
410 internet.UNIXClient('echo.skt', factory).setServiceParent(s)
411 factory.d.addCallback(self.assertEqual, 'lalala')
412 factory.d.addCallback(lambda x : s.stopService())
413 factory.d.addCallback(lambda x : TestEcho.d)
414 factory.d.addCallback(self._cbTestUnix, factory, s)
417 def _cbTestUnix(self, ignored, factory, s):
418 TestEcho.d = defer.Deferred()
420 factory.d = defer.Deferred()
422 factory.d.addCallback(self.assertEqual, 'lalala')
423 factory.d.addCallback(lambda x : s.stopService())
424 factory.d.addCallback(lambda x : TestEcho.d)
427 def testVolatile(self):
428 if not interfaces.IReactorUNIX(reactor, None):
429 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
430 factory = protocol.ServerFactory()
431 factory.protocol = wire.Echo
432 t = internet.UNIXServer('echo.skt', factory)
434 self.failIfIdentical(t._port, None)
436 self.assertIdentical(t1._port, None)
438 self.assertIdentical(t._port, None)
439 self.failIf(t.running)
441 factory = protocol.ClientFactory()
442 factory.protocol = wire.Echo
443 t = internet.UNIXClient('echo.skt', factory)
445 self.failIfIdentical(t._connection, None)
447 self.assertIdentical(t1._connection, None)
449 self.assertIdentical(t._connection, None)
450 self.failIf(t.running)
452 def testStoppingServer(self):
453 if not interfaces.IReactorUNIX(reactor, None):
454 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
455 factory = protocol.ServerFactory()
456 factory.protocol = wire.Echo
457 t = internet.UNIXServer('echo.skt', factory)
460 self.failIf(t.running)
461 factory = protocol.ClientFactory()
463 factory.clientConnectionFailed = lambda *args: d.callback(None)
464 reactor.connectUNIX('echo.skt', factory)
467 def testPickledTimer(self):
468 target = TimerTarget()
469 t0 = internet.TimerService(1, target.append, "hello")
475 self.failIf(t.running)
477 def testBrokenTimer(self):
479 t = internet.TimerService(1, lambda: 1 / 0)
480 oldFailed = t._failed
486 d.addCallback(lambda x : t.stopService)
487 d.addCallback(lambda x : self.assertEqual(
489 [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
493 def test_genericServerDeprecated(self):
495 Instantiating L{GenericServer} emits a deprecation warning.
497 internet.GenericServer()
498 warnings = self.flushWarnings(
499 offendingFunctions=[self.test_genericServerDeprecated])
501 warnings[0]['message'],
502 'GenericServer was deprecated in Twisted 10.1.')
503 self.assertEqual(warnings[0]['category'], DeprecationWarning)
504 self.assertEqual(len(warnings), 1)
507 def test_genericClientDeprecated(self):
509 Instantiating L{GenericClient} emits a deprecation warning.
511 internet.GenericClient()
512 warnings = self.flushWarnings(
513 offendingFunctions=[self.test_genericClientDeprecated])
515 warnings[0]['message'],
516 'GenericClient was deprecated in Twisted 10.1.')
517 self.assertEqual(warnings[0]['category'], DeprecationWarning)
518 self.assertEqual(len(warnings), 1)
521 def test_everythingThere(self):
523 L{twisted.application.internet} dynamically defines a set of
524 L{service.Service} subclasses that in general have corresponding
525 reactor.listenXXX or reactor.connectXXX calls.
527 trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
528 for tran in trans[:]:
529 if not getattr(interfaces, "IReactor" + tran)(reactor, None):
531 if interfaces.IReactorArbitrary(reactor, None) is not None:
532 trans.insert(0, "Generic")
534 for side in 'Server Client'.split():
535 if tran == "Multicast" and side == "Client":
537 self.assertTrue(hasattr(internet, tran + side))
538 method = getattr(internet, tran + side).method
539 prefix = {'Server': 'listen', 'Client': 'connect'}[side]
540 self.assertTrue(hasattr(reactor, prefix + method) or
541 (prefix == "connect" and method == "UDP"))
542 o = getattr(internet, tran + side)()
543 self.assertEqual(service.IService(o), o)
544 test_everythingThere.suppress = [
545 util.suppress(message='GenericServer was deprecated in Twisted 10.1.',
546 category=DeprecationWarning),
547 util.suppress(message='GenericClient was deprecated in Twisted 10.1.',
548 category=DeprecationWarning),
549 util.suppress(message='twisted.internet.interfaces.IReactorArbitrary was '
550 'deprecated in Twisted 10.1.0: See IReactorFDSet.')]
553 def test_importAll(self):
555 L{twisted.application.internet} dynamically defines L{service.Service}
556 subclasses. This test ensures that the subclasses exposed by C{__all__}
557 are valid attributes of the module.
559 for cls in internet.__all__:
561 hasattr(internet, cls),
562 '%s not importable from twisted.application.internet' % (cls,))
565 def test_reactorParametrizationInServer(self):
567 L{internet._AbstractServer} supports a C{reactor} keyword argument
568 that can be used to parametrize the reactor used to listen for
571 reactor = MemoryReactor()
574 t = internet.TCPServer(1234, factory, reactor=reactor)
576 self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
579 def test_reactorParametrizationInClient(self):
581 L{internet._AbstractClient} supports a C{reactor} keyword arguments
582 that can be used to parametrize the reactor used to create new client
585 reactor = MemoryReactor()
587 factory = protocol.ClientFactory()
588 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
591 reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
594 def test_reactorParametrizationInServerMultipleStart(self):
596 Like L{test_reactorParametrizationInServer}, but stop and restart the
597 service and check that the given reactor is still used.
599 reactor = MemoryReactor()
601 factory = protocol.Factory()
602 t = internet.TCPServer(1234, factory, reactor=reactor)
604 self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
607 self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
610 def test_reactorParametrizationInClientMultipleStart(self):
612 Like L{test_reactorParametrizationInClient}, but stop and restart the
613 service and check that the given reactor is still used.
615 reactor = MemoryReactor()
617 factory = protocol.ClientFactory()
618 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
621 reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
625 reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
629 class TestTimerBasic(unittest.TestCase):
631 def testTimerRuns(self):
633 self.t = internet.TimerService(1, d.callback, 'hello')
634 self.t.startService()
635 d.addCallback(self.assertEqual, 'hello')
636 d.addCallback(lambda x : self.t.stopService())
637 d.addCallback(lambda x : self.failIf(self.t.running))
641 return self.t.stopService()
643 def testTimerRestart(self):
644 # restart the same TimerService
645 d1 = defer.Deferred()
646 d2 = defer.Deferred()
647 work = [(d2, "bar"), (d1, "foo")]
651 self.t = internet.TimerService(1, trigger)
652 self.t.startService()
653 def onFirstResult(result):
654 self.assertEqual(result, 'foo')
655 return self.t.stopService()
656 def onFirstStop(ignored):
657 self.failIf(self.t.running)
658 self.t.startService()
660 def onSecondResult(result):
661 self.assertEqual(result, 'bar')
663 d1.addCallback(onFirstResult)
664 d1.addCallback(onFirstStop)
665 d1.addCallback(onSecondResult)
668 def testTimerLoops(self):
670 def trigger(data, number, d):
675 self.t = internet.TimerService(0.01, trigger, "hello", 10, d)
676 self.t.startService()
677 d.addCallback(self.assertEqual, ['hello'] * 10)
678 d.addCallback(lambda x : self.t.stopService())
682 class FakeReactor(reactors.Reactor):
684 A fake reactor with a hooked install method.
687 def __init__(self, install, *args, **kwargs):
689 @param install: any callable that will be used as install method.
690 @type install: C{callable}
692 reactors.Reactor.__init__(self, *args, **kwargs)
693 self.install = install
697 class PluggableReactorTestCase(unittest.TestCase):
699 Tests for the reactor discovery/inspection APIs.
704 Override the L{reactors.getPlugins} function, normally bound to
705 L{twisted.plugin.getPlugins}, in order to control which
706 L{IReactorInstaller} plugins are seen as available.
708 C{self.pluginResults} can be customized and will be used as the
709 result of calls to C{reactors.getPlugins}.
711 self.pluginCalls = []
712 self.pluginResults = []
713 self.originalFunction = reactors.getPlugins
714 reactors.getPlugins = self._getPlugins
719 Restore the original L{reactors.getPlugins}.
721 reactors.getPlugins = self.originalFunction
724 def _getPlugins(self, interface, package=None):
726 Stand-in for the real getPlugins method which records its arguments
727 and returns a fixed result.
729 self.pluginCalls.append((interface, package))
730 return list(self.pluginResults)
733 def test_getPluginReactorTypes(self):
735 Test that reactor plugins are returned from L{getReactorTypes}
737 name = 'fakereactortest'
738 package = __name__ + '.fakereactor'
739 description = 'description'
740 self.pluginResults = [reactors.Reactor(name, package, description)]
741 reactorTypes = reactors.getReactorTypes()
745 [(reactors.IReactorInstaller, None)])
747 for r in reactorTypes:
748 if r.shortName == name:
749 self.assertEqual(r.description, description)
752 self.fail("Reactor plugin not present in getReactorTypes() result")
755 def test_reactorInstallation(self):
757 Test that L{reactors.Reactor.install} loads the correct module and
758 calls its install attribute.
762 installed.append(True)
763 installer = FakeReactor(install,
764 'fakereactortest', __name__, 'described')
766 self.assertEqual(installed, [True])
769 def test_installReactor(self):
771 Test that the L{reactors.installReactor} function correctly installs
772 the specified reactor.
776 installed.append(True)
777 name = 'fakereactortest'
779 description = 'description'
780 self.pluginResults = [FakeReactor(install, name, package, description)]
781 reactors.installReactor(name)
782 self.assertEqual(installed, [True])
785 def test_installNonExistentReactor(self):
787 Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
788 when asked to install a reactor which it cannot find.
790 self.pluginResults = []
792 reactors.NoSuchReactor,
793 reactors.installReactor, 'somereactor')
796 def test_installNotAvailableReactor(self):
798 Test that L{reactors.installReactor} raises an exception when asked to
799 install a reactor which doesn't work in this environment.
802 raise ImportError("Missing foo bar")
803 name = 'fakereactortest'
805 description = 'description'
806 self.pluginResults = [FakeReactor(install, name, package, description)]
807 self.assertRaises(ImportError, reactors.installReactor, name)
810 def test_reactorSelectionMixin(self):
812 Test that the reactor selected is installed as soon as possible, ie
813 when the option is parsed.
816 INSTALL_EVENT = 'reactor installed'
817 SUBCOMMAND_EVENT = 'subcommands loaded'
819 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
820 def subCommands(self):
821 executed.append(SUBCOMMAND_EVENT)
822 return [('subcommand', None, lambda: self, 'test subcommand')]
823 subCommands = property(subCommands)
826 executed.append(INSTALL_EVENT)
827 self.pluginResults = [
828 FakeReactor(install, 'fakereactortest', __name__, 'described')
831 options = ReactorSelectionOptions()
832 options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
833 self.assertEqual(executed[0], INSTALL_EVENT)
834 self.assertEqual(executed.count(INSTALL_EVENT), 1)
835 self.assertEqual(options["reactor"], "fakereactortest")
838 def test_reactorSelectionMixinNonExistent(self):
840 Test that the usage mixin exits when trying to use a non existent
841 reactor (the name not matching to any reactor), giving an error
844 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
846 self.pluginResults = []
848 options = ReactorSelectionOptions()
849 options.messageOutput = StringIO()
850 e = self.assertRaises(usage.UsageError, options.parseOptions,
851 ['--reactor', 'fakereactortest', 'subcommand'])
852 self.assertIn("fakereactortest", e.args[0])
853 self.assertIn("help-reactors", e.args[0])
856 def test_reactorSelectionMixinNotAvailable(self):
858 Test that the usage mixin exits when trying to use a reactor not
859 available (the reactor raises an error at installation), giving an
862 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
864 message = "Missing foo bar"
866 raise ImportError(message)
868 name = 'fakereactortest'
870 description = 'description'
871 self.pluginResults = [FakeReactor(install, name, package, description)]
873 options = ReactorSelectionOptions()
874 options.messageOutput = StringIO()
875 e = self.assertRaises(usage.UsageError, options.parseOptions,
876 ['--reactor', 'fakereactortest', 'subcommand'])
877 self.assertIn(message, e.args[0])
878 self.assertIn("help-reactors", e.args[0])