Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / tcp.py
1 # -*- test-case-name: twisted.test.test_tcp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Various asynchronous TCP/IP classes.
7
8 End users shouldn't use this module directly - use the reactor APIs instead.
9 """
10
11
12 # System Imports
13 import types
14 import socket
15 import sys
16 import operator
17 import struct
18
19 from zope.interface import implements
20
21 from twisted.python.runtime import platformType
22 from twisted.python import versions, deprecate
23
24 try:
25     # Try to get the memory BIO based startTLS implementation, available since
26     # pyOpenSSL 0.10
27     from twisted.internet._newtls import (
28         ConnectionMixin as _TLSConnectionMixin,
29         ClientMixin as _TLSClientMixin,
30         ServerMixin as _TLSServerMixin)
31 except ImportError:
32     try:
33         # Try to get the socket BIO based startTLS implementation, available in
34         # all pyOpenSSL versions
35         from twisted.internet._oldtls import (
36             ConnectionMixin as _TLSConnectionMixin,
37             ClientMixin as _TLSClientMixin,
38             ServerMixin as _TLSServerMixin)
39     except ImportError:
40         # There is no version of startTLS available
41         class _TLSConnectionMixin(object):
42             TLS = False
43         class _TLSClientMixin(object):
44             pass
45         class _TLSServerMixin(object):
46             pass
47
48 if platformType == 'win32':
49     # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
50     EPERM = object()
51     from errno import WSAEINVAL as EINVAL
52     from errno import WSAEWOULDBLOCK as EWOULDBLOCK
53     from errno import WSAEINPROGRESS as EINPROGRESS
54     from errno import WSAEALREADY as EALREADY
55     from errno import WSAECONNRESET as ECONNRESET
56     from errno import WSAEISCONN as EISCONN
57     from errno import WSAENOTCONN as ENOTCONN
58     from errno import WSAEINTR as EINTR
59     from errno import WSAENOBUFS as ENOBUFS
60     from errno import WSAEMFILE as EMFILE
61     # No such thing as WSAENFILE, either.
62     ENFILE = object()
63     # Nor ENOMEM
64     ENOMEM = object()
65     EAGAIN = EWOULDBLOCK
66     from errno import WSAECONNRESET as ECONNABORTED
67
68     from twisted.python.win32 import formatError as strerror
69 else:
70     from errno import EPERM
71     from errno import EINVAL
72     from errno import EWOULDBLOCK
73     from errno import EINPROGRESS
74     from errno import EALREADY
75     from errno import ECONNRESET
76     from errno import EISCONN
77     from errno import ENOTCONN
78     from errno import EINTR
79     from errno import ENOBUFS
80     from errno import EMFILE
81     from errno import ENFILE
82     from errno import ENOMEM
83     from errno import EAGAIN
84     from errno import ECONNABORTED
85
86     from os import strerror
87
88
89 from errno import errorcode
90
91 # Twisted Imports
92 from twisted.internet import base, address, fdesc
93 from twisted.internet.task import deferLater
94 from twisted.python import log, failure, reflect
95 from twisted.python.util import unsignedID, untilConcludes
96 from twisted.internet.error import CannotListenError
97 from twisted.internet import abstract, main, interfaces, error
98
99 # Not all platforms have, or support, this flag.
100 _AI_NUMERICSERV = getattr(socket, "AI_NUMERICSERV", 0)
101
102
103
104 class _SocketCloser(object):
105     _socketShutdownMethod = 'shutdown'
106
107     def _closeSocket(self, orderly):
108         # The call to shutdown() before close() isn't really necessary, because
109         # we set FD_CLOEXEC now, which will ensure this is the only process
110         # holding the FD, thus ensuring close() really will shutdown the TCP
111         # socket. However, do it anyways, just to be safe.
112         skt = self.socket
113         try:
114             if orderly:
115                 if self._socketShutdownMethod is not None:
116                     getattr(skt, self._socketShutdownMethod)(2)
117             else:
118                 # Set SO_LINGER to 1,0 which, by convention, causes a
119                 # connection reset to be sent when close is called,
120                 # instead of the standard FIN shutdown sequence.
121                 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
122                                        struct.pack("ii", 1, 0))
123
124         except socket.error:
125             pass
126         try:
127             skt.close()
128         except socket.error:
129             pass
130
131
132
133 class _AbortingMixin(object):
134     """
135     Common implementation of C{abortConnection}.
136
137     @ivar _aborting: Set to C{True} when C{abortConnection} is called.
138     @type _aborting: C{bool}
139     """
140     _aborting = False
141
142     def abortConnection(self):
143         """
144         Aborts the connection immediately, dropping any buffered data.
145
146         @since: 11.1
147         """
148         if self.disconnected or self._aborting:
149             return
150         self._aborting = True
151         self.stopReading()
152         self.stopWriting()
153         self.doRead = lambda *args, **kwargs: None
154         self.doWrite = lambda *args, **kwargs: None
155         self.reactor.callLater(0, self.connectionLost,
156                                failure.Failure(error.ConnectionAborted()))
157
158
159
160 class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
161                  _AbortingMixin):
162     """
163     Superclass of all socket-based FileDescriptors.
164
165     This is an abstract superclass of all objects which represent a TCP/IP
166     connection based socket.
167
168     @ivar logstr: prefix used when logging events related to this connection.
169     @type logstr: C{str}
170     """
171     implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
172
173
174     def __init__(self, skt, protocol, reactor=None):
175         abstract.FileDescriptor.__init__(self, reactor=reactor)
176         self.socket = skt
177         self.socket.setblocking(0)
178         self.fileno = skt.fileno
179         self.protocol = protocol
180
181
182     def getHandle(self):
183         """Return the socket for this connection."""
184         return self.socket
185
186
187     def doRead(self):
188         """Calls self.protocol.dataReceived with all available data.
189
190         This reads up to self.bufferSize bytes of data from its socket, then
191         calls self.dataReceived(data) to process it.  If the connection is not
192         lost through an error in the physical recv(), this function will return
193         the result of the dataReceived call.
194         """
195         try:
196             data = self.socket.recv(self.bufferSize)
197         except socket.error, se:
198             if se.args[0] == EWOULDBLOCK:
199                 return
200             else:
201                 return main.CONNECTION_LOST
202
203         return self._dataReceived(data)
204
205
206     def _dataReceived(self, data):
207         if not data:
208             return main.CONNECTION_DONE
209         rval = self.protocol.dataReceived(data)
210         if rval is not None:
211             offender = self.protocol.dataReceived
212             warningFormat = (
213                 'Returning a value other than None from %(fqpn)s is '
214                 'deprecated since %(version)s.')
215             warningString = deprecate.getDeprecationWarningString(
216                 offender, versions.Version('Twisted', 11, 0, 0),
217                 format=warningFormat)
218             deprecate.warnAboutFunction(offender, warningString)
219         return rval
220
221
222     def writeSomeData(self, data):
223         """
224         Write as much as possible of the given data to this TCP connection.
225
226         This sends up to C{self.SEND_LIMIT} bytes from C{data}.  If the
227         connection is lost, an exception is returned.  Otherwise, the number
228         of bytes successfully written is returned.
229         """
230         # Limit length of buffer to try to send, because some OSes are too
231         # stupid to do so themselves (ahem windows)
232         limitedData = buffer(data, 0, self.SEND_LIMIT)
233
234         try:
235             return untilConcludes(self.socket.send, limitedData)
236         except socket.error, se:
237             if se.args[0] in (EWOULDBLOCK, ENOBUFS):
238                 return 0
239             else:
240                 return main.CONNECTION_LOST
241
242
243     def _closeWriteConnection(self):
244         try:
245             getattr(self.socket, self._socketShutdownMethod)(1)
246         except socket.error:
247             pass
248         p = interfaces.IHalfCloseableProtocol(self.protocol, None)
249         if p:
250             try:
251                 p.writeConnectionLost()
252             except:
253                 f = failure.Failure()
254                 log.err()
255                 self.connectionLost(f)
256
257
258     def readConnectionLost(self, reason):
259         p = interfaces.IHalfCloseableProtocol(self.protocol, None)
260         if p:
261             try:
262                 p.readConnectionLost()
263             except:
264                 log.err()
265                 self.connectionLost(failure.Failure())
266         else:
267             self.connectionLost(reason)
268
269
270
271     def connectionLost(self, reason):
272         """See abstract.FileDescriptor.connectionLost().
273         """
274         # Make sure we're not called twice, which can happen e.g. if
275         # abortConnection() is called from protocol's dataReceived and then
276         # code immediately after throws an exception that reaches the
277         # reactor. We can't rely on "disconnected" attribute for this check
278         # since twisted.internet._oldtls does evil things to it:
279         if not hasattr(self, "socket"):
280             return
281         abstract.FileDescriptor.connectionLost(self, reason)
282         self._closeSocket(not reason.check(error.ConnectionAborted))
283         protocol = self.protocol
284         del self.protocol
285         del self.socket
286         del self.fileno
287         protocol.connectionLost(reason)
288
289
290     logstr = "Uninitialized"
291
292     def logPrefix(self):
293         """Return the prefix to log with when I own the logging thread.
294         """
295         return self.logstr
296
297     def getTcpNoDelay(self):
298         return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
299
300     def setTcpNoDelay(self, enabled):
301         self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
302
303     def getTcpKeepAlive(self):
304         return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
305                                                      socket.SO_KEEPALIVE))
306
307     def setTcpKeepAlive(self, enabled):
308         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
309
310
311
312
313 class _BaseBaseClient(object):
314     """
315     Code shared with other (non-POSIX) reactors for management of general
316     outgoing connections.
317
318     Requirements upon subclasses are documented as instance variables rather
319     than abstract methods, in order to avoid MRO confusion, since this base is
320     mixed in to unfortunately weird and distinctive multiple-inheritance
321     hierarchies and many of these attributes are provided by peer classes
322     rather than descendant classes in those hierarchies.
323
324     @ivar addressFamily: The address family constant (C{socket.AF_INET},
325         C{socket.AF_INET6}, C{socket.AF_UNIX}) of the underlying socket of this
326         client connection.
327     @type addressFamily: C{int}
328
329     @ivar socketType: The socket type constant (C{socket.SOCK_STREAM} or
330         C{socket.SOCK_DGRAM}) of the underlying socket.
331     @type socketType: C{int}
332
333     @ivar _requiresResolution: A flag indicating whether the address of this
334         client will require name resolution.  C{True} if the hostname of said
335         address indicates a name that must be resolved by hostname lookup,
336         C{False} if it indicates an IP address literal.
337     @type _requiresResolution: C{bool}
338
339     @cvar _commonConnection: Subclasses must provide this attribute, which
340         indicates the L{Connection}-alike class to invoke C{__init__} and
341         C{connectionLost} on.
342     @type _commonConnection: C{type}
343
344     @ivar _stopReadingAndWriting: Subclasses must implement in order to remove
345         this transport from its reactor's notifications in response to a
346         terminated connection attempt.
347     @type _stopReadingAndWriting: 0-argument callable returning C{None}
348
349     @ivar _closeSocket: Subclasses must implement in order to close the socket
350         in response to a terminated connection attempt.
351     @type _closeSocket: 1-argument callable; see L{_SocketCloser._closeSocket}
352
353     @ivar _collectSocketDetails: Clean up references to the attached socket in
354         its underlying OS resource (such as a file descriptor or file handle),
355         as part of post connection-failure cleanup.
356     @type _collectSocketDetails: 0-argument callable returning C{None}.
357
358     @ivar reactor: The class pointed to by C{_commonConnection} should set this
359         attribute in its constructor.
360     @type reactor: L{twisted.internet.interfaces.IReactorTime},
361         L{twisted.internet.interfaces.IReactorCore},
362         L{twisted.internet.interfaces.IReactorFDSet}
363     """
364
365     addressFamily = socket.AF_INET
366     socketType = socket.SOCK_STREAM
367
368     def _finishInit(self, whenDone, skt, error, reactor):
369         """
370         Called by subclasses to continue to the stage of initialization where
371         the socket connect attempt is made.
372
373         @param whenDone: A 0-argument callable to invoke once the connection is
374             set up.  This is C{None} if the connection could not be prepared
375             due to a previous error.
376
377         @param skt: The socket object to use to perform the connection.
378         @type skt: C{socket._socketobject}
379
380         @param error: The error to fail the connection with.
381
382         @param reactor: The reactor to use for this client.
383         @type reactor: L{twisted.internet.interfaces.IReactorTime}
384         """
385         if whenDone:
386             self._commonConnection.__init__(self, skt, None, reactor)
387             reactor.callLater(0, whenDone)
388         else:
389             reactor.callLater(0, self.failIfNotConnected, error)
390
391
392     def resolveAddress(self):
393         """
394         Resolve the name that was passed to this L{_BaseBaseClient}, if
395         necessary, and then move on to attempting the connection once an
396         address has been determined.  (The connection will be attempted
397         immediately within this function if either name resolution can be
398         synchronous or the address was an IP address literal.)
399
400         @note: You don't want to call this method from outside, as it won't do
401             anything useful; it's just part of the connection bootstrapping
402             process.  Also, although this method is on L{_BaseBaseClient} for
403             historical reasons, it's not used anywhere except for L{Client}
404             itself.
405
406         @return: C{None}
407         """
408         if self._requiresResolution:
409             d = self.reactor.resolve(self.addr[0])
410             d.addCallback(lambda n: (n,) + self.addr[1:])
411             d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
412         else:
413             self._setRealAddress(self.addr)
414
415
416     def _setRealAddress(self, address):
417         """
418         Set the resolved address of this L{_BaseBaseClient} and initiate the
419         connection attempt.
420
421         @param address: Depending on whether this is an IPv4 or IPv6 connection
422             attempt, a 2-tuple of C{(host, port)} or a 4-tuple of C{(host,
423             port, flow, scope)}.  At this point it is a fully resolved address,
424             and the 'host' portion will always be an IP address, not a DNS
425             name.
426         """
427         self.realAddress = address
428         self.doConnect()
429
430
431     def failIfNotConnected(self, err):
432         """
433         Generic method called when the attemps to connect failed. It basically
434         cleans everything it can: call connectionFailed, stop read and write,
435         delete socket related members.
436         """
437         if (self.connected or self.disconnected or
438             not hasattr(self, "connector")):
439             return
440
441         self._stopReadingAndWriting()
442         try:
443             self._closeSocket(True)
444         except AttributeError:
445             pass
446         else:
447             self._collectSocketDetails()
448         self.connector.connectionFailed(failure.Failure(err))
449         del self.connector
450
451
452     def stopConnecting(self):
453         """
454         If a connection attempt is still outstanding (i.e.  no connection is
455         yet established), immediately stop attempting to connect.
456         """
457         self.failIfNotConnected(error.UserError())
458
459
460     def connectionLost(self, reason):
461         """
462         Invoked by lower-level logic when it's time to clean the socket up.
463         Depending on the state of the connection, either inform the attached
464         L{Connector} that the connection attempt has failed, or inform the
465         connected L{IProtocol} that the established connection has been lost.
466
467         @param reason: the reason that the connection was terminated
468         @type reason: L{Failure}
469         """
470         if not self.connected:
471             self.failIfNotConnected(error.ConnectError(string=reason))
472         else:
473             self._commonConnection.connectionLost(self, reason)
474             self.connector.connectionLost(reason)
475
476
477
478 class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
479     """
480     A base class for client TCP (and similiar) sockets.
481
482     @ivar realAddress: The address object that will be used for socket.connect;
483         this address is an address tuple (the number of elements dependent upon
484         the address family) which does not contain any names which need to be
485         resolved.
486     @type realAddress: C{tuple}
487
488     @ivar _base: L{Connection}, which is the base class of this class which has
489         all of the useful file descriptor methods.  This is used by
490         L{_TLSServerMixin} to call the right methods to directly manipulate the
491         transport, as is necessary for writing TLS-encrypted bytes (whereas
492         those methods on L{Server} will go through another layer of TLS if it
493         has been enabled).
494     """
495
496     _base = Connection
497     _commonConnection = Connection
498
499     def _stopReadingAndWriting(self):
500         """
501         Implement the POSIX-ish (i.e.
502         L{twisted.internet.interfaces.IReactorFDSet}) method of detaching this
503         socket from the reactor for L{_BaseBaseClient}.
504         """
505         if hasattr(self, "reactor"):
506             # this doesn't happen if we failed in __init__
507             self.stopReading()
508             self.stopWriting()
509
510
511     def _collectSocketDetails(self):
512         """
513         Clean up references to the socket and its file descriptor.
514
515         @see: L{_BaseBaseClient}
516         """
517         del self.socket, self.fileno
518
519
520     def createInternetSocket(self):
521         """(internal) Create a non-blocking socket using
522         self.addressFamily, self.socketType.
523         """
524         s = socket.socket(self.addressFamily, self.socketType)
525         s.setblocking(0)
526         fdesc._setCloseOnExec(s.fileno())
527         return s
528
529
530     def doConnect(self):
531         """
532         Initiate the outgoing connection attempt.
533
534         @note: Applications do not need to call this method; it will be invoked
535             internally as part of L{IReactorTCP.connectTCP}.
536         """
537         self.doWrite = self.doConnect
538         self.doRead = self.doConnect
539         if not hasattr(self, "connector"):
540             # this happens when connection failed but doConnect
541             # was scheduled via a callLater in self._finishInit
542             return
543
544         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
545         if err:
546             self.failIfNotConnected(error.getConnectError((err, strerror(err))))
547             return
548
549         # doConnect gets called twice.  The first time we actually need to
550         # start the connection attempt.  The second time we don't really
551         # want to (SO_ERROR above will have taken care of any errors, and if
552         # it reported none, the mere fact that doConnect was called again is
553         # sufficient to indicate that the connection has succeeded), but it
554         # is not /particularly/ detrimental to do so.  This should get
555         # cleaned up some day, though.
556         try:
557             connectResult = self.socket.connect_ex(self.realAddress)
558         except socket.error, se:
559             connectResult = se.args[0]
560         if connectResult:
561             if connectResult == EISCONN:
562                 pass
563             # on Windows EINVAL means sometimes that we should keep trying:
564             # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
565             elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
566                   (connectResult == EINVAL and platformType == "win32")):
567                 self.startReading()
568                 self.startWriting()
569                 return
570             else:
571                 self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
572                 return
573
574         # If I have reached this point without raising or returning, that means
575         # that the socket is connected.
576         del self.doWrite
577         del self.doRead
578         # we first stop and then start, to reset any references to the old doRead
579         self.stopReading()
580         self.stopWriting()
581         self._connectDone()
582
583
584     def _connectDone(self):
585         """
586         This is a hook for when a connection attempt has succeeded.
587
588         Here, we build the protocol from the
589         L{twisted.internet.protocol.ClientFactory} that was passed in, compute
590         a log string, begin reading so as to send traffic to the newly built
591         protocol, and finally hook up the protocol itself.
592
593         This hook is overridden by L{ssl.Client} to initiate the TLS protocol.
594         """
595         self.protocol = self.connector.buildProtocol(self.getPeer())
596         self.connected = 1
597         logPrefix = self._getLogPrefix(self.protocol)
598         self.logstr = "%s,client" % logPrefix
599         self.startReading()
600         self.protocol.makeConnection(self)
601
602
603
604 _NUMERIC_ONLY = socket.AI_NUMERICHOST | _AI_NUMERICSERV
605
606 def _resolveIPv6(ip, port):
607     """
608     Resolve an IPv6 literal into an IPv6 address.
609
610     This is necessary to resolve any embedded scope identifiers to the relevant
611     C{sin6_scope_id} for use with C{socket.connect()}, C{socket.listen()}, or
612     C{socket.bind()}; see U{RFC 3493 <https://tools.ietf.org/html/rfc3493>} for
613     more information.
614
615     @param ip: An IPv6 address literal.
616     @type ip: C{str}
617
618     @param port: A port number.
619     @type port: C{int}
620
621     @return: a 4-tuple of C{(host, port, flow, scope)}, suitable for use as an
622         IPv6 address.
623
624     @raise socket.gaierror: if either the IP or port is not numeric as it
625         should be.
626     """
627     return socket.getaddrinfo(ip, port, 0, 0, 0, _NUMERIC_ONLY)[0][4]
628
629
630
631 class _BaseTCPClient(object):
632     """
633     Code shared with other (non-POSIX) reactors for management of outgoing TCP
634     connections (both TCPv4 and TCPv6).
635
636     @note: In order to be functional, this class must be mixed into the same
637         hierarchy as L{_BaseBaseClient}.  It would subclass L{_BaseBaseClient}
638         directly, but the class hierarchy here is divided in strange ways out
639         of the need to share code along multiple axes; specifically, with the
640         IOCP reactor and also with UNIX clients in other reactors.
641
642     @ivar _addressType: The Twisted _IPAddress implementation for this client
643     @type _addressType: L{IPv4Address} or L{IPv6Address}
644
645     @ivar connector: The L{Connector} which is driving this L{_BaseTCPClient}'s
646         connection attempt.
647
648     @ivar addr: The address that this socket will be connecting to.
649     @type addr: If IPv4, a 2-C{tuple} of C{(str host, int port)}.  If IPv6, a
650         4-C{tuple} of (C{str host, int port, int ignored, int scope}).
651
652     @ivar createInternetSocket: Subclasses must implement this as a method to
653         create a python socket object of the appropriate address family and
654         socket type.
655     @type createInternetSocket: 0-argument callable returning
656         C{socket._socketobject}.
657     """
658
659     _addressType = address.IPv4Address
660
661     def __init__(self, host, port, bindAddress, connector, reactor=None):
662         # BaseClient.__init__ is invoked later
663         self.connector = connector
664         self.addr = (host, port)
665
666         whenDone = self.resolveAddress
667         err = None
668         skt = None
669
670         if abstract.isIPAddress(host):
671             self._requiresResolution = False
672         elif abstract.isIPv6Address(host):
673             self._requiresResolution = False
674             self.addr = _resolveIPv6(host, port)
675             self.addressFamily = socket.AF_INET6
676             self._addressType = address.IPv6Address
677         else:
678             self._requiresResolution = True
679         try:
680             skt = self.createInternetSocket()
681         except socket.error, se:
682             err = error.ConnectBindError(se.args[0], se.args[1])
683             whenDone = None
684         if whenDone and bindAddress is not None:
685             try:
686                 if abstract.isIPv6Address(bindAddress[0]):
687                     bindinfo = _resolveIPv6(*bindAddress)
688                 else:
689                     bindinfo = bindAddress
690                 skt.bind(bindinfo)
691             except socket.error, se:
692                 err = error.ConnectBindError(se.args[0], se.args[1])
693                 whenDone = None
694         self._finishInit(whenDone, skt, err, reactor)
695
696
697     def getHost(self):
698         """
699         Returns an L{IPv4Address} or L{IPv6Address}.
700
701         This indicates the address from which I am connecting.
702         """
703         return self._addressType('TCP', *self.socket.getsockname()[:2])
704
705
706     def getPeer(self):
707         """
708         Returns an L{IPv4Address} or L{IPv6Address}.
709
710         This indicates the address that I am connected to.
711         """
712         # an ipv6 realAddress has more than two elements, but the IPv6Address
713         # constructor still only takes two.
714         return self._addressType('TCP', *self.realAddress[:2])
715
716
717     def __repr__(self):
718         s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
719         return s
720
721
722
723 class Client(_BaseTCPClient, BaseClient):
724     """
725     A transport for a TCP protocol; either TCPv4 or TCPv6.
726
727     Do not create these directly; use L{IReactorTCP.connectTCP}.
728     """
729
730
731
732 class Server(_TLSServerMixin, Connection):
733     """
734     Serverside socket-stream connection class.
735
736     This is a serverside network connection transport; a socket which came from
737     an accept() on a server.
738
739     @ivar _base: L{Connection}, which is the base class of this class which has
740         all of the useful file descriptor methods.  This is used by
741         L{_TLSServerMixin} to call the right methods to directly manipulate the
742         transport, as is necessary for writing TLS-encrypted bytes (whereas
743         those methods on L{Server} will go through another layer of TLS if it
744         has been enabled).
745     """
746     _base = Connection
747
748     _addressType = address.IPv4Address
749
750     def __init__(self, sock, protocol, client, server, sessionno, reactor):
751         """
752         Server(sock, protocol, client, server, sessionno)
753
754         Initialize it with a socket, a protocol, a descriptor for my peer (a
755         tuple of host, port describing the other end of the connection), an
756         instance of Port, and a session number.
757         """
758         Connection.__init__(self, sock, protocol, reactor)
759         if len(client) != 2:
760             self._addressType = address.IPv6Address
761         self.server = server
762         self.client = client
763         self.sessionno = sessionno
764         self.hostname = client[0]
765
766         logPrefix = self._getLogPrefix(self.protocol)
767         self.logstr = "%s,%s,%s" % (logPrefix,
768                                     sessionno,
769                                     self.hostname)
770         self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
771                                           self.sessionno,
772                                           self.server._realPortNumber)
773         self.startReading()
774         self.connected = 1
775
776     def __repr__(self):
777         """A string representation of this connection.
778         """
779         return self.repstr
780
781
782     def getHost(self):
783         """
784         Returns an L{IPv4Address} or L{IPv6Address}.
785
786         This indicates the server's address.
787         """
788         host, port = self.socket.getsockname()[:2]
789         return self._addressType('TCP', host, port)
790
791
792     def getPeer(self):
793         """
794         Returns an L{IPv4Address} or L{IPv6Address}.
795
796         This indicates the client's address.
797         """
798         return self._addressType('TCP', *self.client[:2])
799
800
801
802 class Port(base.BasePort, _SocketCloser):
803     """
804     A TCP server port, listening for connections.
805
806     When a connection is accepted, this will call a factory's buildProtocol
807     with the incoming address as an argument, according to the specification
808     described in L{twisted.internet.interfaces.IProtocolFactory}.
809
810     If you wish to change the sort of transport that will be used, the
811     C{transport} attribute will be called with the signature expected for
812     C{Server.__init__}, so it can be replaced.
813
814     @ivar deferred: a deferred created when L{stopListening} is called, and
815         that will fire when connection is lost. This is not to be used it
816         directly: prefer the deferred returned by L{stopListening} instead.
817     @type deferred: L{defer.Deferred}
818
819     @ivar disconnecting: flag indicating that the L{stopListening} method has
820         been called and that no connections should be accepted anymore.
821     @type disconnecting: C{bool}
822
823     @ivar connected: flag set once the listen has successfully been called on
824         the socket.
825     @type connected: C{bool}
826
827     @ivar _type: A string describing the connections which will be created by
828         this port.  Normally this is C{"TCP"}, since this is a TCP port, but
829         when the TLS implementation re-uses this class it overrides the value
830         with C{"TLS"}.  Only used for logging.
831
832     @ivar _preexistingSocket: If not C{None}, a L{socket.socket} instance which
833         was created and initialized outside of the reactor and will be used to
834         listen for connections (instead of a new socket being created by this
835         L{Port}).
836     """
837
838     implements(interfaces.IListeningPort)
839
840     socketType = socket.SOCK_STREAM
841
842     transport = Server
843     sessionno = 0
844     interface = ''
845     backlog = 50
846
847     _type = 'TCP'
848
849     # Actual port number being listened on, only set to a non-None
850     # value when we are actually listening.
851     _realPortNumber = None
852
853     # An externally initialized socket that we will use, rather than creating
854     # our own.
855     _preexistingSocket = None
856
857     addressFamily = socket.AF_INET
858     _addressType = address.IPv4Address
859
860     def __init__(self, port, factory, backlog=50, interface='', reactor=None):
861         """Initialize with a numeric port to listen on.
862         """
863         base.BasePort.__init__(self, reactor=reactor)
864         self.port = port
865         self.factory = factory
866         self.backlog = backlog
867         if abstract.isIPv6Address(interface):
868             self.addressFamily = socket.AF_INET6
869             self._addressType = address.IPv6Address
870         self.interface = interface
871
872
873     @classmethod
874     def _fromListeningDescriptor(cls, reactor, fd, addressFamily, factory):
875         """
876         Create a new L{Port} based on an existing listening I{SOCK_STREAM}
877         I{AF_INET} socket.
878
879         Arguments are the same as to L{Port.__init__}, except where noted.
880
881         @param fd: An integer file descriptor associated with a listening
882             socket.  The socket must be in non-blocking mode.  Any additional
883             attributes desired, such as I{FD_CLOEXEC}, must also be set already.
884
885         @param addressFamily: The address family (sometimes called I{domain}) of
886             the existing socket.  For example, L{socket.AF_INET}.
887
888         @return: A new instance of C{cls} wrapping the socket given by C{fd}.
889         """
890         port = socket.fromfd(fd, addressFamily, cls.socketType)
891         interface = port.getsockname()[0]
892         self = cls(None, factory, None, interface, reactor)
893         self._preexistingSocket = port
894         return self
895
896
897     def __repr__(self):
898         if self._realPortNumber is not None:
899             return "<%s of %s on %s>" % (self.__class__,
900                 self.factory.__class__, self._realPortNumber)
901         else:
902             return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
903
904     def createInternetSocket(self):
905         s = base.BasePort.createInternetSocket(self)
906         if platformType == "posix" and sys.platform != "cygwin":
907             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
908         return s
909
910
911     def startListening(self):
912         """Create and bind my socket, and begin listening on it.
913
914         This is called on unserialization, and must be called after creating a
915         server to begin listening on the specified port.
916         """
917         if self._preexistingSocket is None:
918             # Create a new socket and make it listen
919             try:
920                 skt = self.createInternetSocket()
921                 if self.addressFamily == socket.AF_INET6:
922                     addr = _resolveIPv6(self.interface, self.port)
923                 else:
924                     addr = (self.interface, self.port)
925                 skt.bind(addr)
926             except socket.error, le:
927                 raise CannotListenError, (self.interface, self.port, le)
928             skt.listen(self.backlog)
929         else:
930             # Re-use the externally specified socket
931             skt = self._preexistingSocket
932             self._preexistingSocket = None
933             # Avoid shutting it down at the end.
934             self._socketShutdownMethod = None
935
936         # Make sure that if we listened on port 0, we update that to
937         # reflect what the OS actually assigned us.
938         self._realPortNumber = skt.getsockname()[1]
939
940         log.msg("%s starting on %s" % (
941                 self._getLogPrefix(self.factory), self._realPortNumber))
942
943         # The order of the next 5 lines is kind of bizarre.  If no one
944         # can explain it, perhaps we should re-arrange them.
945         self.factory.doStart()
946         self.connected = True
947         self.socket = skt
948         self.fileno = self.socket.fileno
949         self.numberAccepts = 100
950
951         self.startReading()
952
953
954     def _buildAddr(self, address):
955         host, port = address[:2]
956         return self._addressType('TCP', host, port)
957
958
959     def doRead(self):
960         """Called when my socket is ready for reading.
961
962         This accepts a connection and calls self.protocol() to handle the
963         wire-level protocol.
964         """
965         try:
966             if platformType == "posix":
967                 numAccepts = self.numberAccepts
968             else:
969                 # win32 event loop breaks if we do more than one accept()
970                 # in an iteration of the event loop.
971                 numAccepts = 1
972             for i in range(numAccepts):
973                 # we need this so we can deal with a factory's buildProtocol
974                 # calling our loseConnection
975                 if self.disconnecting:
976                     return
977                 try:
978                     skt, addr = self.socket.accept()
979                 except socket.error, e:
980                     if e.args[0] in (EWOULDBLOCK, EAGAIN):
981                         self.numberAccepts = i
982                         break
983                     elif e.args[0] == EPERM:
984                         # Netfilter on Linux may have rejected the
985                         # connection, but we get told to try to accept()
986                         # anyway.
987                         continue
988                     elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
989
990                         # Linux gives EMFILE when a process is not allowed
991                         # to allocate any more file descriptors.  *BSD and
992                         # Win32 give (WSA)ENOBUFS.  Linux can also give
993                         # ENFILE if the system is out of inodes, or ENOMEM
994                         # if there is insufficient memory to allocate a new
995                         # dentry.  ECONNABORTED is documented as possible on
996                         # both Linux and Windows, but it is not clear
997                         # whether there are actually any circumstances under
998                         # which it can happen (one might expect it to be
999                         # possible if a client sends a FIN or RST after the
1000                         # server sends a SYN|ACK but before application code
1001                         # calls accept(2), however at least on Linux this
1002                         # _seems_ to be short-circuited by syncookies.
1003
1004                         log.msg("Could not accept new connection (%s)" % (
1005                             errorcode[e.args[0]],))
1006                         break
1007                     raise
1008
1009                 fdesc._setCloseOnExec(skt.fileno())
1010                 protocol = self.factory.buildProtocol(self._buildAddr(addr))
1011                 if protocol is None:
1012                     skt.close()
1013                     continue
1014                 s = self.sessionno
1015                 self.sessionno = s+1
1016                 transport = self.transport(skt, protocol, addr, self, s, self.reactor)
1017                 protocol.makeConnection(transport)
1018             else:
1019                 self.numberAccepts = self.numberAccepts+20
1020         except:
1021             # Note that in TLS mode, this will possibly catch SSL.Errors
1022             # raised by self.socket.accept()
1023             #
1024             # There is no "except SSL.Error:" above because SSL may be
1025             # None if there is no SSL support.  In any case, all the
1026             # "except SSL.Error:" suite would probably do is log.deferr()
1027             # and return, so handling it here works just as well.
1028             log.deferr()
1029
1030     def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
1031         """
1032         Stop accepting connections on this port.
1033
1034         This will shut down the socket and call self.connectionLost().  It
1035         returns a deferred which will fire successfully when the port is
1036         actually closed, or with a failure if an error occurs shutting down.
1037         """
1038         self.disconnecting = True
1039         self.stopReading()
1040         if self.connected:
1041             self.deferred = deferLater(
1042                 self.reactor, 0, self.connectionLost, connDone)
1043             return self.deferred
1044
1045     stopListening = loseConnection
1046
1047     def _logConnectionLostMsg(self):
1048         """
1049         Log message for closing port
1050         """
1051         log.msg('(%s Port %s Closed)' % (self._type, self._realPortNumber))
1052
1053
1054     def connectionLost(self, reason):
1055         """
1056         Cleans up the socket.
1057         """
1058         self._logConnectionLostMsg()
1059         self._realPortNumber = None
1060
1061         base.BasePort.connectionLost(self, reason)
1062         self.connected = False
1063         self._closeSocket(True)
1064         del self.socket
1065         del self.fileno
1066
1067         try:
1068             self.factory.doStop()
1069         finally:
1070             self.disconnecting = False
1071
1072
1073     def logPrefix(self):
1074         """Returns the name of my class, to prefix log entries with.
1075         """
1076         return reflect.qual(self.factory.__class__)
1077
1078
1079     def getHost(self):
1080         """
1081         Return an L{IPv4Address} or L{IPv6Address} indicating the listening
1082         address of this port.
1083         """
1084         host, port = self.socket.getsockname()[:2]
1085         return self._addressType('TCP', host, port)
1086
1087
1088
1089 class Connector(base.BaseConnector):
1090     """
1091     A L{Connector} provides of L{twisted.internet.interfaces.IConnector} for
1092     all POSIX-style reactors.
1093
1094     @ivar _addressType: the type returned by L{Connector.getDestination}.
1095         Either L{IPv4Address} or L{IPv6Address}, depending on the type of
1096         address.
1097     @type _addressType: C{type}
1098     """
1099     _addressType = address.IPv4Address
1100
1101     def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
1102         if isinstance(port, types.StringTypes):
1103             try:
1104                 port = socket.getservbyname(port, 'tcp')
1105             except socket.error, e:
1106                 raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
1107         self.host, self.port = host, port
1108         if abstract.isIPv6Address(host):
1109             self._addressType = address.IPv6Address
1110         self.bindAddress = bindAddress
1111         base.BaseConnector.__init__(self, factory, timeout, reactor)
1112
1113
1114     def _makeTransport(self):
1115         """
1116         Create a L{Client} bound to this L{Connector}.
1117
1118         @return: a new L{Client}
1119         @rtype: L{Client}
1120         """
1121         return Client(self.host, self.port, self.bindAddress, self, self.reactor)
1122
1123
1124     def getDestination(self):
1125         """
1126         @see: L{twisted.internet.interfaces.IConnector.getDestination}.
1127         """
1128         return self._addressType('TCP', self.host, self.port)
1129
1130