1 # -*- test-case-name: twisted.test.test_tcp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Various asynchronous TCP/IP classes.
8 End users shouldn't use this module directly - use the reactor APIs instead.
19 from zope.interface import implements
21 from twisted.python.runtime import platformType
22 from twisted.python import versions, deprecate
25 # Try to get the memory BIO based startTLS implementation, available since
27 from twisted.internet._newtls import (
28 ConnectionMixin as _TLSConnectionMixin,
29 ClientMixin as _TLSClientMixin,
30 ServerMixin as _TLSServerMixin)
33 # Try to get the socket BIO based startTLS implementation, available in
34 # all pyOpenSSL versions
35 from twisted.internet._oldtls import (
36 ConnectionMixin as _TLSConnectionMixin,
37 ClientMixin as _TLSClientMixin,
38 ServerMixin as _TLSServerMixin)
40 # There is no version of startTLS available
41 class _TLSConnectionMixin(object):
43 class _TLSClientMixin(object):
45 class _TLSServerMixin(object):
48 if platformType == 'win32':
49 # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
51 from errno import WSAEINVAL as EINVAL
52 from errno import WSAEWOULDBLOCK as EWOULDBLOCK
53 from errno import WSAEINPROGRESS as EINPROGRESS
54 from errno import WSAEALREADY as EALREADY
55 from errno import WSAECONNRESET as ECONNRESET
56 from errno import WSAEISCONN as EISCONN
57 from errno import WSAENOTCONN as ENOTCONN
58 from errno import WSAEINTR as EINTR
59 from errno import WSAENOBUFS as ENOBUFS
60 from errno import WSAEMFILE as EMFILE
61 # No such thing as WSAENFILE, either.
66 from errno import WSAECONNRESET as ECONNABORTED
68 from twisted.python.win32 import formatError as strerror
70 from errno import EPERM
71 from errno import EINVAL
72 from errno import EWOULDBLOCK
73 from errno import EINPROGRESS
74 from errno import EALREADY
75 from errno import ECONNRESET
76 from errno import EISCONN
77 from errno import ENOTCONN
78 from errno import EINTR
79 from errno import ENOBUFS
80 from errno import EMFILE
81 from errno import ENFILE
82 from errno import ENOMEM
83 from errno import EAGAIN
84 from errno import ECONNABORTED
86 from os import strerror
89 from errno import errorcode
92 from twisted.internet import base, address, fdesc
93 from twisted.internet.task import deferLater
94 from twisted.python import log, failure, reflect
95 from twisted.python.util import unsignedID, untilConcludes
96 from twisted.internet.error import CannotListenError
97 from twisted.internet import abstract, main, interfaces, error
# Not all platforms have, or support, this flag.  When the socket module does
# not define AI_NUMERICSERV, fall back to 0 so that OR-ing this value into a
# set of getaddrinfo flags leaves those flags unchanged.
_AI_NUMERICSERV = getattr(socket, "AI_NUMERICSERV", 0)
104 class _SocketCloser(object):
105 _socketShutdownMethod = 'shutdown'
107 def _closeSocket(self, orderly):
108 # The call to shutdown() before close() isn't really necessary, because
109 # we set FD_CLOEXEC now, which will ensure this is the only process
110 # holding the FD, thus ensuring close() really will shutdown the TCP
111 # socket. However, do it anyways, just to be safe.
115 if self._socketShutdownMethod is not None:
116 getattr(skt, self._socketShutdownMethod)(2)
118 # Set SO_LINGER to 1,0 which, by convention, causes a
119 # connection reset to be sent when close is called,
120 # instead of the standard FIN shutdown sequence.
121 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
122 struct.pack("ii", 1, 0))
133 class _AbortingMixin(object):
135 Common implementation of C{abortConnection}.
137 @ivar _aborting: Set to C{True} when C{abortConnection} is called.
138 @type _aborting: C{bool}
142 def abortConnection(self):
144 Aborts the connection immediately, dropping any buffered data.
148 if self.disconnected or self._aborting:
150 self._aborting = True
153 self.doRead = lambda *args, **kwargs: None
154 self.doWrite = lambda *args, **kwargs: None
155 self.reactor.callLater(0, self.connectionLost,
156 failure.Failure(error.ConnectionAborted()))
160 class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
163 Superclass of all socket-based FileDescriptors.
165 This is an abstract superclass of all objects which represent a TCP/IP
166 connection based socket.
168 @ivar logstr: prefix used when logging events related to this connection.
171 implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
174 def __init__(self, skt, protocol, reactor=None):
175 abstract.FileDescriptor.__init__(self, reactor=reactor)
177 self.socket.setblocking(0)
178 self.fileno = skt.fileno
179 self.protocol = protocol
183 """Return the socket for this connection."""
188 """Calls self.protocol.dataReceived with all available data.
190 This reads up to self.bufferSize bytes of data from its socket, then
191 calls self.dataReceived(data) to process it. If the connection is not
192 lost through an error in the physical recv(), this function will return
193 the result of the dataReceived call.
196 data = self.socket.recv(self.bufferSize)
197 except socket.error, se:
198 if se.args[0] == EWOULDBLOCK:
201 return main.CONNECTION_LOST
203 return self._dataReceived(data)
206 def _dataReceived(self, data):
208 return main.CONNECTION_DONE
209 rval = self.protocol.dataReceived(data)
211 offender = self.protocol.dataReceived
213 'Returning a value other than None from %(fqpn)s is '
214 'deprecated since %(version)s.')
215 warningString = deprecate.getDeprecationWarningString(
216 offender, versions.Version('Twisted', 11, 0, 0),
217 format=warningFormat)
218 deprecate.warnAboutFunction(offender, warningString)
222 def writeSomeData(self, data):
224 Write as much as possible of the given data to this TCP connection.
226 This sends up to C{self.SEND_LIMIT} bytes from C{data}. If the
227 connection is lost, an exception is returned. Otherwise, the number
228 of bytes successfully written is returned.
230 # Limit length of buffer to try to send, because some OSes are too
231 # stupid to do so themselves (ahem windows)
232 limitedData = buffer(data, 0, self.SEND_LIMIT)
235 return untilConcludes(self.socket.send, limitedData)
236 except socket.error, se:
237 if se.args[0] in (EWOULDBLOCK, ENOBUFS):
240 return main.CONNECTION_LOST
243 def _closeWriteConnection(self):
245 getattr(self.socket, self._socketShutdownMethod)(1)
248 p = interfaces.IHalfCloseableProtocol(self.protocol, None)
251 p.writeConnectionLost()
253 f = failure.Failure()
255 self.connectionLost(f)
258 def readConnectionLost(self, reason):
259 p = interfaces.IHalfCloseableProtocol(self.protocol, None)
262 p.readConnectionLost()
265 self.connectionLost(failure.Failure())
267 self.connectionLost(reason)
271 def connectionLost(self, reason):
272 """See abstract.FileDescriptor.connectionLost().
274 # Make sure we're not called twice, which can happen e.g. if
275 # abortConnection() is called from protocol's dataReceived and then
276 # code immediately after throws an exception that reaches the
277 # reactor. We can't rely on "disconnected" attribute for this check
278 # since twisted.internet._oldtls does evil things to it:
279 if not hasattr(self, "socket"):
281 abstract.FileDescriptor.connectionLost(self, reason)
282 self._closeSocket(not reason.check(error.ConnectionAborted))
283 protocol = self.protocol
287 protocol.connectionLost(reason)
290 logstr = "Uninitialized"
293 """Return the prefix to log with when I own the logging thread.
def getTcpNoDelay(self):
    """
    Report whether the C{TCP_NODELAY} option is set on the underlying socket.

    @return: the truth value of the option as read with C{getsockopt}.
    @rtype: C{bool}
    """
    optionValue = self.socket.getsockopt(
        socket.IPPROTO_TCP, socket.TCP_NODELAY)
    return operator.truth(optionValue)
def setTcpNoDelay(self, enabled):
    """
    Set the C{TCP_NODELAY} option on the underlying socket.

    @param enabled: the value passed straight through to C{setsockopt} for
        the C{TCP_NODELAY} option.
    """
    underlying = self.socket
    underlying.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
def getTcpKeepAlive(self):
    """
    Report whether the C{SO_KEEPALIVE} option is set on the underlying
    socket.

    @return: the truth value of the option as read with C{getsockopt}.
    @rtype: C{bool}
    """
    optionValue = self.socket.getsockopt(
        socket.SOL_SOCKET, socket.SO_KEEPALIVE)
    return operator.truth(optionValue)
def setTcpKeepAlive(self, enabled):
    """
    Set the C{SO_KEEPALIVE} option on the underlying socket.

    @param enabled: the value passed straight through to C{setsockopt} for
        the C{SO_KEEPALIVE} option.
    """
    underlying = self.socket
    underlying.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
313 class _BaseBaseClient(object):
315 Code shared with other (non-POSIX) reactors for management of general
316 outgoing connections.
318 Requirements upon subclasses are documented as instance variables rather
319 than abstract methods, in order to avoid MRO confusion, since this base is
320 mixed in to unfortunately weird and distinctive multiple-inheritance
321 hierarchies and many of these attributes are provided by peer classes
322 rather than descendant classes in those hierarchies.
324 @ivar addressFamily: The address family constant (C{socket.AF_INET},
325 C{socket.AF_INET6}, C{socket.AF_UNIX}) of the underlying socket of this
327 @type addressFamily: C{int}
329 @ivar socketType: The socket type constant (C{socket.SOCK_STREAM} or
330 C{socket.SOCK_DGRAM}) of the underlying socket.
331 @type socketType: C{int}
333 @ivar _requiresResolution: A flag indicating whether the address of this
334 client will require name resolution. C{True} if the hostname of said
335 address indicates a name that must be resolved by hostname lookup,
336 C{False} if it indicates an IP address literal.
337 @type _requiresResolution: C{bool}
339 @cvar _commonConnection: Subclasses must provide this attribute, which
340 indicates the L{Connection}-alike class to invoke C{__init__} and
341 C{connectionLost} on.
342 @type _commonConnection: C{type}
344 @ivar _stopReadingAndWriting: Subclasses must implement in order to remove
345 this transport from its reactor's notifications in response to a
346 terminated connection attempt.
347 @type _stopReadingAndWriting: 0-argument callable returning C{None}
349 @ivar _closeSocket: Subclasses must implement in order to close the socket
350 in response to a terminated connection attempt.
351 @type _closeSocket: 1-argument callable; see L{_SocketCloser._closeSocket}
353 @ivar _collectSocketDetails: Clean up references to the attached socket in
354 its underlying OS resource (such as a file descriptor or file handle),
355 as part of post connection-failure cleanup.
356 @type _collectSocketDetails: 0-argument callable returning C{None}.
358 @ivar reactor: The class pointed to by C{_commonConnection} should set this
359 attribute in its constructor.
360 @type reactor: L{twisted.internet.interfaces.IReactorTime},
361 L{twisted.internet.interfaces.IReactorCore},
362 L{twisted.internet.interfaces.IReactorFDSet}
365 addressFamily = socket.AF_INET
366 socketType = socket.SOCK_STREAM
368 def _finishInit(self, whenDone, skt, error, reactor):
370 Called by subclasses to continue to the stage of initialization where
371 the socket connect attempt is made.
373 @param whenDone: A 0-argument callable to invoke once the connection is
374 set up. This is C{None} if the connection could not be prepared
375 due to a previous error.
377 @param skt: The socket object to use to perform the connection.
378 @type skt: C{socket._socketobject}
380 @param error: The error to fail the connection with.
382 @param reactor: The reactor to use for this client.
383 @type reactor: L{twisted.internet.interfaces.IReactorTime}
386 self._commonConnection.__init__(self, skt, None, reactor)
387 reactor.callLater(0, whenDone)
389 reactor.callLater(0, self.failIfNotConnected, error)
392 def resolveAddress(self):
394 Resolve the name that was passed to this L{_BaseBaseClient}, if
395 necessary, and then move on to attempting the connection once an
396 address has been determined. (The connection will be attempted
397 immediately within this function if either name resolution can be
398 synchronous or the address was an IP address literal.)
400 @note: You don't want to call this method from outside, as it won't do
401 anything useful; it's just part of the connection bootstrapping
402 process. Also, although this method is on L{_BaseBaseClient} for
403 historical reasons, it's not used anywhere except for L{Client}
408 if self._requiresResolution:
409 d = self.reactor.resolve(self.addr[0])
410 d.addCallback(lambda n: (n,) + self.addr[1:])
411 d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
413 self._setRealAddress(self.addr)
416 def _setRealAddress(self, address):
418 Set the resolved address of this L{_BaseBaseClient} and initiate the
421 @param address: Depending on whether this is an IPv4 or IPv6 connection
422 attempt, a 2-tuple of C{(host, port)} or a 4-tuple of C{(host,
423 port, flow, scope)}. At this point it is a fully resolved address,
424 and the 'host' portion will always be an IP address, not a DNS
427 self.realAddress = address
431 def failIfNotConnected(self, err):
Generic method called when the attempt to connect failed. It basically
434 cleans everything it can: call connectionFailed, stop read and write,
435 delete socket related members.
437 if (self.connected or self.disconnected or
438 not hasattr(self, "connector")):
441 self._stopReadingAndWriting()
443 self._closeSocket(True)
444 except AttributeError:
447 self._collectSocketDetails()
448 self.connector.connectionFailed(failure.Failure(err))
def stopConnecting(self):
    """
    If a connection attempt is still outstanding (i.e. no connection is
    yet established), immediately stop attempting to connect.

    This is done by handing a L{error.UserError} to C{failIfNotConnected}.
    """
    userAbort = error.UserError()
    self.failIfNotConnected(userAbort)
460 def connectionLost(self, reason):
462 Invoked by lower-level logic when it's time to clean the socket up.
463 Depending on the state of the connection, either inform the attached
464 L{Connector} that the connection attempt has failed, or inform the
465 connected L{IProtocol} that the established connection has been lost.
467 @param reason: the reason that the connection was terminated
468 @type reason: L{Failure}
470 if not self.connected:
471 self.failIfNotConnected(error.ConnectError(string=reason))
473 self._commonConnection.connectionLost(self, reason)
474 self.connector.connectionLost(reason)
478 class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
A base class for client TCP (and similar) sockets.
482 @ivar realAddress: The address object that will be used for socket.connect;
483 this address is an address tuple (the number of elements dependent upon
484 the address family) which does not contain any names which need to be
486 @type realAddress: C{tuple}
488 @ivar _base: L{Connection}, which is the base class of this class which has
489 all of the useful file descriptor methods. This is used by
490 L{_TLSServerMixin} to call the right methods to directly manipulate the
491 transport, as is necessary for writing TLS-encrypted bytes (whereas
492 those methods on L{Server} will go through another layer of TLS if it
497 _commonConnection = Connection
499 def _stopReadingAndWriting(self):
501 Implement the POSIX-ish (i.e.
502 L{twisted.internet.interfaces.IReactorFDSet}) method of detaching this
503 socket from the reactor for L{_BaseBaseClient}.
505 if hasattr(self, "reactor"):
506 # this doesn't happen if we failed in __init__
511 def _collectSocketDetails(self):
513 Clean up references to the socket and its file descriptor.
515 @see: L{_BaseBaseClient}
517 del self.socket, self.fileno
520 def createInternetSocket(self):
521 """(internal) Create a non-blocking socket using
522 self.addressFamily, self.socketType.
524 s = socket.socket(self.addressFamily, self.socketType)
526 fdesc._setCloseOnExec(s.fileno())
532 Initiate the outgoing connection attempt.
534 @note: Applications do not need to call this method; it will be invoked
535 internally as part of L{IReactorTCP.connectTCP}.
537 self.doWrite = self.doConnect
538 self.doRead = self.doConnect
539 if not hasattr(self, "connector"):
540 # this happens when connection failed but doConnect
541 # was scheduled via a callLater in self._finishInit
544 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
546 self.failIfNotConnected(error.getConnectError((err, strerror(err))))
549 # doConnect gets called twice. The first time we actually need to
550 # start the connection attempt. The second time we don't really
551 # want to (SO_ERROR above will have taken care of any errors, and if
552 # it reported none, the mere fact that doConnect was called again is
553 # sufficient to indicate that the connection has succeeded), but it
554 # is not /particularly/ detrimental to do so. This should get
555 # cleaned up some day, though.
557 connectResult = self.socket.connect_ex(self.realAddress)
558 except socket.error, se:
559 connectResult = se.args[0]
561 if connectResult == EISCONN:
563 # on Windows EINVAL means sometimes that we should keep trying:
564 # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
565 elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
566 (connectResult == EINVAL and platformType == "win32")):
571 self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
574 # If I have reached this point without raising or returning, that means
575 # that the socket is connected.
578 # we first stop and then start, to reset any references to the old doRead
584 def _connectDone(self):
586 This is a hook for when a connection attempt has succeeded.
588 Here, we build the protocol from the
589 L{twisted.internet.protocol.ClientFactory} that was passed in, compute
590 a log string, begin reading so as to send traffic to the newly built
591 protocol, and finally hook up the protocol itself.
593 This hook is overridden by L{ssl.Client} to initiate the TLS protocol.
595 self.protocol = self.connector.buildProtocol(self.getPeer())
597 logPrefix = self._getLogPrefix(self.protocol)
598 self.logstr = "%s,client" % logPrefix
600 self.protocol.makeConnection(self)
# getaddrinfo flags combining AI_NUMERICHOST with _AI_NUMERICSERV; passed by
# _resolveIPv6 so that only numeric host/port values are accepted.
_NUMERIC_ONLY = socket.AI_NUMERICHOST | _AI_NUMERICSERV
def _resolveIPv6(ip, port):
    """
    Resolve an IPv6 literal into an IPv6 address.

    This is necessary to resolve any embedded scope identifiers to the
    relevant C{sin6_scope_id} for use with C{socket.connect()},
    C{socket.listen()}, or C{socket.bind()}; see U{RFC 3493
    <https://tools.ietf.org/html/rfc3493>} for more information.

    @param ip: An IPv6 address literal.

    @param port: A port number.

    @return: a 4-tuple of C{(host, port, flow, scope)}, taken from the
        socket-address element of the first C{getaddrinfo} result.

    @raise socket.gaierror: if either the IP or port is not numeric as it
        should be.
    """
    lookupResults = socket.getaddrinfo(ip, port, 0, 0, 0, _NUMERIC_ONLY)
    firstResult = lookupResults[0]
    # Index 4 of a getaddrinfo entry is the socket address tuple.
    return firstResult[4]
631 class _BaseTCPClient(object):
633 Code shared with other (non-POSIX) reactors for management of outgoing TCP
634 connections (both TCPv4 and TCPv6).
636 @note: In order to be functional, this class must be mixed into the same
637 hierarchy as L{_BaseBaseClient}. It would subclass L{_BaseBaseClient}
638 directly, but the class hierarchy here is divided in strange ways out
639 of the need to share code along multiple axes; specifically, with the
640 IOCP reactor and also with UNIX clients in other reactors.
642 @ivar _addressType: The Twisted _IPAddress implementation for this client
643 @type _addressType: L{IPv4Address} or L{IPv6Address}
645 @ivar connector: The L{Connector} which is driving this L{_BaseTCPClient}'s
648 @ivar addr: The address that this socket will be connecting to.
649 @type addr: If IPv4, a 2-C{tuple} of C{(str host, int port)}. If IPv6, a
650 4-C{tuple} of (C{str host, int port, int ignored, int scope}).
652 @ivar createInternetSocket: Subclasses must implement this as a method to
653 create a python socket object of the appropriate address family and
655 @type createInternetSocket: 0-argument callable returning
656 C{socket._socketobject}.
659 _addressType = address.IPv4Address
661 def __init__(self, host, port, bindAddress, connector, reactor=None):
662 # BaseClient.__init__ is invoked later
663 self.connector = connector
664 self.addr = (host, port)
666 whenDone = self.resolveAddress
670 if abstract.isIPAddress(host):
671 self._requiresResolution = False
672 elif abstract.isIPv6Address(host):
673 self._requiresResolution = False
674 self.addr = _resolveIPv6(host, port)
675 self.addressFamily = socket.AF_INET6
676 self._addressType = address.IPv6Address
678 self._requiresResolution = True
680 skt = self.createInternetSocket()
681 except socket.error, se:
682 err = error.ConnectBindError(se.args[0], se.args[1])
684 if whenDone and bindAddress is not None:
686 if abstract.isIPv6Address(bindAddress[0]):
687 bindinfo = _resolveIPv6(*bindAddress)
689 bindinfo = bindAddress
691 except socket.error, se:
692 err = error.ConnectBindError(se.args[0], se.args[1])
694 self._finishInit(whenDone, skt, err, reactor)
699 Returns an L{IPv4Address} or L{IPv6Address}.
701 This indicates the address from which I am connecting.
703 return self._addressType('TCP', *self.socket.getsockname()[:2])
708 Returns an L{IPv4Address} or L{IPv6Address}.
710 This indicates the address that I am connected to.
712 # an ipv6 realAddress has more than two elements, but the IPv6Address
713 # constructor still only takes two.
714 return self._addressType('TCP', *self.realAddress[:2])
718 s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
723 class Client(_BaseTCPClient, BaseClient):
725 A transport for a TCP protocol; either TCPv4 or TCPv6.
727 Do not create these directly; use L{IReactorTCP.connectTCP}.
732 class Server(_TLSServerMixin, Connection):
734 Serverside socket-stream connection class.
736 This is a serverside network connection transport; a socket which came from
737 an accept() on a server.
739 @ivar _base: L{Connection}, which is the base class of this class which has
740 all of the useful file descriptor methods. This is used by
741 L{_TLSServerMixin} to call the right methods to directly manipulate the
742 transport, as is necessary for writing TLS-encrypted bytes (whereas
743 those methods on L{Server} will go through another layer of TLS if it
748 _addressType = address.IPv4Address
750 def __init__(self, sock, protocol, client, server, sessionno, reactor):
752 Server(sock, protocol, client, server, sessionno)
754 Initialize it with a socket, a protocol, a descriptor for my peer (a
755 tuple of host, port describing the other end of the connection), an
756 instance of Port, and a session number.
758 Connection.__init__(self, sock, protocol, reactor)
760 self._addressType = address.IPv6Address
763 self.sessionno = sessionno
764 self.hostname = client[0]
766 logPrefix = self._getLogPrefix(self.protocol)
767 self.logstr = "%s,%s,%s" % (logPrefix,
770 self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
772 self.server._realPortNumber)
777 """A string representation of this connection.
784 Returns an L{IPv4Address} or L{IPv6Address}.
786 This indicates the server's address.
788 host, port = self.socket.getsockname()[:2]
789 return self._addressType('TCP', host, port)
794 Returns an L{IPv4Address} or L{IPv6Address}.
796 This indicates the client's address.
798 return self._addressType('TCP', *self.client[:2])
802 class Port(base.BasePort, _SocketCloser):
804 A TCP server port, listening for connections.
806 When a connection is accepted, this will call a factory's buildProtocol
807 with the incoming address as an argument, according to the specification
808 described in L{twisted.internet.interfaces.IProtocolFactory}.
810 If you wish to change the sort of transport that will be used, the
811 C{transport} attribute will be called with the signature expected for
812 C{Server.__init__}, so it can be replaced.
814 @ivar deferred: a deferred created when L{stopListening} is called, and
that will fire when connection is lost. This is not to be used
directly: prefer the deferred returned by L{stopListening} instead.
817 @type deferred: L{defer.Deferred}
819 @ivar disconnecting: flag indicating that the L{stopListening} method has
820 been called and that no connections should be accepted anymore.
821 @type disconnecting: C{bool}
823 @ivar connected: flag set once the listen has successfully been called on
825 @type connected: C{bool}
827 @ivar _type: A string describing the connections which will be created by
828 this port. Normally this is C{"TCP"}, since this is a TCP port, but
829 when the TLS implementation re-uses this class it overrides the value
830 with C{"TLS"}. Only used for logging.
832 @ivar _preexistingSocket: If not C{None}, a L{socket.socket} instance which
833 was created and initialized outside of the reactor and will be used to
834 listen for connections (instead of a new socket being created by this
838 implements(interfaces.IListeningPort)
840 socketType = socket.SOCK_STREAM
849 # Actual port number being listened on, only set to a non-None
850 # value when we are actually listening.
851 _realPortNumber = None
853 # An externally initialized socket that we will use, rather than creating
855 _preexistingSocket = None
857 addressFamily = socket.AF_INET
858 _addressType = address.IPv4Address
860 def __init__(self, port, factory, backlog=50, interface='', reactor=None):
861 """Initialize with a numeric port to listen on.
863 base.BasePort.__init__(self, reactor=reactor)
865 self.factory = factory
866 self.backlog = backlog
867 if abstract.isIPv6Address(interface):
868 self.addressFamily = socket.AF_INET6
869 self._addressType = address.IPv6Address
870 self.interface = interface
874 def _fromListeningDescriptor(cls, reactor, fd, addressFamily, factory):
876 Create a new L{Port} based on an existing listening I{SOCK_STREAM}
879 Arguments are the same as to L{Port.__init__}, except where noted.
881 @param fd: An integer file descriptor associated with a listening
882 socket. The socket must be in non-blocking mode. Any additional
883 attributes desired, such as I{FD_CLOEXEC}, must also be set already.
885 @param addressFamily: The address family (sometimes called I{domain}) of
886 the existing socket. For example, L{socket.AF_INET}.
888 @return: A new instance of C{cls} wrapping the socket given by C{fd}.
890 port = socket.fromfd(fd, addressFamily, cls.socketType)
891 interface = port.getsockname()[0]
892 self = cls(None, factory, None, interface, reactor)
893 self._preexistingSocket = port
898 if self._realPortNumber is not None:
899 return "<%s of %s on %s>" % (self.__class__,
900 self.factory.__class__, self._realPortNumber)
902 return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
904 def createInternetSocket(self):
905 s = base.BasePort.createInternetSocket(self)
906 if platformType == "posix" and sys.platform != "cygwin":
907 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
911 def startListening(self):
912 """Create and bind my socket, and begin listening on it.
914 This is called on unserialization, and must be called after creating a
915 server to begin listening on the specified port.
917 if self._preexistingSocket is None:
918 # Create a new socket and make it listen
920 skt = self.createInternetSocket()
921 if self.addressFamily == socket.AF_INET6:
922 addr = _resolveIPv6(self.interface, self.port)
924 addr = (self.interface, self.port)
926 except socket.error, le:
927 raise CannotListenError, (self.interface, self.port, le)
928 skt.listen(self.backlog)
930 # Re-use the externally specified socket
931 skt = self._preexistingSocket
932 self._preexistingSocket = None
933 # Avoid shutting it down at the end.
934 self._socketShutdownMethod = None
936 # Make sure that if we listened on port 0, we update that to
937 # reflect what the OS actually assigned us.
938 self._realPortNumber = skt.getsockname()[1]
940 log.msg("%s starting on %s" % (
941 self._getLogPrefix(self.factory), self._realPortNumber))
943 # The order of the next 5 lines is kind of bizarre. If no one
944 # can explain it, perhaps we should re-arrange them.
945 self.factory.doStart()
946 self.connected = True
948 self.fileno = self.socket.fileno
949 self.numberAccepts = 100
954 def _buildAddr(self, address):
955 host, port = address[:2]
956 return self._addressType('TCP', host, port)
960 """Called when my socket is ready for reading.
962 This accepts a connection and calls self.protocol() to handle the
966 if platformType == "posix":
967 numAccepts = self.numberAccepts
969 # win32 event loop breaks if we do more than one accept()
970 # in an iteration of the event loop.
972 for i in range(numAccepts):
973 # we need this so we can deal with a factory's buildProtocol
974 # calling our loseConnection
975 if self.disconnecting:
978 skt, addr = self.socket.accept()
979 except socket.error, e:
980 if e.args[0] in (EWOULDBLOCK, EAGAIN):
981 self.numberAccepts = i
983 elif e.args[0] == EPERM:
984 # Netfilter on Linux may have rejected the
985 # connection, but we get told to try to accept()
988 elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
990 # Linux gives EMFILE when a process is not allowed
991 # to allocate any more file descriptors. *BSD and
992 # Win32 give (WSA)ENOBUFS. Linux can also give
993 # ENFILE if the system is out of inodes, or ENOMEM
994 # if there is insufficient memory to allocate a new
995 # dentry. ECONNABORTED is documented as possible on
996 # both Linux and Windows, but it is not clear
997 # whether there are actually any circumstances under
998 # which it can happen (one might expect it to be
999 # possible if a client sends a FIN or RST after the
1000 # server sends a SYN|ACK but before application code
1001 # calls accept(2), however at least on Linux this
1002 # _seems_ to be short-circuited by syncookies.
1004 log.msg("Could not accept new connection (%s)" % (
1005 errorcode[e.args[0]],))
1009 fdesc._setCloseOnExec(skt.fileno())
1010 protocol = self.factory.buildProtocol(self._buildAddr(addr))
1011 if protocol is None:
1015 self.sessionno = s+1
1016 transport = self.transport(skt, protocol, addr, self, s, self.reactor)
1017 protocol.makeConnection(transport)
1019 self.numberAccepts = self.numberAccepts+20
1021 # Note that in TLS mode, this will possibly catch SSL.Errors
1022 # raised by self.socket.accept()
1024 # There is no "except SSL.Error:" above because SSL may be
1025 # None if there is no SSL support. In any case, all the
1026 # "except SSL.Error:" suite would probably do is log.deferr()
1027 # and return, so handling it here works just as well.
1030 def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
1032 Stop accepting connections on this port.
1034 This will shut down the socket and call self.connectionLost(). It
1035 returns a deferred which will fire successfully when the port is
1036 actually closed, or with a failure if an error occurs shutting down.
1038 self.disconnecting = True
1041 self.deferred = deferLater(
1042 self.reactor, 0, self.connectionLost, connDone)
1043 return self.deferred
1045 stopListening = loseConnection
def _logConnectionLostMsg(self):
    """
    Log a message announcing that this port has been closed, including the
    port's C{_type} and the port number it was listening on.
    """
    closedDetails = (self._type, self._realPortNumber)
    log.msg('(%s Port %s Closed)' % closedDetails)
1054 def connectionLost(self, reason):
1056 Cleans up the socket.
1058 self._logConnectionLostMsg()
1059 self._realPortNumber = None
1061 base.BasePort.connectionLost(self, reason)
1062 self.connected = False
1063 self._closeSocket(True)
1068 self.factory.doStop()
1070 self.disconnecting = False
def logPrefix(self):
    """
    Return the prefix to use for log entries: the result of
    C{reflect.qual} applied to the class of my factory.
    """
    factoryClass = self.factory.__class__
    return reflect.qual(factoryClass)
1081 Return an L{IPv4Address} or L{IPv6Address} indicating the listening
1082 address of this port.
1084 host, port = self.socket.getsockname()[:2]
1085 return self._addressType('TCP', host, port)
1089 class Connector(base.BaseConnector):
A L{Connector} provides L{twisted.internet.interfaces.IConnector} for
1092 all POSIX-style reactors.
1094 @ivar _addressType: the type returned by L{Connector.getDestination}.
1095 Either L{IPv4Address} or L{IPv6Address}, depending on the type of
1097 @type _addressType: C{type}
1099 _addressType = address.IPv4Address
1101 def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
1102 if isinstance(port, types.StringTypes):
1104 port = socket.getservbyname(port, 'tcp')
1105 except socket.error, e:
1106 raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
1107 self.host, self.port = host, port
1108 if abstract.isIPv6Address(host):
1109 self._addressType = address.IPv6Address
1110 self.bindAddress = bindAddress
1111 base.BaseConnector.__init__(self, factory, timeout, reactor)
def _makeTransport(self):
    """
    Create a L{Client} bound to this L{Connector}.

    The client is given this connector's host, port, bind address and
    reactor, along with the connector itself.

    @return: a new L{Client}
    """
    clientArguments = (
        self.host, self.port, self.bindAddress, self, self.reactor)
    return Client(*clientArguments)
def getDestination(self):
    """
    Return the address this connector is connecting to, built by calling
    C{self._addressType} with C{'TCP'}, the host and the port.

    @see: L{twisted.internet.interfaces.IConnector.getDestination}.
    """
    addressFactory = self._addressType
    return addressFactory('TCP', self.host, self.port)