Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / test / reactormixins.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for implementations of L{IReactorTime}.
6 """
7
8 __metaclass__ = type
9
10 import os, signal, time
11
12 from twisted.internet.defer import TimeoutError, Deferred, gatherResults
13 from twisted.internet.protocol import ClientFactory, Protocol
14 from twisted.trial.unittest import TestCase, SkipTest
15 from twisted.python.runtime import platform
16 from twisted.python.reflect import namedAny, fullyQualifiedName
17 from twisted.python import log
18 from twisted.python.failure import Failure
19
20 # Access private APIs.
21 if platform.isWindows():
22     process = None
23 else:
24     from twisted.internet import process
25
26
27
28 def needsRunningReactor(reactor, thunk):
29     """
30     Various functions within these tests need an already-running reactor at
31     some point.  They need to stop the reactor when the test has completed, and
32     that means calling reactor.stop().  However, reactor.stop() raises an
33     exception if the reactor isn't already running, so if the L{Deferred} that
34     a particular API under test returns fires synchronously (as especially an
35     endpoint's C{connect()} method may do, if the connect is to a local
36     interface address) then the test won't be able to stop the reactor being
37     tested and finish.  So this calls C{thunk} only once C{reactor} is running.
38
39     (This is just an alias for
40     L{twisted.internet.interfaces.IReactorCore.callWhenRunning} on the given
41     reactor parameter, in order to centrally reference the above paragraph and
42     repeating it everywhere as a comment.)
43
44     @param reactor: the L{twisted.internet.interfaces.IReactorCore} under test
45
46     @param thunk: a 0-argument callable, which eventually finishes the test in
47         question, probably in a L{Deferred} callback.
48     """
49     reactor.callWhenRunning(thunk)
50
51
52
53 class ConnectableProtocol(Protocol):
54     """
55     A protocol to be used with L{runProtocolsWithReactor}.
56
57     The protocol and its pair should eventually disconnect from each other.
58
59     @ivar reactor: The reactor used in this test.
60
61     @ivar disconnectReason: The L{Failure} passed to C{connectionLost}.
62
63     @ivar _done: A L{Deferred} which will be fired when the connection is
64         lost.
65     """
66
67     disconnectReason = None
68
69     def _setAttributes(self, reactor, done):
70         """
71         Set attributes on the protocol that are known only externally; this
72         will be called by L{runProtocolsWithReactor} when this protocol is
73         instantiated.
74
75         @param reactor: The reactor used in this test.
76
77         @param done: A L{Deferred} which will be fired when the connection is
78            lost.
79         """
80         self.reactor = reactor
81         self._done = done
82
83
84     def connectionLost(self, reason):
85         self.disconnectReason = reason
86         self._done.callback(None)
87         del self._done
88
89
90
91 class EndpointCreator:
92     """
93     Create client and server endpoints that know how to connect to each other.
94     """
95
96     def server(self, reactor):
97         """
98         Return an object providing C{IStreamServerEndpoint} for use in creating
99         a server to use to establish the connection type to be tested.
100         """
101         raise NotImplementedError()
102
103
104     def client(self, reactor, serverAddress):
105         """
106         Return an object providing C{IStreamClientEndpoint} for use in creating
107         a client to use to establish the connection type to be tested.
108         """
109         raise NotImplementedError()
110
111
112
113 class _SingleProtocolFactory(ClientFactory):
114     """
115     Factory to be used by L{runProtocolsWithReactor}.
116
117     It always returns the same protocol (i.e. is intended for only a single connection).
118     """
119
120     def __init__(self, protocol):
121         self._protocol = protocol
122
123
124     def buildProtocol(self, addr):
125         return self._protocol
126
127
128
129 def runProtocolsWithReactor(reactorBuilder, serverProtocol, clientProtocol,
130                             endpointCreator):
131     """
132     Connect two protocols using endpoints and a new reactor instance.
133
134     A new reactor will be created and run, with the client and server protocol
135     instances connected to each other using the given endpoint creator. The
136     protocols should run through some set of tests, then disconnect; when both
137     have disconnected the reactor will be stopped and the function will
138     return.
139
140     @param reactorBuilder: A L{ReactorBuilder} instance.
141
142     @param serverProtocol: A L{ConnectableProtocol} that will be the server.
143
144     @param clientProtocol: A L{ConnectableProtocol} that will be the client.
145
146     @param endpointCreator: An instance of L{EndpointCreator}.
147
148     @return: The reactor run by this test.
149     """
150     reactor = reactorBuilder.buildReactor()
151     serverProtocol._setAttributes(reactor, Deferred())
152     clientProtocol._setAttributes(reactor, Deferred())
153     serverFactory = _SingleProtocolFactory(serverProtocol)
154     clientFactory = _SingleProtocolFactory(clientProtocol)
155
156     # Listen on a port:
157     serverEndpoint = endpointCreator.server(reactor)
158     d = serverEndpoint.listen(serverFactory)
159
160     # Connect to the port:
161     def gotPort(p):
162         clientEndpoint = endpointCreator.client(
163             reactor, p.getHost())
164         return clientEndpoint.connect(clientFactory)
165     d.addCallback(gotPort)
166
167     # Stop reactor when both connections are lost:
168     def failed(result):
169         log.err(result, "Connection setup failed.")
170     disconnected = gatherResults([serverProtocol._done, clientProtocol._done])
171     d.addCallback(lambda _: disconnected)
172     d.addErrback(failed)
173     d.addCallback(lambda _: needsRunningReactor(reactor, reactor.stop))
174
175     reactorBuilder.runReactor(reactor)
176     return reactor
177
178
179
180 class ReactorBuilder:
181     """
182     L{TestCase} mixin which provides a reactor-creation API.  This mixin
183     defines C{setUp} and C{tearDown}, so mix it in before L{TestCase} or call
184     its methods from the overridden ones in the subclass.
185
186     @cvar skippedReactors: A dict mapping FQPN strings of reactors for
187         which the tests defined by this class will be skipped to strings
188         giving the skip message.
189     @cvar requiredInterfaces: A C{list} of interfaces which the reactor must
190         provide or these tests will be skipped.  The default, C{None}, means
191         that no interfaces are required.
192     @ivar reactorFactory: A no-argument callable which returns the reactor to
193         use for testing.
194     @ivar originalHandler: The SIGCHLD handler which was installed when setUp
195         ran and which will be re-installed when tearDown runs.
196     @ivar _reactors: A list of FQPN strings giving the reactors for which
197         TestCases will be created.
198     """
199
200     _reactors = [
201         # Select works everywhere
202         "twisted.internet.selectreactor.SelectReactor",
203         ]
204
205     if platform.isWindows():
206         # PortableGtkReactor is only really interesting on Windows,
207         # but not really Windows specific; if you want you can
208         # temporarily move this up to the all-platforms list to test
209         # it on other platforms.  It's not there in general because
210         # it's not _really_ worth it to support on other platforms,
211         # since no one really wants to use it on other platforms.
212         _reactors.extend([
213                 "twisted.internet.gtk2reactor.PortableGtkReactor",
214                 "twisted.internet.gireactor.PortableGIReactor",
215                 "twisted.internet.gtk3reactor.PortableGtk3Reactor",
216                 "twisted.internet.win32eventreactor.Win32Reactor",
217                 "twisted.internet.iocpreactor.reactor.IOCPReactor"])
218     else:
219         _reactors.extend([
220                 "twisted.internet.glib2reactor.Glib2Reactor",
221                 "twisted.internet.gtk2reactor.Gtk2Reactor",
222                 "twisted.internet.gireactor.GIReactor",
223                 "twisted.internet.gtk3reactor.Gtk3Reactor"])
224         if platform.isMacOSX():
225             _reactors.append("twisted.internet.cfreactor.CFReactor")
226         else:
227             _reactors.extend([
228                     "twisted.internet.pollreactor.PollReactor",
229                     "twisted.internet.epollreactor.EPollReactor"])
230             if not platform.isLinux():
231                 # Presumably Linux is not going to start supporting kqueue, so
232                 # skip even trying this configuration.
233                 _reactors.extend([
234                         # Support KQueue on non-OS-X POSIX platforms for now.
235                         "twisted.internet.kqreactor.KQueueReactor",
236                         ])
237
238     reactorFactory = None
239     originalHandler = None
240     requiredInterfaces = None
241     skippedReactors = {}
242
243     def setUp(self):
244         """
245         Clear the SIGCHLD handler, if there is one, to ensure an environment
246         like the one which exists prior to a call to L{reactor.run}.
247         """
248         if not platform.isWindows():
249             self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL)
250
251
252     def tearDown(self):
253         """
254         Restore the original SIGCHLD handler and reap processes as long as
255         there seem to be any remaining.
256         """
257         if self.originalHandler is not None:
258             signal.signal(signal.SIGCHLD, self.originalHandler)
259         if process is not None:
260             begin = time.time()
261             while process.reapProcessHandlers:
262                 log.msg(
263                     "ReactorBuilder.tearDown reaping some processes %r" % (
264                         process.reapProcessHandlers,))
265                 process.reapAllProcesses()
266
267                 # The process should exit on its own.  However, if it
268                 # doesn't, we're stuck in this loop forever.  To avoid
269                 # hanging the test suite, eventually give the process some
270                 # help exiting and move on.
271                 time.sleep(0.001)
272                 if time.time() - begin > 60:
273                     for pid in process.reapProcessHandlers:
274                         os.kill(pid, signal.SIGKILL)
275                     raise Exception(
276                         "Timeout waiting for child processes to exit: %r" % (
277                             process.reapProcessHandlers,))
278
279
280     def unbuildReactor(self, reactor):
281         """
282         Clean up any resources which may have been allocated for the given
283         reactor by its creation or by a test which used it.
284         """
285         # Chris says:
286         #
287         # XXX These explicit calls to clean up the waker (and any other
288         # internal readers) should become obsolete when bug #3063 is
289         # fixed. -radix, 2008-02-29. Fortunately it should probably cause an
290         # error when bug #3063 is fixed, so it should be removed in the same
291         # branch that fixes it.
292         #
293         # -exarkun
294         reactor._uninstallHandler()
295         if getattr(reactor, '_internalReaders', None) is not None:
296             for reader in reactor._internalReaders:
297                 reactor.removeReader(reader)
298                 reader.connectionLost(None)
299             reactor._internalReaders.clear()
300
301         # Here's an extra thing unrelated to wakers but necessary for
302         # cleaning up after the reactors we make.  -exarkun
303         reactor.disconnectAll()
304
305         # It would also be bad if any timed calls left over were allowed to
306         # run.
307         calls = reactor.getDelayedCalls()
308         for c in calls:
309             c.cancel()
310
311
312     def buildReactor(self):
313         """
314         Create and return a reactor using C{self.reactorFactory}.
315         """
316         try:
317             from twisted.internet.cfreactor import CFReactor
318             from twisted.internet import reactor as globalReactor
319         except ImportError:
320             pass
321         else:
322             if (isinstance(globalReactor, CFReactor)
323                 and self.reactorFactory is CFReactor):
324                 raise SkipTest(
325                     "CFReactor uses APIs which manipulate global state, "
326                     "so it's not safe to run its own reactor-builder tests "
327                     "under itself")
328         try:
329             reactor = self.reactorFactory()
330         except:
331             # Unfortunately, not all errors which result in a reactor
332             # being unusable are detectable without actually
333             # instantiating the reactor.  So we catch some more here
334             # and skip the test if necessary.  We also log it to aid
335             # with debugging, but flush the logged error so the test
336             # doesn't fail.
337             log.err(None, "Failed to install reactor")
338             self.flushLoggedErrors()
339             raise SkipTest(Failure().getErrorMessage())
340         else:
341             if self.requiredInterfaces is not None:
342                 missing = filter(
343                      lambda required: not required.providedBy(reactor),
344                      self.requiredInterfaces)
345                 if missing:
346                     self.unbuildReactor(reactor)
347                     raise SkipTest("%s does not provide %s" % (
348                         fullyQualifiedName(reactor.__class__),
349                         ",".join([fullyQualifiedName(x) for x in missing])))
350         self.addCleanup(self.unbuildReactor, reactor)
351         return reactor
352
353
354     def runReactor(self, reactor, timeout=None):
355         """
356         Run the reactor for at most the given amount of time.
357
358         @param reactor: The reactor to run.
359
360         @type timeout: C{int} or C{float}
361         @param timeout: The maximum amount of time, specified in seconds, to
362             allow the reactor to run.  If the reactor is still running after
363             this much time has elapsed, it will be stopped and an exception
364             raised.  If C{None}, the default test method timeout imposed by
365             Trial will be used.  This depends on the L{IReactorTime}
366             implementation of C{reactor} for correct operation.
367
368         @raise TimeoutError: If the reactor is still running after C{timeout}
369             seconds.
370         """
371         if timeout is None:
372             timeout = self.getTimeout()
373
374         timedOut = []
375         def stop():
376             timedOut.append(None)
377             reactor.stop()
378
379         reactor.callLater(timeout, stop)
380         reactor.run()
381         if timedOut:
382             raise TimeoutError(
383                 "reactor still running after %s seconds" % (timeout,))
384
385
386     def makeTestCaseClasses(cls):
387         """
388         Create a L{TestCase} subclass which mixes in C{cls} for each known
389         reactor and return a dict mapping their names to them.
390         """
391         classes = {}
392         for reactor in cls._reactors:
393             shortReactorName = reactor.split(".")[-1]
394             name = (cls.__name__ + "." + shortReactorName).replace(".", "_")
395             class testcase(cls, TestCase):
396                 __module__ = cls.__module__
397                 if reactor in cls.skippedReactors:
398                     skip = cls.skippedReactors[reactor]
399                 try:
400                     reactorFactory = namedAny(reactor)
401                 except:
402                     skip = Failure().getErrorMessage()
403             testcase.__name__ = name
404             classes[testcase.__name__] = testcase
405         return classes
406     makeTestCaseClasses = classmethod(makeTestCaseClasses)
407
408
409 __all__ = ['ReactorBuilder']