Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / application / internet.py
1 # -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Reactor-based Services
7
8 Here are services to run clients, servers and periodic services using
9 the reactor.
10
11 If you want to run a server service, L{StreamServerEndpointService} defines a
12 service that can wrap an arbitrary L{IStreamServerEndpoint
13 <twisted.internet.interfaces.IStreamServerEndpoint>}
14 as an L{IService}. See also L{twisted.application.strports.service} for
15 constructing one of these directly from a descriptive string.
16
17 Additionally, this module (dynamically) defines various Service subclasses that
18 let you represent clients and servers in a Service hierarchy.  Endpoints APIs
19 should be preferred for stream server services, but since those APIs do not yet
20 exist for clients or datagram services, many of these are still useful.
21
22 They are as follows::
23
24   TCPServer, TCPClient,
25   UNIXServer, UNIXClient,
26   SSLServer, SSLClient,
27   UDPServer, UDPClient,
28   UNIXDatagramServer, UNIXDatagramClient,
29   MulticastServer
30
31 These classes take arbitrary arguments in their constructors and pass
32 them straight on to their respective reactor.listenXXX or
33 reactor.connectXXX calls.
34
35 For example, the following service starts a web server on port 8080:
36 C{TCPServer(8080, server.Site(r))}.  See the documentation for the
37 reactor.listen/connect* methods for more information.
38 """
39
40 import warnings
41
42 from twisted.python import log
43 from twisted.application import service
44 from twisted.internet import task
45
46 from twisted.internet.defer import CancelledError
47
48
49 def _maybeGlobalReactor(maybeReactor):
50     """
51     @return: the argument, or the global reactor if the argument is C{None}.
52     """
53     if maybeReactor is None:
54         from twisted.internet import reactor
55         return reactor
56     else:
57         return maybeReactor
58
59
60 class _VolatileDataService(service.Service):
61
62     volatile = []
63
64     def __getstate__(self):
65         d = service.Service.__getstate__(self)
66         for attr in self.volatile:
67             if attr in d:
68                 del d[attr]
69         return d
70
71
72
73 class _AbstractServer(_VolatileDataService):
74     """
75     @cvar volatile: list of attribute to remove from pickling.
76     @type volatile: C{list}
77
78     @ivar method: the type of method to call on the reactor, one of B{TCP},
79         B{UDP}, B{SSL} or B{UNIX}.
80     @type method: C{str}
81
82     @ivar reactor: the current running reactor.
83     @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
84         C{IReactorSSL} or C{IReactorUnix}.
85
86     @ivar _port: instance of port set when the service is started.
87     @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}.
88     """
89
90     volatile = ['_port']
91     method = None
92     reactor = None
93
94     _port = None
95
96     def __init__(self, *args, **kwargs):
97         self.args = args
98         if 'reactor' in kwargs:
99             self.reactor = kwargs.pop("reactor")
100         self.kwargs = kwargs
101
102
103     def privilegedStartService(self):
104         service.Service.privilegedStartService(self)
105         self._port = self._getPort()
106
107
108     def startService(self):
109         service.Service.startService(self)
110         if self._port is None:
111             self._port = self._getPort()
112
113
114     def stopService(self):
115         service.Service.stopService(self)
116         # TODO: if startup failed, should shutdown skip stopListening?
117         # _port won't exist
118         if self._port is not None:
119             d = self._port.stopListening()
120             del self._port
121             return d
122
123
124     def _getPort(self):
125         """
126         Wrapper around the appropriate listen method of the reactor.
127
128         @return: the port object returned by the listen method.
129         @rtype: an object providing
130             L{twisted.internet.interfaces.IListeningPort}.
131         """
132         return getattr(_maybeGlobalReactor(self.reactor),
133                        'listen%s' % (self.method,))(*self.args, **self.kwargs)
134
135
136
137 class _AbstractClient(_VolatileDataService):
138     """
139     @cvar volatile: list of attribute to remove from pickling.
140     @type volatile: C{list}
141
142     @ivar method: the type of method to call on the reactor, one of B{TCP},
143         B{UDP}, B{SSL} or B{UNIX}.
144     @type method: C{str}
145
146     @ivar reactor: the current running reactor.
147     @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
148         C{IReactorSSL} or C{IReactorUnix}.
149
150     @ivar _connection: instance of connection set when the service is started.
151     @type _connection: a provider of L{twisted.internet.interfaces.IConnector}.
152     """
153     volatile = ['_connection']
154     method = None
155     reactor = None
156
157     _connection = None
158
159     def __init__(self, *args, **kwargs):
160         self.args = args
161         if 'reactor' in kwargs:
162             self.reactor = kwargs.pop("reactor")
163         self.kwargs = kwargs
164
165
166     def startService(self):
167         service.Service.startService(self)
168         self._connection = self._getConnection()
169
170
171     def stopService(self):
172         service.Service.stopService(self)
173         if self._connection is not None:
174             self._connection.disconnect()
175             del self._connection
176
177
178     def _getConnection(self):
179         """
180         Wrapper around the appropriate connect method of the reactor.
181
182         @return: the port object returned by the connect method.
183         @rtype: an object providing L{twisted.internet.interfaces.IConnector}.
184         """
185         return getattr(_maybeGlobalReactor(self.reactor),
186                        'connect%s' % (self.method,))(*self.args, **self.kwargs)
187
188
189
190 _doc={
191 'Client':
192 """Connect to %(tran)s
193
194 Call reactor.connect%(method)s when the service starts, with the
195 arguments given to the constructor.
196 """,
197 'Server':
198 """Serve %(tran)s clients
199
200 Call reactor.listen%(method)s when the service starts, with the
201 arguments given to the constructor. When the service stops,
202 stop listening. See twisted.internet.interfaces for documentation
203 on arguments to the reactor method.
204 """,
205 }
206
207 import types
208 for tran in 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split():
209     for side in 'Server Client'.split():
210         if tran == "Multicast" and side == "Client":
211             continue
212         base = globals()['_Abstract'+side]
213         method = {'Generic': 'With'}.get(tran, tran)
214         doc = _doc[side]%vars()
215         klass = types.ClassType(tran+side, (base,),
216                                 {'method': method, '__doc__': doc})
217         globals()[tran+side] = klass
218
219
220
221 class GenericServer(_AbstractServer):
222     """
223     Serve Generic clients
224
225     Call reactor.listenWith when the service starts, with the arguments given to
226     the constructor. When the service stops, stop listening. See
227     twisted.internet.interfaces for documentation on arguments to the reactor
228     method.
229
230     This service is deprecated (because reactor.listenWith is deprecated).
231     """
232     method = 'With'
233
234     def __init__(self, *args, **kwargs):
235         warnings.warn(
236             'GenericServer was deprecated in Twisted 10.1.',
237             category=DeprecationWarning,
238             stacklevel=2)
239         _AbstractServer.__init__(self, *args, **kwargs)
240
241
242
243 class GenericClient(_AbstractClient):
244     """
245     Connect to Generic.
246
247     Call reactor.connectWith when the service starts, with the arguments given
248     to the constructor.
249
250     This service is deprecated (because reactor.connectWith is deprecated).
251     """
252     method = 'With'
253
254     def __init__(self, *args, **kwargs):
255         warnings.warn(
256             'GenericClient was deprecated in Twisted 10.1.',
257             category=DeprecationWarning,
258             stacklevel=2)
259         _AbstractClient.__init__(self, *args, **kwargs)
260
261
262
263 class TimerService(_VolatileDataService):
264
265     """Service to periodically call a function
266
267     Every C{step} seconds call the given function with the given arguments.
268     The service starts the calls when it starts, and cancels them
269     when it stops.
270     """
271
272     volatile = ['_loop']
273
274     def __init__(self, step, callable, *args, **kwargs):
275         self.step = step
276         self.call = (callable, args, kwargs)
277
278     def startService(self):
279         service.Service.startService(self)
280         callable, args, kwargs = self.call
281         # we have to make a new LoopingCall each time we're started, because
282         # an active LoopingCall remains active when serialized. If
283         # LoopingCall were a _VolatileDataService, we wouldn't need to do
284         # this.
285         self._loop = task.LoopingCall(callable, *args, **kwargs)
286         self._loop.start(self.step, now=True).addErrback(self._failed)
287
288     def _failed(self, why):
289         # make a note that the LoopingCall is no longer looping, so we don't
290         # try to shut it down a second time in stopService. I think this
291         # should be in LoopingCall. -warner
292         self._loop.running = False
293         log.err(why)
294
295     def stopService(self):
296         if self._loop.running:
297             self._loop.stop()
298         return service.Service.stopService(self)
299
300
301
302 class CooperatorService(service.Service):
303     """
304     Simple L{service.IService} which starts and stops a L{twisted.internet.task.Cooperator}.
305     """
306     def __init__(self):
307         self.coop = task.Cooperator(started=False)
308
309
310     def coiterate(self, iterator):
311         return self.coop.coiterate(iterator)
312
313
314     def startService(self):
315         self.coop.start()
316
317
318     def stopService(self):
319         self.coop.stop()
320
321
322
323 class StreamServerEndpointService(service.Service, object):
324     """
325     A L{StreamServerEndpointService} is an L{IService} which runs a server on a
326     listening port described by an L{IStreamServerEndpoint
327     <twisted.internet.interfaces.IStreamServerEndpoint>}.
328
329     @ivar factory: A server factory which will be used to listen on the
330         endpoint.
331
332     @ivar endpoint: An L{IStreamServerEndpoint
333         <twisted.internet.interfaces.IStreamServerEndpoint>} provider
334         which will be used to listen when the service starts.
335
336     @ivar _waitingForPort: a Deferred, if C{listen} has yet been invoked on the
337         endpoint, otherwise None.
338
339     @ivar _raiseSynchronously: Defines error-handling behavior for the case
340         where C{listen(...)} raises an exception before C{startService} or
341         C{privilegedStartService} have completed.
342
343     @type _raiseSynchronously: C{bool}
344
345     @since: 10.2
346     """
347
348     _raiseSynchronously = None
349
350     def __init__(self, endpoint, factory):
351         self.endpoint = endpoint
352         self.factory = factory
353         self._waitingForPort = None
354
355
356     def privilegedStartService(self):
357         """
358         Start listening on the endpoint.
359         """
360         service.Service.privilegedStartService(self)
361         self._waitingForPort = self.endpoint.listen(self.factory)
362         raisedNow = []
363         def handleIt(err):
364             if self._raiseSynchronously:
365                 raisedNow.append(err)
366             elif not err.check(CancelledError):
367                 log.err(err)
368         self._waitingForPort.addErrback(handleIt)
369         if raisedNow:
370             raisedNow[0].raiseException()
371
372
373     def startService(self):
374         """
375         Start listening on the endpoint, unless L{privilegedStartService} got
376         around to it already.
377         """
378         service.Service.startService(self)
379         if self._waitingForPort is None:
380             self.privilegedStartService()
381
382
383     def stopService(self):
384         """
385         Stop listening on the port if it is already listening, otherwise,
386         cancel the attempt to listen.
387
388         @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires
389             with C{None} when the port has stopped listening.
390         """
391         self._waitingForPort.cancel()
392         def stopIt(port):
393             if port is not None:
394                 return port.stopListening()
395         d = self._waitingForPort.addCallback(stopIt)
396         def stop(passthrough):
397             self.running = False
398             return passthrough
399         d.addBoth(stop)
400         return d
401
402
403
404 __all__ = (['TimerService', 'CooperatorService', 'MulticastServer',
405             'StreamServerEndpointService'] +
406            [tran+side
407             for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram'.split()
408             for side in 'Server Client'.split()])