1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Tests for (new code in) L{twisted.application.internet}.
"""
9 from zope.interface import implements
10 from zope.interface.verify import verifyClass
12 from twisted.internet.protocol import Factory
13 from twisted.trial.unittest import TestCase
14 from twisted.application.internet import StreamServerEndpointService
15 from twisted.internet.interfaces import IStreamServerEndpoint, IListeningPort
16 from twisted.internet.defer import Deferred, CancelledError
class FakeServer(object):
    """
    In-memory implementation of L{IStreamServerEndpoint}.

    @ivar result: The L{Deferred} resulting from the call to C{listen}, after
        C{listen} has been called.

    @ivar factory: The factory passed to C{listen}.

    @ivar cancelException: The exception to errback C{self.result} when it is
        cancelled.

    @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
        with.

    @ivar listenAttempts: The number of times C{listen} has been invoked.

    @ivar failImmediately: If set, the exception to fail the L{Deferred}
        returned from C{listen} before it is returned.
    """

    implements(IStreamServerEndpoint)

    # Class-level defaults so that every documented attribute can be read
    # before C{listen} has been called.
    result = None
    factory = None
    failImmediately = None
    cancelException = CancelledError()
    listenAttempts = 0

    def __init__(self):
        """
        Create the L{FakePort} that a successful C{listen} will fire with.
        """
        self.port = FakePort()


    def listen(self, factory):
        """
        Return a Deferred and store it for future use.  (Implementation of
        L{IStreamServerEndpoint}).

        @param factory: The factory to remember as C{self.factory}.

        @return: C{self.result}; already failed with C{self.failImmediately}
            if that attribute is set.
        """
        self.listenAttempts += 1
        self.factory = factory
        # The canceller looks up cancelException only when cancellation
        # actually happens, so a test may replace it on the instance at any
        # point before cancelling.
        self.result = Deferred(
            canceller=lambda d: d.errback(self.cancelException))
        if self.failImmediately is not None:
            self.result.errback(self.failImmediately)
        return self.result


    def startedListening(self):
        """
        Test code should invoke this method after causing C{listen} to be
        invoked in order to fire the L{Deferred} previously returned from
        C{listen}.
        """
        self.result.callback(self.port)


    def stoppedListening(self):
        """
        Test code should invoke this method after causing C{stopListening} to
        be invoked on the port fired from the L{Deferred} returned from
        C{listen} in order to cause the L{Deferred} returned from
        C{stopListening} to fire.
        """
        self.port.deferred.callback(None)


verifyClass(IStreamServerEndpoint, FakeServer)
class FakePort(object):
    """
    Fake L{IListeningPort} implementation.

    Only C{stopListening} has real behavior; the other interface methods are
    stubs so that the class can be checked against L{IListeningPort}.

    @ivar deferred: The L{Deferred} returned by C{stopListening}.
    """

    implements(IListeningPort)

    deferred = None

    def startListening(self):
        """
        Do nothing.  Present only so that this fake provides the full
        L{IListeningPort} method set.
        """


    def stopListening(self):
        """
        Create a new, unfired L{Deferred}, remember it as C{self.deferred}
        and return it.

        @return: The L{Deferred} stored in C{self.deferred}.
        """
        self.deferred = Deferred()
        return self.deferred


    def getHost(self):
        """
        This fake has no address.

        @raise NotImplementedError: Always.
        """
        raise NotImplementedError("FakePort does not have an address")


# Verify the fake port itself; the previous check here re-verified FakeServer.
verifyClass(IListeningPort, FakePort)
class TestEndpointService(TestCase):
    """
    Tests for L{twisted.application.internet}.
    """

    def setUp(self):
        """
        Construct a stub server, a stub factory, and a
        L{StreamServerEndpointService} to test.
        """
        self.fakeServer = FakeServer()
        self.factory = Factory()
        self.svc = StreamServerEndpointService(self.fakeServer, self.factory)


    def test_privilegedStartService(self):
        """
        L{StreamServerEndpointService.privilegedStartService} calls its
        endpoint's C{listen} method with its factory.
        """
        self.svc.privilegedStartService()
        self.assertIdentical(self.factory, self.fakeServer.factory)


    def test_synchronousRaiseRaisesSynchronously(self, thunk=None):
        """
        L{StreamServerEndpointService.startService} should raise synchronously
        if the L{Deferred} returned by its wrapped
        L{IStreamServerEndpoint.listen} has already fired with an errback and
        the L{StreamServerEndpointService}'s C{_raiseSynchronously} flag has
        been set.  This feature is necessary to preserve compatibility with old
        behavior of L{twisted.internet.strports.service}, which is to return a
        service which synchronously raises an exception from C{startService}
        (so that, among other things, twistd will not start running).  However,
        since L{IStreamServerEndpoint.listen} may fail asynchronously, it is
        a bad idea to rely on this behavior.

        @param thunk: If given, the no-argument callable expected to raise;
            defaults to C{self.svc.startService}.  Lets
            L{test_synchronousRaisePrivileged} reuse this test body.
        """
        self.fakeServer.failImmediately = ZeroDivisionError()
        self.svc._raiseSynchronously = True
        self.assertRaises(ZeroDivisionError, thunk or self.svc.startService)


    def test_synchronousRaisePrivileged(self):
        """
        L{StreamServerEndpointService.privilegedStartService} should behave the
        same as C{startService} with respect to
        L{TestEndpointService.test_synchronousRaiseRaisesSynchronously}.
        """
        self.test_synchronousRaiseRaisesSynchronously(
            self.svc.privilegedStartService)


    def test_failReportsError(self):
        """
        L{StreamServerEndpointService.startService} and
        L{StreamServerEndpointService.privilegedStartService} should both log
        an exception when the L{Deferred} returned from their wrapped
        L{IStreamServerEndpoint.listen} fails.
        """
        self.svc.startService()
        self.fakeServer.result.errback(ZeroDivisionError())
        logged = self.flushLoggedErrors(ZeroDivisionError)
        self.assertEqual(len(logged), 1)


    def test_synchronousFailReportsError(self):
        """
        Without the C{_raiseSynchronously} compatibility flag, failing
        immediately has the same behavior as failing later; it logs the error.
        """
        self.fakeServer.failImmediately = ZeroDivisionError()
        self.svc.startService()
        logged = self.flushLoggedErrors(ZeroDivisionError)
        self.assertEqual(len(logged), 1)


    def test_startServiceUnstarted(self):
        """
        L{StreamServerEndpointService.startService} sets the C{running} flag,
        and calls its endpoint's C{listen} method with its factory, if it
        has not yet been started.
        """
        self.svc.startService()
        self.assertIdentical(self.factory, self.fakeServer.factory)
        self.assertEqual(self.svc.running, True)


    def test_startServiceStarted(self):
        """
        L{StreamServerEndpointService.startService} sets the C{running} flag,
        but nothing else, if the service has already been started.
        """
        self.test_privilegedStartService()
        self.svc.startService()
        # privilegedStartService already listened; startService must not
        # listen a second time.
        self.assertEqual(self.fakeServer.listenAttempts, 1)
        self.assertEqual(self.svc.running, True)


    def test_stopService(self):
        """
        L{StreamServerEndpointService.stopService} calls C{stopListening} on
        the L{IListeningPort} returned from its endpoint, returns the
        C{Deferred} from stopService, and sets C{running} to C{False}.
        """
        self.svc.privilegedStartService()
        self.fakeServer.startedListening()
        # Ensure running gets set to true
        self.svc.startService()
        result = self.svc.stopService()
        stopResults = []
        result.addCallback(stopResults.append)
        # Nothing fires until the fake port reports that it has stopped.
        self.assertEqual(len(stopResults), 0)
        self.fakeServer.stoppedListening()
        self.assertEqual(len(stopResults), 1)
        self.assertFalse(self.svc.running)


    def test_stopServiceBeforeStartFinished(self):
        """
        L{StreamServerEndpointService.stopService} cancels the L{Deferred}
        returned by C{listen} if it has not yet fired.  No error will be logged
        about the cancellation of the listen attempt.
        """
        self.svc.privilegedStartService()
        result = self.svc.stopService()
        stopResults = []
        # addBoth, so a failure would be collected and fail the comparison
        # below instead of going unnoticed.
        result.addBoth(stopResults.append)
        self.assertEqual(stopResults, [None])
        self.assertEqual(self.flushLoggedErrors(CancelledError), [])


    def test_stopServiceCancelStartError(self):
        """
        L{StreamServerEndpointService.stopService} cancels the L{Deferred}
        returned by C{listen} if it has not fired yet.  An error will be logged
        if the resulting exception is not L{CancelledError}.
        """
        self.fakeServer.cancelException = ZeroDivisionError()
        self.svc.privilegedStartService()
        result = self.svc.stopService()
        stopResults = []
        result.addCallback(stopResults.append)
        self.assertEqual(stopResults, [None])
        stoppingErrors = self.flushLoggedErrors(ZeroDivisionError)
        self.assertEqual(len(stoppingErrors), 1)