1 # -*- test-case-name: twisted.internet.test.test_tcp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Various helpers for tests for connection-oriented transports.
11 from gc import collect
12 from weakref import ref
14 from zope.interface import implements
15 from zope.interface.verify import verifyObject
17 from twisted.python import context, log
18 from twisted.python.failure import Failure
19 from twisted.python.runtime import platform
20 from twisted.python.log import ILogContext, msg, err
21 from twisted.internet.defer import Deferred, gatherResults, succeed, fail
22 from twisted.internet.interfaces import (
23 IConnector, IResolverSimple, IReactorFDSet)
24 from twisted.internet.protocol import ClientFactory, Protocol, ServerFactory
25 from twisted.test.test_tcp import ClosingProtocol
26 from twisted.trial.unittest import SkipTest
27 from twisted.internet.error import DNSLookupError
28 from twisted.internet.interfaces import ITLSTransport
29 from twisted.internet.test.reactormixins import ConnectableProtocol
30 from twisted.internet.test.reactormixins import runProtocolsWithReactor
31 from twisted.internet.test.reactormixins import needsRunningReactor
def serverFactoryFor(protocol):
    """
    Helper function which returns a L{ServerFactory} which will build instances
    of C{protocol}.

    @param protocol: A callable which returns an L{IProtocol} provider to be
        used to handle connections to the port the returned factory listens on.

    @return: A new L{ServerFactory} whose C{protocol} attribute has been set
        to C{protocol}.
    """
    factory = ServerFactory()
    factory.protocol = protocol
    # Hand the configured factory back to the caller; every user of this
    # helper passes the result straight to C{listen} or C{connect}.
    return factory

# ServerFactory is good enough for client endpoints, too.
factoryFor = serverFactoryFor
def findFreePort(interface='127.0.0.1', family=socket.AF_INET,
                 type=socket.SOCK_STREAM):
    """
    Ask the platform to allocate a free port on the specified interface, then
    release the socket and return the address which was allocated.

    @param interface: The local address to try to bind the port on.
    @type interface: C{str}

    @param family: The address family of the probe socket, for example
        L{socket.AF_INET}.

    @param type: The socket type which will use the resulting port.

    @return: A two-tuple of address and port, like that returned by
        L{socket.getsockname}.
    """
    # Port 0 asks the platform to pick any unused port when binding.
    addr = socket.getaddrinfo(interface, 0)[0][4]
    probe = socket.socket(family, type)
    try:
        # Without the bind no port is ever allocated and getsockname would
        # only report the unbound wildcard address.
        probe.bind(addr)
        return probe.getsockname()
    finally:
        # Always release the probe socket, even if binding failed, so the
        # descriptor is not leaked and the port can be reused by the caller.
        probe.close()
def _getWriters(reactor):
    """
    Like L{IReactorFDSet.getWriters}, but with support for IOCP reactor as
    well.

    @param reactor: The reactor to inspect.

    @return: The result of C{reactor.getWriters()} when the reactor provides
        L{IReactorFDSet}, otherwise C{reactor.handles} when the name of the
        reactor's class contains C{'IOCP'}.

    @raise Exception: If the reactor matches neither case.
    """
    if IReactorFDSet.providedBy(reactor):
        return reactor.getWriters()

    reactorClassName = reactor.__class__.__name__
    if 'IOCP' in reactorClassName:
        return reactor.handles

    # Cannot tell what is going on.
    raise Exception("Cannot find writers on %r" % (reactor,))
class _AcceptOneClient(ServerFactory):
    """
    This factory fires a L{Deferred} with a protocol instance shortly after it
    is constructed (hopefully long enough afterwards so that it has been
    connected to a transport).

    @ivar reactor: The reactor used to schedule the I{shortly}.

    @ivar result: A L{Deferred} which will be fired with the protocol instance.
    """
    def __init__(self, reactor, result):
        """
        @param reactor: The reactor used to schedule the firing of C{result}.

        @param result: The L{Deferred} to fire with each protocol built.
        """
        self.reactor = reactor
        # buildProtocol reads this attribute, so it has to be stored here.
        self.result = result


    def buildProtocol(self, addr):
        """
        Build a protocol with the base implementation, schedule C{result} to
        be fired with it, and return it.

        @param addr: The address passed through to
            L{ServerFactory.buildProtocol}.

        @return: The protocol built by L{ServerFactory.buildProtocol}.
        """
        protocol = ServerFactory.buildProtocol(self, addr)
        # Delay the callback by one reactor iteration rather than firing it
        # synchronously from inside buildProtocol.
        self.reactor.callLater(0, self.result.callback, protocol)
        # The caller of buildProtocol needs the protocol back in order to
        # attach it to a transport.
        return protocol
113 class _SimplePullProducer(object):
115 A pull producer which writes one byte whenever it is resumed. For use by
116 L{test_unregisterProducerAfterDisconnect}.
118 def __init__(self, consumer):
119 self.consumer = consumer
122 def stopProducing(self):
126 def resumeProducing(self):
127 log.msg("Producer.resumeProducing")
128 self.consumer.write('x')
class Stop(ClientFactory):
    """
    A client factory which stops a reactor when a connection attempt fails.

    @ivar reactor: The reactor which is stopped when a connection attempt
        fails.

    @ivar failReason: C{None} until a connection attempt fails, then the
        reason passed to L{clientConnectionFailed}.
    """
    # Class-level default so callers can test C{failReason} for truth even
    # when no connection attempt has failed.
    failReason = None

    def __init__(self, reactor):
        self.reactor = reactor


    def clientConnectionFailed(self, connector, reason):
        """
        Record and log the failure, then stop the reactor.

        @param connector: The connector whose attempt failed (unused).

        @param reason: An object describing the failure; its
            C{getErrorMessage} method is used for the log message.
        """
        self.failReason = reason
        msg("Stop(CF) cCFailed: %s" % (reason.getErrorMessage(),))
        # This is the behaviour the class exists for: without it a failed
        # connection attempt leaves the reactor running.
        self.reactor.stop()
class FakeResolver(object):
    """
    A resolver implementation based on a C{dict} mapping names to addresses.

    @ivar names: The mapping of names to addresses consulted by
        L{getHostByName}.
    """
    implements(IResolverSimple)

    def __init__(self, names):
        """
        @param names: A mapping of names to the addresses they resolve to.
        @type names: C{dict}
        """
        self.names = names


    def getHostByName(self, name, timeout):
        """
        Look C{name} up in the mapping given at construction time.

        @param name: The name to resolve.

        @param timeout: Ignored; the lookup is answered immediately.

        @return: A L{Deferred} which has already fired with the address for
            C{name}, or which has already failed with L{DNSLookupError} if
            C{name} is not in the mapping.
        """
        try:
            return succeed(self.names[name])
        except KeyError:
            # Report unknown names through the Deferred rather than by
            # raising synchronously.
            return fail(DNSLookupError("FakeResolver couldn't find " + name))
class ClosingLaterProtocol(ConnectableProtocol):
    """
    ClosingLaterProtocol exchanges one byte with its peer and then disconnects
    itself.  This is mostly a work-around for the fact that connectionMade is
    called before the SSL handshake has completed.

    @ivar onConnectionLost: A L{Deferred} which is fired with this protocol
        instance when its connection is lost.

    @ivar lostConnectionReason: C{None} until the connection is lost, then the
        reason passed to L{connectionLost}.
    """
    def __init__(self, onConnectionLost):
        self.onConnectionLost = onConnectionLost
        self.lostConnectionReason = None


    def connectionMade(self):
        """
        Log that the connection was established.
        """
        msg("ClosingLaterProtocol.connectionMade")


    def dataReceived(self, bytes):
        """
        Log the received data and start closing the connection.
        """
        msg("ClosingLaterProtocol.dataReceived %r" % (bytes,))
        transport = self.transport
        transport.loseConnection()


    def connectionLost(self, reason):
        """
        Log the disconnection, remember its reason and fire
        C{onConnectionLost} with this protocol.
        """
        msg("ClosingLaterProtocol.connectionLost")
        self.lostConnectionReason = reason
        notifier = self.onConnectionLost
        notifier.callback(self)
class ConnectionTestsMixin(object):
    """
    This mixin defines test methods which should apply to most L{ITransport}
    implementations.

    The tests call C{self.buildReactor}, C{self.runReactor} and the usual
    assertion methods, so those must be supplied by the class this is mixed
    in to.
    """

    # This should be a reactormixins.EndpointCreator instance.
    endpoints = None


    def test_logPrefix(self):
        """
        Client and server transports implement L{ILoggingContext.logPrefix} to
        return a message reflecting the protocol they are running.
        """
        class CustomLogPrefixProtocol(ConnectableProtocol):
            def __init__(self, prefix):
                self._prefix = prefix
                # Filled in from the logging context the first time data
                # arrives.
                self.system = None

            def connectionMade(self):
                self.transport.write("a")

            def logPrefix(self):
                return self._prefix

            def dataReceived(self, bytes):
                self.system = context.get(ILogContext)["system"]
                self.transport.write("b")
                # Only close connection if both sides have received data, so
                # that both sides have system set.
                if "b" in bytes:
                    self.transport.loseConnection()

        client = CustomLogPrefixProtocol("Custom Client")
        server = CustomLogPrefixProtocol("Custom Server")
        runProtocolsWithReactor(self, server, client, self.endpoints)
        self.assertIn("Custom Client", client.system)
        self.assertIn("Custom Server", server.system)


    def test_writeAfterDisconnect(self):
        """
        After a connection is disconnected, L{ITransport.write} and
        L{ITransport.writeSequence} are no-ops.
        """
        reactor = self.buildReactor()

        # One C{True} is appended for each side which managed to write after
        # being disconnected without an exception being raised.
        finished = []

        serverConnectionLostDeferred = Deferred()
        protocol = lambda: ClosingLaterProtocol(serverConnectionLostDeferred)
        portDeferred = self.endpoints.server(reactor).listen(
            serverFactoryFor(protocol))

        def listening(port):
            msg("Listening on %r" % (port.getHost(),))
            endpoint = self.endpoints.client(reactor, port.getHost())

            lostConnectionDeferred = Deferred()
            protocol = lambda: ClosingLaterProtocol(lostConnectionDeferred)
            client = endpoint.connect(factoryFor(protocol))

            def write(proto):
                msg("About to write to %r" % (proto,))
                proto.transport.write('x')
            # A failed connection attempt is reported through the same
            # Deferred as a lost connection so the test cannot hang on it.
            client.addCallbacks(write, lostConnectionDeferred.errback)

            def disconnected(proto):
                msg("%r disconnected" % (proto,))
                proto.transport.write("some bytes to get lost")
                proto.transport.writeSequence(["some", "more"])
                finished.append(True)

            lostConnectionDeferred.addCallback(disconnected)
            serverConnectionLostDeferred.addCallback(disconnected)
            return gatherResults([
                    lostConnectionDeferred,
                    serverConnectionLostDeferred])

        def onListen():
            portDeferred.addCallback(listening)
            portDeferred.addErrback(err)
            # addBoth, as in test_protocolGarbageAfterLostConnection, so the
            # reactor is stopped whatever the outcome of the steps above.
            portDeferred.addBoth(lambda ignored: reactor.stop())
        needsRunningReactor(reactor, onListen)

        self.runReactor(reactor)
        self.assertEqual(finished, [True, True])


    def test_protocolGarbageAfterLostConnection(self):
        """
        After the connection a protocol is being used for is closed, the
        reactor discards all of its references to the protocol.
        """
        lostConnectionDeferred = Deferred()
        clientProtocol = ClosingLaterProtocol(lostConnectionDeferred)
        clientRef = ref(clientProtocol)

        reactor = self.buildReactor()
        portDeferred = self.endpoints.server(reactor).listen(
            serverFactoryFor(Protocol))

        def listening(port):
            msg("Listening on %r" % (port.getHost(),))
            endpoint = self.endpoints.client(reactor, port.getHost())

            client = endpoint.connect(factoryFor(lambda: clientProtocol))
            def disconnect(proto):
                msg("About to disconnect %r" % (proto,))
                proto.transport.loseConnection()
            client.addCallback(disconnect)
            client.addErrback(lostConnectionDeferred.errback)
            return lostConnectionDeferred

        def onListening():
            portDeferred.addCallback(listening)
            portDeferred.addErrback(err)
            portDeferred.addBoth(lambda ignored: reactor.stop())
        needsRunningReactor(reactor, onListening)

        self.runReactor(reactor)

        # Drop the reference and get the garbage collector to tell us if there
        # are no references to the protocol instance left in the reactor.
        clientProtocol = None
        collect()
        self.assertIdentical(None, clientRef())
class LogObserverMixin(object):
    """
    Mixin for L{TestCase} subclasses which want to observe log events.
    """
    def observe(self):
        """
        Start collecting log events for the remainder of the current test.

        The observer is removed again by a cleanup registered with
        C{self.addCleanup}.

        @return: A C{list} to which each subsequently observed log event is
            appended.
        """
        loggedMessages = []
        log.addObserver(loggedMessages.append)
        self.addCleanup(log.removeObserver, loggedMessages.append)
        return loggedMessages
class BrokenContextFactory(object):
    """
    A context factory with a broken C{getContext} method, for exercising the
    error handling for such a case.

    @cvar message: The text of the L{ValueError} raised by L{getContext}.
    """
    message = "Some path was wrong maybe"

    def getContext(self):
        """
        Always fail.

        @raise ValueError: Unconditionally, with C{message} as its argument.
        """
        failure = ValueError(self.message)
        raise failure
class TCPClientTestsMixin(object):
    """
    This mixin defines tests applicable to TCP client implementations.  Classes
    which mix this in must provide all of the documented instance variables in
    order to specify how the test works.  These are documented as instance
    variables rather than declared as methods due to some peculiar inheritance
    ordering concerns, but they are effectively abstract methods.

    This must be mixed in to a L{ReactorBuilder
    <twisted.internet.test.reactormixins.ReactorBuilder>} subclass, as it
    depends on several of its methods.

    @ivar endpoints: A L{twisted.internet.test.reactormixins.EndpointCreator}
        instance.

    @ivar interface: An IP address literal to locally bind a socket to as well
        as to connect to.  This can be any valid interface for the local host.
    @type interface: C{str}

    @ivar port: An unused local listening port to listen on and connect to.
        This will be used in conjunction with the C{interface}.  (Depending on
        what they're testing, some tests will locate their own port with
        L{findFreePort} instead.)
    @type port: C{int}

    @ivar family: an address family constant, such as L{socket.AF_INET},
        L{socket.AF_INET6}, or L{socket.AF_UNIX}, which indicates the address
        family of the transport type under test.
    @type family: C{int}

    @ivar addressClass: the L{twisted.internet.interfaces.IAddress} implementor
        associated with the transport type under test.  Must also be a
        3-argument callable which produces an instance of same.
    @type addressClass: C{type}

    @ivar fakeDomainName: A fake domain name to use, to simulate hostname
        resolution and to distinguish between hostnames and IP addresses where
        necessary.
    @type fakeDomainName: C{str}
    """

    def test_interface(self):
        """
        L{IReactorTCP.connectTCP} returns an object providing L{IConnector}.
        """
        reactor = self.buildReactor()
        connector = reactor.connectTCP(self.interface, self.port,
                                       ClientFactory())
        self.assertTrue(verifyObject(IConnector, connector))


    def test_clientConnectionFailedStopsReactor(self):
        """
        The reactor can be stopped by a client factory's
        C{clientConnectionFailed} method.
        """
        host, port = findFreePort(self.interface, self.family)[:2]
        reactor = self.buildReactor()
        # Nothing is listening on the port just released by findFreePort, so
        # the connection attempt is expected to fail and L{Stop} to stop the
        # reactor.
        needsRunningReactor(
            reactor, lambda: reactor.connectTCP(host, port, Stop(reactor)))
        self.runReactor(reactor)


    def test_addresses(self):
        """
        A client's transport's C{getHost} and C{getPeer} return L{IPv4Address}
        instances which have the dotted-quad string form of the resolved
        address of the local and remote endpoints of the connection
        respectively as their C{host} attribute, not the hostname originally
        passed in to L{connectTCP
        <twisted.internet.interfaces.IReactorTCP.connectTCP>}, if a hostname
        was used.
        """
        host, port = findFreePort(self.interface, self.family)[:2]
        reactor = self.buildReactor()
        fakeDomain = self.fakeDomainName
        reactor.installResolver(FakeResolver({fakeDomain: self.interface}))

        server = reactor.listenTCP(
            0, serverFactoryFor(Protocol), interface=host)
        serverAddress = server.getHost()

        addresses = {'host': None, 'peer': None}
        class CheckAddress(Protocol):
            def makeConnection(self, transport):
                addresses['host'] = transport.getHost()
                addresses['peer'] = transport.getPeer()
                # Both addresses are recorded; nothing else is needed from
                # the connection.
                reactor.stop()

        clientFactory = Stop(reactor)
        clientFactory.protocol = CheckAddress

        def connectMe():
            # Connect by name so the test can tell the resolved address
            # apart from the hostname, and bind to the port found above so
            # the expected local address is known.
            reactor.connectTCP(
                fakeDomain, server.getHost().port, clientFactory,
                bindAddress=(self.interface, port))
        needsRunningReactor(reactor, connectMe)

        self.runReactor(reactor)

        if clientFactory.failReason:
            self.fail(clientFactory.failReason.getTraceback())

        self.assertEqual(
            addresses['host'],
            self.addressClass('TCP', self.interface, port))

        self.assertEqual(
            addresses['peer'],
            self.addressClass('TCP', self.interface, serverAddress.port))


    def test_connectEvent(self):
        """
        This test checks that we correctly get event notifications for a
        client.  This ought to prevent a regression under Windows using the
        GTK2 reactor.  See #3925.
        """
        reactor = self.buildReactor()

        server = reactor.listenTCP(0, serverFactoryFor(Protocol),
                                   interface=self.interface)
        connected = []

        class CheckConnection(Protocol):
            def connectionMade(self):
                connected.append(self)
                reactor.stop()

        clientFactory = Stop(reactor)
        clientFactory.protocol = CheckConnection

        needsRunningReactor(reactor, lambda: reactor.connectTCP(
            self.interface, server.getHost().port, clientFactory))

        # Run the reactor the same way as the other tests in this mixin;
        # without this the connection is never attempted.
        self.runReactor(reactor)

        self.assertTrue(connected)


    def test_unregisterProducerAfterDisconnect(self):
        """
        If a producer is unregistered from a L{ITCPTransport} provider after
        the transport has been disconnected (by the peer) and after
        L{ITCPTransport.loseConnection} has been called, the transport is not
        re-added to the reactor as a writer as would be necessary if the
        transport were still connected.
        """
        reactor = self.buildReactor()
        port = reactor.listenTCP(0, serverFactoryFor(ClosingProtocol),
                                 interface=self.interface)

        finished = Deferred()
        finished.addErrback(log.err)
        finished.addCallback(lambda ign: reactor.stop())

        # Records whether the transport was among the reactor's writers
        # right after the producer was unregistered.
        writing = []

        class ClientProtocol(Protocol):
            """
            Protocol to connect, register a producer, try to lose the
            connection, wait for the server to disconnect from us, and then
            unregister the producer.
            """
            def connectionMade(self):
                log.msg("ClientProtocol.connectionMade")
                self.transport.registerProducer(
                    _SimplePullProducer(self.transport), False)
                self.transport.loseConnection()

            def connectionLost(self, reason):
                log.msg("ClientProtocol.connectionLost")
                self.unregister()
                writing.append(self.transport in _getWriters(reactor))
                finished.callback(None)

            def unregister(self):
                log.msg("ClientProtocol unregister")
                self.transport.unregisterProducer()

        clientFactory = ClientFactory()
        clientFactory.protocol = ClientProtocol
        reactor.connectTCP(self.interface, port.getHost().port, clientFactory)
        self.runReactor(reactor)
        self.assertFalse(writing[0],
                         "Transport was writing after unregisterProducer.")


    def test_disconnectWhileProducing(self):
        """
        If L{ITCPTransport.loseConnection} is called while a producer is
        registered with the transport, the connection is closed after the
        producer is unregistered.
        """
        reactor = self.buildReactor()

        # For some reason, pyobject/pygtk will not deliver the close
        # notification that should happen after the unregisterProducer call in
        # this test.  The selectable is in the write notification set, but no
        # notification ever arrives.  Probably for the same reason #5233 led
        # win32eventreactor to be broken.
        skippedReactors = ["Glib2Reactor", "Gtk2Reactor"]
        reactorClassName = reactor.__class__.__name__
        if reactorClassName in skippedReactors and platform.isWindows():
            raise SkipTest(
                "A pygobject/pygtk bug disables this functionality on Windows.")

        class Producer:
            def resumeProducing(self):
                log.msg("Producer.resumeProducing")

        port = reactor.listenTCP(0, serverFactoryFor(Protocol),
                                 interface=self.interface)

        finished = Deferred()
        finished.addErrback(log.err)
        finished.addCallback(lambda ign: reactor.stop())

        class ClientProtocol(Protocol):
            """
            Protocol to connect, register a producer, try to lose the
            connection, unregister the producer, and wait for the connection to
            actually be lost.
            """
            def connectionMade(self):
                log.msg("ClientProtocol.connectionMade")
                self.transport.registerProducer(Producer(), False)
                self.transport.loseConnection()
                # Let the reactor tick over, in case synchronously calling
                # loseConnection and then unregisterProducer is the same as
                # synchronously calling unregisterProducer and then
                # loseConnection (as it is in several reactors).
                reactor.callLater(0, reactor.callLater, 0, self.unregister)

            def unregister(self):
                log.msg("ClientProtocol unregister")
                self.transport.unregisterProducer()
                # This should all be pretty quick.  Fail the test
                # if we don't get a connectionLost event really
                # soon.
                reactor.callLater(
                    1.0, finished.errback,
                    Failure(Exception("Connection was not lost")))

            def connectionLost(self, reason):
                log.msg("ClientProtocol.connectionLost")
                finished.callback(None)

        clientFactory = ClientFactory()
        clientFactory.protocol = ClientProtocol
        reactor.connectTCP(self.interface, port.getHost().port, clientFactory)
        self.runReactor(reactor)
        # If the test failed, we logged an error already and trial
        # will catch it.


    def test_badContext(self):
        """
        If the context factory passed to L{ITCPTransport.startTLS} raises an
        exception from its C{getContext} method, that exception is raised by
        L{ITCPTransport.startTLS}.
        """
        reactor = self.buildReactor()

        brokenFactory = BrokenContextFactory()
        # Receives exactly one entry: the exception raised by startTLS, the
        # connection failure, or the marker string "skip".
        results = []

        serverFactory = ServerFactory()
        serverFactory.protocol = Protocol

        port = reactor.listenTCP(0, serverFactory, interface=self.interface)
        endpoint = self.endpoints.client(reactor, port.getHost())

        clientFactory = ClientFactory()
        clientFactory.protocol = Protocol
        connectDeferred = endpoint.connect(clientFactory)

        def connected(protocol):
            if not ITLSTransport.providedBy(protocol.transport):
                results.append("skip")
            else:
                results.append(self.assertRaises(ValueError,
                                                 protocol.transport.startTLS,
                                                 brokenFactory))

        def connectFailed(failure):
            results.append(failure)

        def whenRun():
            connectDeferred.addCallback(connected)
            connectDeferred.addErrback(connectFailed)
            connectDeferred.addBoth(lambda ign: reactor.stop())
        needsRunningReactor(reactor, whenRun)

        self.runReactor(reactor)

        self.assertEqual(len(results), 1,
                         "more than one callback result: %s" % (results,))

        if isinstance(results[0], Failure):
            # Re-raise the recorded failure so the test fails with the
            # original error.
            results[0].raiseException()
        if results[0] == "skip":
            raise SkipTest("Reactor does not support ITLSTransport")
        self.assertEqual(BrokenContextFactory.message, str(results[0]))