Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / test / connectionmixins.py
1 # -*- test-case-name: twisted.internet.test.test_tcp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Various helpers for tests for connection-oriented transports.
7 """
8
9 import socket
10
11 from gc import collect
12 from weakref import ref
13
14 from zope.interface import implements
15 from zope.interface.verify import verifyObject
16
17 from twisted.python import context, log
18 from twisted.python.failure import Failure
19 from twisted.python.runtime import platform
20 from twisted.python.log import ILogContext, msg, err
21 from twisted.internet.defer import Deferred, gatherResults, succeed, fail
22 from twisted.internet.interfaces import (
23     IConnector, IResolverSimple, IReactorFDSet)
24 from twisted.internet.protocol import ClientFactory, Protocol, ServerFactory
25 from twisted.test.test_tcp import ClosingProtocol
26 from twisted.trial.unittest import SkipTest
27 from twisted.internet.error import DNSLookupError
28 from twisted.internet.interfaces import ITLSTransport
29 from twisted.internet.test.reactormixins import ConnectableProtocol
30 from twisted.internet.test.reactormixins import runProtocolsWithReactor
31 from twisted.internet.test.reactormixins import needsRunningReactor
32
33
34
35 def serverFactoryFor(protocol):
36     """
37     Helper function which returns a L{ServerFactory} which will build instances
38     of C{protocol}.
39
40     @param protocol: A callable which returns an L{IProtocol} provider to be
41         used to handle connections to the port the returned factory listens on.
42     """
43     factory = ServerFactory()
44     factory.protocol = protocol
45     return factory
46
47 # ServerFactory is good enough for client endpoints, too.
48 factoryFor = serverFactoryFor
49
50
51
52 def findFreePort(interface='127.0.0.1', family=socket.AF_INET,
53                  type=socket.SOCK_STREAM):
54     """
55     Ask the platform to allocate a free port on the specified interface, then
56     release the socket and return the address which was allocated.
57
58     @param interface: The local address to try to bind the port on.
59     @type interface: C{str}
60
61     @param type: The socket type which will use the resulting port.
62
63     @return: A two-tuple of address and port, like that returned by
64         L{socket.getsockname}.
65     """
66     addr = socket.getaddrinfo(interface, 0)[0][4]
67     probe = socket.socket(family, type)
68     try:
69         probe.bind(addr)
70         return probe.getsockname()
71     finally:
72         probe.close()
73
74
75
76 def _getWriters(reactor):
77     """
78     Like L{IReactorFDSet.getWriters}, but with support for IOCP reactor as
79     well.
80     """
81     if IReactorFDSet.providedBy(reactor):
82         return reactor.getWriters()
83     elif 'IOCP' in reactor.__class__.__name__:
84         return reactor.handles
85     else:
86         # Cannot tell what is going on.
87         raise Exception("Cannot find writers on %r" % (reactor,))
88
89
90
91 class _AcceptOneClient(ServerFactory):
92     """
93     This factory fires a L{Deferred} with a protocol instance shortly after it
94     is constructed (hopefully long enough afterwards so that it has been
95     connected to a transport).
96
97     @ivar reactor: The reactor used to schedule the I{shortly}.
98
99     @ivar result: A L{Deferred} which will be fired with the protocol instance.
100     """
101     def __init__(self, reactor, result):
102         self.reactor = reactor
103         self.result = result
104
105
106     def buildProtocol(self, addr):
107         protocol = ServerFactory.buildProtocol(self, addr)
108         self.reactor.callLater(0, self.result.callback, protocol)
109         return protocol
110
111
112
113 class _SimplePullProducer(object):
114     """
115     A pull producer which writes one byte whenever it is resumed.  For use by
116     L{test_unregisterProducerAfterDisconnect}.
117     """
118     def __init__(self, consumer):
119         self.consumer = consumer
120
121
122     def stopProducing(self):
123         pass
124
125
126     def resumeProducing(self):
127         log.msg("Producer.resumeProducing")
128         self.consumer.write('x')
129
130
131
132 class Stop(ClientFactory):
133     """
134     A client factory which stops a reactor when a connection attempt fails.
135     """
136     failReason = None
137
138     def __init__(self, reactor):
139         self.reactor = reactor
140
141
142     def clientConnectionFailed(self, connector, reason):
143         self.failReason = reason
144         msg("Stop(CF) cCFailed: %s" % (reason.getErrorMessage(),))
145         self.reactor.stop()
146
147
148
149 class FakeResolver(object):
150     """
151     A resolver implementation based on a C{dict} mapping names to addresses.
152     """
153     implements(IResolverSimple)
154
155     def __init__(self, names):
156         self.names = names
157
158
159     def getHostByName(self, name, timeout):
160         try:
161             return succeed(self.names[name])
162         except KeyError:
163             return fail(DNSLookupError("FakeResolver couldn't find " + name))
164
165
166
167 class ClosingLaterProtocol(ConnectableProtocol):
168     """
169     ClosingLaterProtocol exchanges one byte with its peer and then disconnects
170     itself.  This is mostly a work-around for the fact that connectionMade is
171     called before the SSL handshake has completed.
172     """
173     def __init__(self, onConnectionLost):
174         self.lostConnectionReason = None
175         self.onConnectionLost = onConnectionLost
176
177
178     def connectionMade(self):
179         msg("ClosingLaterProtocol.connectionMade")
180
181
182     def dataReceived(self, bytes):
183         msg("ClosingLaterProtocol.dataReceived %r" % (bytes,))
184         self.transport.loseConnection()
185
186
187     def connectionLost(self, reason):
188         msg("ClosingLaterProtocol.connectionLost")
189         self.lostConnectionReason = reason
190         self.onConnectionLost.callback(self)
191
192
193
194 class ConnectionTestsMixin(object):
195     """
196     This mixin defines test methods which should apply to most L{ITransport}
197     implementations.
198     """
199
200     # This should be a reactormixins.EndpointCreator instance.
201     endpoints = None
202
203
204     def test_logPrefix(self):
205         """
206         Client and server transports implement L{ILoggingContext.logPrefix} to
207         return a message reflecting the protocol they are running.
208         """
209         class CustomLogPrefixProtocol(ConnectableProtocol):
210             def __init__(self, prefix):
211                 self._prefix = prefix
212                 self.system = None
213
214             def connectionMade(self):
215                 self.transport.write("a")
216
217             def logPrefix(self):
218                 return self._prefix
219
220             def dataReceived(self, bytes):
221                 self.system = context.get(ILogContext)["system"]
222                 self.transport.write("b")
223                 # Only close connection if both sides have received data, so
224                 # that both sides have system set.
225                 if "b" in bytes:
226                     self.transport.loseConnection()
227
228         client = CustomLogPrefixProtocol("Custom Client")
229         server = CustomLogPrefixProtocol("Custom Server")
230         runProtocolsWithReactor(self, server, client, self.endpoints)
231         self.assertIn("Custom Client", client.system)
232         self.assertIn("Custom Server", server.system)
233
234
235     def test_writeAfterDisconnect(self):
236         """
237         After a connection is disconnected, L{ITransport.write} and
238         L{ITransport.writeSequence} are no-ops.
239         """
240         reactor = self.buildReactor()
241
242         finished = []
243
244         serverConnectionLostDeferred = Deferred()
245         protocol = lambda: ClosingLaterProtocol(serverConnectionLostDeferred)
246         portDeferred = self.endpoints.server(reactor).listen(
247             serverFactoryFor(protocol))
248         def listening(port):
249             msg("Listening on %r" % (port.getHost(),))
250             endpoint = self.endpoints.client(reactor, port.getHost())
251
252             lostConnectionDeferred = Deferred()
253             protocol = lambda: ClosingLaterProtocol(lostConnectionDeferred)
254             client = endpoint.connect(factoryFor(protocol))
255             def write(proto):
256                 msg("About to write to %r" % (proto,))
257                 proto.transport.write('x')
258             client.addCallbacks(write, lostConnectionDeferred.errback)
259
260             def disconnected(proto):
261                 msg("%r disconnected" % (proto,))
262                 proto.transport.write("some bytes to get lost")
263                 proto.transport.writeSequence(["some", "more"])
264                 finished.append(True)
265
266             lostConnectionDeferred.addCallback(disconnected)
267             serverConnectionLostDeferred.addCallback(disconnected)
268             return gatherResults([
269                     lostConnectionDeferred,
270                     serverConnectionLostDeferred])
271
272         def onListen():
273             portDeferred.addCallback(listening)
274             portDeferred.addErrback(err)
275             portDeferred.addCallback(lambda ignored: reactor.stop())
276         needsRunningReactor(reactor, onListen)
277
278         self.runReactor(reactor)
279         self.assertEqual(finished, [True, True])
280
281
282     def test_protocolGarbageAfterLostConnection(self):
283         """
284         After the connection a protocol is being used for is closed, the
285         reactor discards all of its references to the protocol.
286         """
287         lostConnectionDeferred = Deferred()
288         clientProtocol = ClosingLaterProtocol(lostConnectionDeferred)
289         clientRef = ref(clientProtocol)
290
291         reactor = self.buildReactor()
292         portDeferred = self.endpoints.server(reactor).listen(
293             serverFactoryFor(Protocol))
294         def listening(port):
295             msg("Listening on %r" % (port.getHost(),))
296             endpoint = self.endpoints.client(reactor, port.getHost())
297
298             client = endpoint.connect(factoryFor(lambda: clientProtocol))
299             def disconnect(proto):
300                 msg("About to disconnect %r" % (proto,))
301                 proto.transport.loseConnection()
302             client.addCallback(disconnect)
303             client.addErrback(lostConnectionDeferred.errback)
304             return lostConnectionDeferred
305
306         def onListening():
307             portDeferred.addCallback(listening)
308             portDeferred.addErrback(err)
309             portDeferred.addBoth(lambda ignored: reactor.stop())
310         needsRunningReactor(reactor, onListening)
311
312         self.runReactor(reactor)
313
314         # Drop the reference and get the garbage collector to tell us if there
315         # are no references to the protocol instance left in the reactor.
316         clientProtocol = None
317         collect()
318         self.assertIdentical(None, clientRef())
319
320
321
322 class LogObserverMixin(object):
323     """
324     Mixin for L{TestCase} subclasses which want to observe log events.
325     """
326     def observe(self):
327         loggedMessages = []
328         log.addObserver(loggedMessages.append)
329         self.addCleanup(log.removeObserver, loggedMessages.append)
330         return loggedMessages
331
332
333
334 class BrokenContextFactory(object):
335     """
336     A context factory with a broken C{getContext} method, for exercising the
337     error handling for such a case.
338     """
339     message = "Some path was wrong maybe"
340
341     def getContext(self):
342         raise ValueError(self.message)
343
344
345
346 class TCPClientTestsMixin(object):
347     """
348     This mixin defines tests applicable to TCP client implementations.  Classes
349     which mix this in must provide all of the documented instance variables in
350     order to specify how the test works.  These are documented as instance
351     variables rather than declared as methods due to some peculiar inheritance
352     ordering concerns, but they are effectively abstract methods.
353
354     This must be mixed in to a L{ReactorBuilder
355     <twisted.internet.test.reactormixins.ReactorBuilder>} subclass, as it
356     depends on several of its methods.
357
358     @ivar endpoints: A L{twisted.internet.test.reactormixins.EndpointCreator}
359       instance.
360
361     @ivar interface: An IP address literal to locally bind a socket to as well
362         as to connect to.  This can be any valid interface for the local host.
363     @type interface: C{str}
364
365     @ivar port: An unused local listening port to listen on and connect to.
366         This will be used in conjunction with the C{interface}.  (Depending on
367         what they're testing, some tests will locate their own port with
368         L{findFreePort} instead.)
369     @type port: C{int}
370
371     @ivar family: an address family constant, such as L{socket.AF_INET},
372         L{socket.AF_INET6}, or L{socket.AF_UNIX}, which indicates the address
373         family of the transport type under test.
374     @type family: C{int}
375
376     @ivar addressClass: the L{twisted.internet.interfaces.IAddress} implementor
377         associated with the transport type under test.  Must also be a
378         3-argument callable which produces an instance of same.
379     @type addressClass: C{type}
380
381     @ivar fakeDomainName: A fake domain name to use, to simulate hostname
382         resolution and to distinguish between hostnames and IP addresses where
383         necessary.
384     @type fakeDomainName: C{str}
385     """
386
387     def test_interface(self):
388         """
389         L{IReactorTCP.connectTCP} returns an object providing L{IConnector}.
390         """
391         reactor = self.buildReactor()
392         connector = reactor.connectTCP(self.interface, self.port,
393                                        ClientFactory())
394         self.assertTrue(verifyObject(IConnector, connector))
395
396
397     def test_clientConnectionFailedStopsReactor(self):
398         """
399         The reactor can be stopped by a client factory's
400         C{clientConnectionFailed} method.
401         """
402         host, port = findFreePort(self.interface, self.family)[:2]
403         reactor = self.buildReactor()
404         needsRunningReactor(
405             reactor, lambda: reactor.connectTCP(host, port, Stop(reactor)))
406         self.runReactor(reactor)
407
408
409     def test_addresses(self):
410         """
411         A client's transport's C{getHost} and C{getPeer} return L{IPv4Address}
412         instances which have the dotted-quad string form of the resolved
413         adddress of the local and remote endpoints of the connection
414         respectively as their C{host} attribute, not the hostname originally
415         passed in to L{connectTCP
416         <twisted.internet.interfaces.IReactorTCP.connectTCP>}, if a hostname
417         was used.
418         """
419         host, port = findFreePort(self.interface, self.family)[:2]
420         reactor = self.buildReactor()
421         fakeDomain = self.fakeDomainName
422         reactor.installResolver(FakeResolver({fakeDomain: self.interface}))
423
424         server = reactor.listenTCP(
425             0, serverFactoryFor(Protocol), interface=host)
426         serverAddress = server.getHost()
427
428         addresses = {'host': None, 'peer': None}
429         class CheckAddress(Protocol):
430             def makeConnection(self, transport):
431                 addresses['host'] = transport.getHost()
432                 addresses['peer'] = transport.getPeer()
433                 reactor.stop()
434
435         clientFactory = Stop(reactor)
436         clientFactory.protocol = CheckAddress
437
438         def connectMe():
439             reactor.connectTCP(
440                 fakeDomain, server.getHost().port, clientFactory,
441                 bindAddress=(self.interface, port))
442         needsRunningReactor(reactor, connectMe)
443
444         self.runReactor(reactor)
445
446         if clientFactory.failReason:
447             self.fail(clientFactory.failReason.getTraceback())
448
449         self.assertEqual(
450             addresses['host'],
451             self.addressClass('TCP', self.interface, port))
452         self.assertEqual(
453             addresses['peer'],
454             self.addressClass('TCP', self.interface, serverAddress.port))
455
456
457     def test_connectEvent(self):
458         """
459         This test checks that we correctly get notifications event for a
460         client.  This ought to prevent a regression under Windows using the
461         GTK2 reactor.  See #3925.
462         """
463         reactor = self.buildReactor()
464
465         server = reactor.listenTCP(0, serverFactoryFor(Protocol),
466                                    interface=self.interface)
467         connected = []
468
469         class CheckConnection(Protocol):
470             def connectionMade(self):
471                 connected.append(self)
472                 reactor.stop()
473
474         clientFactory = Stop(reactor)
475         clientFactory.protocol = CheckConnection
476
477         needsRunningReactor(reactor, lambda: reactor.connectTCP(
478             self.interface, server.getHost().port, clientFactory))
479
480         reactor.run()
481
482         self.assertTrue(connected)
483
484
485     def test_unregisterProducerAfterDisconnect(self):
486         """
487         If a producer is unregistered from a L{ITCPTransport} provider after
488         the transport has been disconnected (by the peer) and after
489         L{ITCPTransport.loseConnection} has been called, the transport is not
490         re-added to the reactor as a writer as would be necessary if the
491         transport were still connected.
492         """
493         reactor = self.buildReactor()
494         port = reactor.listenTCP(0, serverFactoryFor(ClosingProtocol),
495                                  interface=self.interface)
496
497         finished = Deferred()
498         finished.addErrback(log.err)
499         finished.addCallback(lambda ign: reactor.stop())
500
501         writing = []
502
503         class ClientProtocol(Protocol):
504             """
505             Protocol to connect, register a producer, try to lose the
506             connection, wait for the server to disconnect from us, and then
507             unregister the producer.
508             """
509             def connectionMade(self):
510                 log.msg("ClientProtocol.connectionMade")
511                 self.transport.registerProducer(
512                     _SimplePullProducer(self.transport), False)
513                 self.transport.loseConnection()
514
515             def connectionLost(self, reason):
516                 log.msg("ClientProtocol.connectionLost")
517                 self.unregister()
518                 writing.append(self.transport in _getWriters(reactor))
519                 finished.callback(None)
520
521             def unregister(self):
522                 log.msg("ClientProtocol unregister")
523                 self.transport.unregisterProducer()
524
525         clientFactory = ClientFactory()
526         clientFactory.protocol = ClientProtocol
527         reactor.connectTCP(self.interface, port.getHost().port, clientFactory)
528         self.runReactor(reactor)
529         self.assertFalse(writing[0],
530                          "Transport was writing after unregisterProducer.")
531
532
533     def test_disconnectWhileProducing(self):
534         """
535         If L{ITCPTransport.loseConnection} is called while a producer is
536         registered with the transport, the connection is closed after the
537         producer is unregistered.
538         """
539         reactor = self.buildReactor()
540
541         # For some reason, pyobject/pygtk will not deliver the close
542         # notification that should happen after the unregisterProducer call in
543         # this test.  The selectable is in the write notification set, but no
544         # notification ever arrives.  Probably for the same reason #5233 led
545         # win32eventreactor to be broken.
546         skippedReactors = ["Glib2Reactor", "Gtk2Reactor"]
547         reactorClassName = reactor.__class__.__name__
548         if reactorClassName in skippedReactors and platform.isWindows():
549             raise SkipTest(
550                 "A pygobject/pygtk bug disables this functionality on Windows.")
551
552         class Producer:
553             def resumeProducing(self):
554                 log.msg("Producer.resumeProducing")
555
556         port = reactor.listenTCP(0, serverFactoryFor(Protocol),
557             interface=self.interface)
558
559         finished = Deferred()
560         finished.addErrback(log.err)
561         finished.addCallback(lambda ign: reactor.stop())
562
563         class ClientProtocol(Protocol):
564             """
565             Protocol to connect, register a producer, try to lose the
566             connection, unregister the producer, and wait for the connection to
567             actually be lost.
568             """
569             def connectionMade(self):
570                 log.msg("ClientProtocol.connectionMade")
571                 self.transport.registerProducer(Producer(), False)
572                 self.transport.loseConnection()
573                 # Let the reactor tick over, in case synchronously calling
574                 # loseConnection and then unregisterProducer is the same as
575                 # synchronously calling unregisterProducer and then
576                 # loseConnection (as it is in several reactors).
577                 reactor.callLater(0, reactor.callLater, 0, self.unregister)
578
579             def unregister(self):
580                 log.msg("ClientProtocol unregister")
581                 self.transport.unregisterProducer()
582                 # This should all be pretty quick.  Fail the test
583                 # if we don't get a connectionLost event really
584                 # soon.
585                 reactor.callLater(
586                     1.0, finished.errback,
587                     Failure(Exception("Connection was not lost")))
588
589             def connectionLost(self, reason):
590                 log.msg("ClientProtocol.connectionLost")
591                 finished.callback(None)
592
593         clientFactory = ClientFactory()
594         clientFactory.protocol = ClientProtocol
595         reactor.connectTCP(self.interface, port.getHost().port, clientFactory)
596         self.runReactor(reactor)
597         # If the test failed, we logged an error already and trial
598         # will catch it.
599
600
601     def test_badContext(self):
602         """
603         If the context factory passed to L{ITCPTransport.startTLS} raises an
604         exception from its C{getContext} method, that exception is raised by
605         L{ITCPTransport.startTLS}.
606         """
607         reactor = self.buildReactor()
608
609         brokenFactory = BrokenContextFactory()
610         results = []
611
612         serverFactory = ServerFactory()
613         serverFactory.protocol = Protocol
614
615         port = reactor.listenTCP(0, serverFactory, interface=self.interface)
616         endpoint = self.endpoints.client(reactor, port.getHost())
617
618         clientFactory = ClientFactory()
619         clientFactory.protocol = Protocol
620         connectDeferred = endpoint.connect(clientFactory)
621
622         def connected(protocol):
623             if not ITLSTransport.providedBy(protocol.transport):
624                 results.append("skip")
625             else:
626                 results.append(self.assertRaises(ValueError,
627                                                  protocol.transport.startTLS,
628                                                  brokenFactory))
629
630         def connectFailed(failure):
631             results.append(failure)
632
633         def whenRun():
634             connectDeferred.addCallback(connected)
635             connectDeferred.addErrback(connectFailed)
636             connectDeferred.addBoth(lambda ign: reactor.stop())
637         needsRunningReactor(reactor, whenRun)
638
639         self.runReactor(reactor)
640
641         self.assertEqual(len(results), 1,
642                          "more than one callback result: %s" % (results,))
643
644         if isinstance(results[0], Failure):
645             # self.fail(Failure)
646             results[0].raiseException()
647         if results[0] == "skip":
648             raise SkipTest("Reactor does not support ITLSTransport")
649         self.assertEqual(BrokenContextFactory.message, str(results[0]))