Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / posixbase.py
1 # -*- test-case-name: twisted.test.test_internet,twisted.internet.test.test_posixbase -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Posix reactor base class
7 """
8
9 import warnings
10 import socket
11 import errno
12 import os
13 import sys
14
15 from zope.interface import implements, classImplements
16
17 from twisted.python.compat import set
18 from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram
19 from twisted.internet.interfaces import (
20     IReactorTCP, IReactorUDP, IReactorSSL, _IReactorArbitrary, IReactorSocket)
21 from twisted.internet.interfaces import IReactorProcess, IReactorMulticast
22 from twisted.internet.interfaces import IHalfCloseableDescriptor
23 from twisted.internet import error
24 from twisted.internet import tcp, udp
25
26 from twisted.python import log, failure, util
27 from twisted.persisted import styles
28 from twisted.python.runtime import platformType, platform
29
30 from twisted.internet.base import ReactorBase, _SignalReactorMixin
31 from twisted.internet.main import CONNECTION_DONE, CONNECTION_LOST
32
33 # Exceptions that doSelect might return frequently
34 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
35 _NO_FILEDESC = error.ConnectionFdescWentAway('File descriptor lost')
36
37
38 try:
39     from twisted.protocols import tls
40 except ImportError:
41     tls = None
42     try:
43         from twisted.internet import ssl
44     except ImportError:
45         ssl = None
46
47 try:
48     from twisted.internet import unix
49     unixEnabled = True
50 except ImportError:
51     unixEnabled = False
52
53 processEnabled = False
54 if platformType == 'posix':
55     from twisted.internet import fdesc, process, _signals
56     processEnabled = True
57
58 if platform.isWindows():
59     try:
60         import win32process
61         processEnabled = True
62     except ImportError:
63         win32process = None
64
65
66 class _SocketWaker(log.Logger, styles.Ephemeral):
67     """
68     The I{self-pipe trick<http://cr.yp.to/docs/selfpipe.html>}, implemented
69     using a pair of sockets rather than pipes (due to the lack of support in
70     select() on Windows for pipes), used to wake up the main loop from
71     another thread.
72     """
73     disconnected = 0
74
75     def __init__(self, reactor):
76         """Initialize.
77         """
78         self.reactor = reactor
79         # Following select_trigger (from asyncore)'s example;
80         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
81         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
82         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
83         server.bind(('127.0.0.1', 0))
84         server.listen(1)
85         client.connect(server.getsockname())
86         reader, clientaddr = server.accept()
87         client.setblocking(0)
88         reader.setblocking(0)
89         self.r = reader
90         self.w = client
91         self.fileno = self.r.fileno
92
93     def wakeUp(self):
94         """Send a byte to my connection.
95         """
96         try:
97             util.untilConcludes(self.w.send, 'x')
98         except socket.error, (err, msg):
99             if err != errno.WSAEWOULDBLOCK:
100                 raise
101
102     def doRead(self):
103         """Read some data from my connection.
104         """
105         try:
106             self.r.recv(8192)
107         except socket.error:
108             pass
109
110     def connectionLost(self, reason):
111         self.r.close()
112         self.w.close()
113
114
115
116 class _FDWaker(object, log.Logger, styles.Ephemeral):
117     """
118     The I{self-pipe trick<http://cr.yp.to/docs/selfpipe.html>}, used to wake
119     up the main loop from another thread or a signal handler.
120
121     L{_FDWaker} is a base class for waker implementations based on
122     writing to a pipe being monitored by the reactor.
123
124     @ivar o: The file descriptor for the end of the pipe which can be
125         written to to wake up a reactor monitoring this waker.
126
127     @ivar i: The file descriptor which should be monitored in order to
128         be awoken by this waker.
129     """
130     disconnected = 0
131
132     i = None
133     o = None
134
135     def __init__(self, reactor):
136         """Initialize.
137         """
138         self.reactor = reactor
139         self.i, self.o = os.pipe()
140         fdesc.setNonBlocking(self.i)
141         fdesc._setCloseOnExec(self.i)
142         fdesc.setNonBlocking(self.o)
143         fdesc._setCloseOnExec(self.o)
144         self.fileno = lambda: self.i
145
146
147     def doRead(self):
148         """
149         Read some bytes from the pipe and discard them.
150         """
151         fdesc.readFromFD(self.fileno(), lambda data: None)
152
153
154     def connectionLost(self, reason):
155         """Close both ends of my pipe.
156         """
157         if not hasattr(self, "o"):
158             return
159         for fd in self.i, self.o:
160             try:
161                 os.close(fd)
162             except IOError:
163                 pass
164         del self.i, self.o
165
166
167
168 class _UnixWaker(_FDWaker):
169     """
170     This class provides a simple interface to wake up the event loop.
171
172     This is used by threads or signals to wake up the event loop.
173     """
174
175     def wakeUp(self):
176         """Write one byte to the pipe, and flush it.
177         """
178         # We don't use fdesc.writeToFD since we need to distinguish
179         # between EINTR (try again) and EAGAIN (do nothing).
180         if self.o is not None:
181             try:
182                 util.untilConcludes(os.write, self.o, 'x')
183             except OSError, e:
184                 # XXX There is no unit test for raising the exception
185                 # for other errnos. See #4285.
186                 if e.errno != errno.EAGAIN:
187                     raise
188
189
190
191 if platformType == 'posix':
192     _Waker = _UnixWaker
193 else:
194     # Primarily Windows and Jython.
195     _Waker = _SocketWaker
196
197
198 class _SIGCHLDWaker(_FDWaker):
199     """
200     L{_SIGCHLDWaker} can wake up a reactor whenever C{SIGCHLD} is
201     received.
202
203     @see: L{twisted.internet._signals}
204     """
205     def __init__(self, reactor):
206         _FDWaker.__init__(self, reactor)
207
208
209     def install(self):
210         """
211         Install the handler necessary to make this waker active.
212         """
213         _signals.installHandler(self.o)
214
215
216     def uninstall(self):
217         """
218         Remove the handler which makes this waker active.
219         """
220         _signals.installHandler(-1)
221
222
223     def doRead(self):
224         """
225         Having woken up the reactor in response to receipt of
226         C{SIGCHLD}, reap the process which exited.
227
228         This is called whenever the reactor notices the waker pipe is
229         writeable, which happens soon after any call to the C{wakeUp}
230         method.
231         """
232         _FDWaker.doRead(self)
233         process.reapAllProcesses()
234
235
236
237
238 class _DisconnectSelectableMixin(object):
239     """
240     Mixin providing the C{_disconnectSelectable} method.
241     """
242
243     def _disconnectSelectable(self, selectable, why, isRead, faildict={
244         error.ConnectionDone: failure.Failure(error.ConnectionDone()),
245         error.ConnectionLost: failure.Failure(error.ConnectionLost())
246         }):
247         """
248         Utility function for disconnecting a selectable.
249
250         Supports half-close notification, isRead should be boolean indicating
251         whether error resulted from doRead().
252         """
253         self.removeReader(selectable)
254         f = faildict.get(why.__class__)
255         if f:
256             if (isRead and why.__class__ ==  error.ConnectionDone
257                 and IHalfCloseableDescriptor.providedBy(selectable)):
258                 selectable.readConnectionLost(f)
259             else:
260                 self.removeWriter(selectable)
261                 selectable.connectionLost(f)
262         else:
263             self.removeWriter(selectable)
264             selectable.connectionLost(failure.Failure(why))
265
266
267
268 class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
269                        ReactorBase):
270     """
271     A basis for reactors that use file descriptors.
272
273     @ivar _childWaker: C{None} or a reference to the L{_SIGCHLDWaker}
274         which is used to properly notice child process termination.
275     """
276     implements(_IReactorArbitrary, IReactorTCP, IReactorUDP, IReactorMulticast)
277
278     # Callable that creates a waker, overrideable so that subclasses can
279     # substitute their own implementation:
280     _wakerFactory = _Waker
281
282     def installWaker(self):
283         """
284         Install a `waker' to allow threads and signals to wake up the IO thread.
285
286         We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake
287         the reactor. On Windows we use a pair of sockets.
288         """
289         if not self.waker:
290             self.waker = self._wakerFactory(self)
291             self._internalReaders.add(self.waker)
292             self.addReader(self.waker)
293
294
295     _childWaker = None
296     def _handleSignals(self):
297         """
298         Extend the basic signal handling logic to also support
299         handling SIGCHLD to know when to try to reap child processes.
300         """
301         _SignalReactorMixin._handleSignals(self)
302         if platformType == 'posix':
303             if not self._childWaker:
304                 self._childWaker = _SIGCHLDWaker(self)
305                 self._internalReaders.add(self._childWaker)
306                 self.addReader(self._childWaker)
307             self._childWaker.install()
308             # Also reap all processes right now, in case we missed any
309             # signals before we installed the SIGCHLD waker/handler.
310             # This should only happen if someone used spawnProcess
311             # before calling reactor.run (and the process also exited
312             # already).
313             process.reapAllProcesses()
314
315     def _uninstallHandler(self):
316         """
317         If a child waker was created and installed, uninstall it now.
318
319         Since this disables reactor functionality and is only called
320         when the reactor is stopping, it doesn't provide any directly
321         useful functionality, but the cleanup of reactor-related
322         process-global state that it does helps in unit tests
323         involving multiple reactors and is generally just a nice
324         thing.
325         """
326         # XXX This would probably be an alright place to put all of
327         # the cleanup code for all internal readers (here and in the
328         # base class, anyway).  See #3063 for that cleanup task.
329         if self._childWaker:
330             self._childWaker.uninstall()
331
332     # IReactorProcess
333
334     def spawnProcess(self, processProtocol, executable, args=(),
335                      env={}, path=None,
336                      uid=None, gid=None, usePTY=0, childFDs=None):
337         args, env = self._checkProcessArgs(args, env)
338         if platformType == 'posix':
339             if usePTY:
340                 if childFDs is not None:
341                     raise ValueError("Using childFDs is not supported with usePTY=True.")
342                 return process.PTYProcess(self, executable, args, env, path,
343                                           processProtocol, uid, gid, usePTY)
344             else:
345                 return process.Process(self, executable, args, env, path,
346                                        processProtocol, uid, gid, childFDs)
347         elif platformType == "win32":
348             if uid is not None:
349                 raise ValueError("Setting UID is unsupported on this platform.")
350             if gid is not None:
351                 raise ValueError("Setting GID is unsupported on this platform.")
352             if usePTY:
353                 raise ValueError("The usePTY parameter is not supported on Windows.")
354             if childFDs:
355                 raise ValueError("Customizing childFDs is not supported on Windows.")
356
357             if win32process:
358                 from twisted.internet._dumbwin32proc import Process
359                 return Process(self, processProtocol, executable, args, env, path)
360             else:
361                 raise NotImplementedError, "spawnProcess not available since pywin32 is not installed."
362         else:
363             raise NotImplementedError, "spawnProcess only available on Windows or POSIX."
364
365     # IReactorUDP
366
367     def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
368         """Connects a given L{DatagramProtocol} to the given numeric UDP port.
369
370         @returns: object conforming to L{IListeningPort}.
371         """
372         p = udp.Port(port, protocol, interface, maxPacketSize, self)
373         p.startListening()
374         return p
375
376     # IReactorMulticast
377
378     def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192, listenMultiple=False):
379         """Connects a given DatagramProtocol to the given numeric UDP port.
380
381         EXPERIMENTAL.
382
383         @returns: object conforming to IListeningPort.
384         """
385         p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, listenMultiple)
386         p.startListening()
387         return p
388
389
390     # IReactorUNIX
391
392     def connectUNIX(self, address, factory, timeout=30, checkPID=0):
393         """@see: twisted.internet.interfaces.IReactorUNIX.connectUNIX
394         """
395         assert unixEnabled, "UNIX support is not present"
396         c = unix.Connector(address, factory, timeout, self, checkPID)
397         c.connect()
398         return c
399
400     def listenUNIX(self, address, factory, backlog=50, mode=0666, wantPID=0):
401         """
402         @see: twisted.internet.interfaces.IReactorUNIX.listenUNIX
403         """
404         assert unixEnabled, "UNIX support is not present"
405         p = unix.Port(address, factory, backlog, mode, self, wantPID)
406         p.startListening()
407         return p
408
409
410     # IReactorUNIXDatagram
411
412     def listenUNIXDatagram(self, address, protocol, maxPacketSize=8192,
413                            mode=0666):
414         """
415         Connects a given L{DatagramProtocol} to the given path.
416
417         EXPERIMENTAL.
418
419         @returns: object conforming to L{IListeningPort}.
420         """
421         assert unixEnabled, "UNIX support is not present"
422         p = unix.DatagramPort(address, protocol, maxPacketSize, mode, self)
423         p.startListening()
424         return p
425
426     def connectUNIXDatagram(self, address, protocol, maxPacketSize=8192,
427                             mode=0666, bindAddress=None):
428         """
429         Connects a L{ConnectedDatagramProtocol} instance to a path.
430
431         EXPERIMENTAL.
432         """
433         assert unixEnabled, "UNIX support is not present"
434         p = unix.ConnectedDatagramPort(address, protocol, maxPacketSize, mode, bindAddress, self)
435         p.startListening()
436         return p
437
438
439     # IReactorSocket (but not on Windows)
440     def adoptStreamPort(self, fileDescriptor, addressFamily, factory):
441         """
442         Create a new L{IListeningPort} from an already-initialized socket.
443
444         This just dispatches to a suitable port implementation (eg from
445         L{IReactorTCP}, etc) based on the specified C{addressFamily}.
446
447         @see: L{twisted.internet.interfaces.IReactorSocket.adoptStreamPort}
448         """
449         if addressFamily not in (socket.AF_INET, socket.AF_INET6):
450             raise error.UnsupportedAddressFamily(addressFamily)
451
452         p = tcp.Port._fromListeningDescriptor(
453             self, fileDescriptor, addressFamily, factory)
454         p.startListening()
455         return p
456
457
458     # IReactorTCP
459
460     def listenTCP(self, port, factory, backlog=50, interface=''):
461         """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
462         """
463         p = tcp.Port(port, factory, backlog, interface, self)
464         p.startListening()
465         return p
466
467     def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
468         """@see: twisted.internet.interfaces.IReactorTCP.connectTCP
469         """
470         c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
471         c.connect()
472         return c
473
474     # IReactorSSL (sometimes, not implemented)
475
476     def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
477         """@see: twisted.internet.interfaces.IReactorSSL.connectSSL
478         """
479         if tls is not None:
480             tlsFactory = tls.TLSMemoryBIOFactory(contextFactory, True, factory)
481             return self.connectTCP(host, port, tlsFactory, timeout, bindAddress)
482         elif ssl is not None:
483             c = ssl.Connector(
484                 host, port, factory, contextFactory, timeout, bindAddress, self)
485             c.connect()
486             return c
487         else:
488             assert False, "SSL support is not present"
489
490
491
492     def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
493         """@see: twisted.internet.interfaces.IReactorSSL.listenSSL
494         """
495         if tls is not None:
496             tlsFactory = tls.TLSMemoryBIOFactory(contextFactory, False, factory)
497             port = self.listenTCP(port, tlsFactory, backlog, interface)
498             port._type = 'TLS'
499             return port
500         elif ssl is not None:
501             p = ssl.Port(
502                 port, factory, contextFactory, backlog, interface, self)
503             p.startListening()
504             return p
505         else:
506             assert False, "SSL support is not present"
507
508
509     # IReactorArbitrary
510     def listenWith(self, portType, *args, **kw):
511         warnings.warn(
512             "listenWith is deprecated since Twisted 10.1.  "
513             "See IReactorFDSet.",
514             category=DeprecationWarning,
515             stacklevel=2)
516         kw['reactor'] = self
517         p = portType(*args, **kw)
518         p.startListening()
519         return p
520
521
522     def connectWith(self, connectorType, *args, **kw):
523         warnings.warn(
524             "connectWith is deprecated since Twisted 10.1.  "
525             "See IReactorFDSet.",
526             category=DeprecationWarning,
527             stacklevel=2)
528         kw['reactor'] = self
529         c = connectorType(*args, **kw)
530         c.connect()
531         return c
532
533
534     def _removeAll(self, readers, writers):
535         """
536         Remove all readers and writers, and list of removed L{IReadDescriptor}s
537         and L{IWriteDescriptor}s.
538
539         Meant for calling from subclasses, to implement removeAll, like::
540
541           def removeAll(self):
542               return self._removeAll(self._reads, self._writes)
543
544         where C{self._reads} and C{self._writes} are iterables.
545         """
546         removedReaders = set(readers) - self._internalReaders
547         for reader in removedReaders:
548             self.removeReader(reader)
549
550         removedWriters = set(writers)
551         for writer in removedWriters:
552             self.removeWriter(writer)
553
554         return list(removedReaders | removedWriters)
555
556
557 class _PollLikeMixin(object):
558     """
559     Mixin for poll-like reactors.
560
561     Subclasses must define the following attributes::
562
563       - _POLL_DISCONNECTED - Bitmask for events indicating a connection was
564         lost.
565       - _POLL_IN - Bitmask for events indicating there is input to read.
566       - _POLL_OUT - Bitmask for events indicating output can be written.
567
568     Must be mixed in to a subclass of PosixReactorBase (for
569     _disconnectSelectable).
570     """
571
572     def _doReadOrWrite(self, selectable, fd, event):
573         """
574         fd is available for read or write, do the work and raise errors if
575         necessary.
576         """
577         why = None
578         inRead = False
579         if event & self._POLL_DISCONNECTED and not (event & self._POLL_IN):
580             # Handle disconnection.  But only if we finished processing all
581             # the pending input.
582             if fd in self._reads:
583                 # If we were reading from the descriptor then this is a
584                 # clean shutdown.  We know there are no read events pending
585                 # because we just checked above.  It also might be a
586                 # half-close (which is why we have to keep track of inRead).
587                 inRead = True
588                 why = CONNECTION_DONE
589             else:
590                 # If we weren't reading, this is an error shutdown of some
591                 # sort.
592                 why = CONNECTION_LOST
593         else:
594             # Any non-disconnect event turns into a doRead or a doWrite.
595             try:
596                 # First check to see if the descriptor is still valid.  This
597                 # gives fileno() a chance to raise an exception, too. 
598                 # Ideally, disconnection would always be indicated by the
599                 # return value of doRead or doWrite (or an exception from
600                 # one of those methods), but calling fileno here helps make
601                 # buggy applications more transparent.
602                 if selectable.fileno() == -1:
603                     # -1 is sort of a historical Python artifact.  Python
604                     # files and sockets used to change their file descriptor
605                     # to -1 when they closed.  For the time being, we'll
606                     # continue to support this anyway in case applications
607                     # replicated it, plus abstract.FileDescriptor.fileno
608                     # returns -1.  Eventually it'd be good to deprecate this
609                     # case.
610                     why = _NO_FILEDESC
611                 else:
612                     if event & self._POLL_IN:
613                         # Handle a read event.
614                         why = selectable.doRead()
615                         inRead = True
616                     if not why and event & self._POLL_OUT:
617                         # Handle a write event, as long as doRead didn't
618                         # disconnect us.
619                         why = selectable.doWrite()
620                         inRead = False
621             except:
622                 # Any exception from application code gets logged and will
623                 # cause us to disconnect the selectable.
624                 why = sys.exc_info()[1]
625                 log.err()
626         if why:
627             self._disconnectSelectable(selectable, why, inRead)
628
629
630
631 if tls is not None or ssl is not None:
632     classImplements(PosixReactorBase, IReactorSSL)
633 if unixEnabled:
634     classImplements(PosixReactorBase, IReactorUNIX, IReactorUNIXDatagram)
635 if processEnabled:
636     classImplements(PosixReactorBase, IReactorProcess)
637 if getattr(socket, 'fromfd', None) is not None:
638     classImplements(PosixReactorBase, IReactorSocket)
639
640 __all__ = ["PosixReactorBase"]