# -*- test-case-name: twisted.test.test_stringtransport -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Assorted functionality which is commonly useful when writing unit tests.
"""

9 from socket import AF_INET, AF_INET6
10 from StringIO import StringIO
12 from zope.interface import implements
14 from twisted.internet.interfaces import (
15 ITransport, IConsumer, IPushProducer, IConnector)
16 from twisted.internet.interfaces import (
17 IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
18 from twisted.internet.interfaces import IListeningPort
19 from twisted.protocols import basic
20 from twisted.internet import protocol, error, address
22 from twisted.internet.address import IPv4Address, UNIXAddress
class AccumulatingProtocol(protocol.Protocol):
    """
    L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
    the data delivered to it and can fire a Deferred when it is connected or
    disconnected.

    @ivar made: A flag indicating whether C{connectionMade} has been called.
    @ivar data: A string giving all the data passed to C{dataReceived}.
    @ivar closed: A flag indicating whether C{connectionLost} has been called.
    @ivar closedReason: The value of the I{reason} parameter passed to
        C{connectionLost}.
    @ivar closedDeferred: If set to a L{Deferred}, this will be fired when
        C{connectionLost} is called.
    @ivar factory: C{None}, or an object with a C{protocolConnectionMade}
        attribute.  If that attribute is not C{None} it is fired with this
        protocol when C{connectionMade} is called.
    """
    made = closed = 0
    closedReason = None

    closedDeferred = None

    data = ""

    factory = None

    def connectionMade(self):
        """
        Record that the connection was made and fire the factory's
        C{protocolConnectionMade} Deferred, if there is one.
        """
        self.made = 1
        if (self.factory is not None and
            self.factory.protocolConnectionMade is not None):
            d = self.factory.protocolConnectionMade
            # Clear the attribute before firing so the Deferred can only be
            # fired once, even if another protocol connects later.
            self.factory.protocolConnectionMade = None
            d.callback(self)


    def dataReceived(self, data):
        """
        Append C{data} to everything received so far.
        """
        self.data += data


    def connectionLost(self, reason):
        """
        Record that the connection was lost, remember C{reason}, and fire
        C{closedDeferred} with C{None} if it is set.
        """
        self.closed = 1
        self.closedReason = reason
        if self.closedDeferred is not None:
            # Swap the Deferred out before firing it so it fires at most once.
            d, self.closedDeferred = self.closedDeferred, None
            d.callback(None)
class LineSendingProtocol(basic.LineReceiver):
    """
    A line-based protocol which sends a fixed list of lines and records every
    line it receives.

    @ivar lines: The lines which will be sent.
    @ivar response: A C{list} of every line passed to C{lineReceived}.
    @ivar start: If true, the lines are sent as soon as the connection is
        made; otherwise they are sent when the first line is received.
    @ivar lostConn: C{False} until C{connectionLost} is called, then C{True}.
    """
    lostConn = False

    def __init__(self, lines, start = True):
        """
        @param lines: The lines to send.  A copy is taken, so the caller's
            list is never modified.
        @param start: Whether to send the lines on connection (true) or in
            reply to the first received line (false).
        """
        self.lines = lines[:]
        self.response = []
        self.start = start


    def connectionMade(self):
        """
        Send all the lines immediately if C{start} is true.
        """
        if self.start:
            for line in self.lines:
                self.sendLine(line)


    def lineReceived(self, line):
        """
        Record C{line}.  If sending was deferred (C{start} is false), send the
        pending lines first and then forget them so they are sent only once.
        """
        if not self.start:
            for pending in self.lines:
                self.sendLine(pending)
            self.lines = []
        self.response.append(line)


    def connectionLost(self, reason):
        """
        Record that the connection was lost.
        """
        self.lostConn = True
class FakeDatagramTransport:
    """
    A datagram transport stand-in which only records what is written to it.

    @ivar written: A C{list} of C{(packet, addr)} tuples, one per call to
        L{write}, in call order.
    @cvar noAddr: The sentinel recorded as C{addr} when L{write} is called
        without an address.
    """
    # A unique object, so "no address given" can be told apart from any real
    # address value, including None.
    noAddr = object()

    def __init__(self):
        self.written = []


    def write(self, packet, addr=noAddr):
        """
        Record C{packet} and C{addr} instead of sending anything.

        @param packet: The datagram payload.
        @param addr: The destination address; defaults to L{noAddr}.
        """
        self.written.append((packet, addr))
class StringTransport:
    """
    A transport implementation which buffers data in memory and keeps track of
    its other state without providing any behavior.

    L{StringTransport} has a number of attributes which are not part of any of
    the interfaces it claims to implement.  These attributes are provided for
    testing purposes.  Implementation code should not use any of these
    attributes; they are not provided by other transports.

    @ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
        called, then C{True}.

    @ivar producer: If a producer is currently registered, C{producer} is a
        reference to it.  Otherwise, C{None}.

    @ivar streaming: If a producer is currently registered, C{streaming} refers
        to the value of the second parameter passed to C{registerProducer}.

    @ivar hostAddr: C{None} or an object which will be returned as the host
        address of this transport.  If C{None}, a default L{IPv4Address} will
        be returned instead.

    @ivar peerAddr: C{None} or an object which will be returned as the peer
        address of this transport.  If C{None}, a default L{IPv4Address} will
        be returned instead.

    @ivar producerState: The state of this L{StringTransport} in its capacity
        as an L{IPushProducer}.  One of C{'producing'}, C{'paused'}, or
        C{'stopped'}.

    @ivar io: A L{StringIO} which holds the data which has been written to this
        transport since the last call to L{clear}.  Use L{value} instead of
        accessing this directly.
    """
    implements(ITransport, IConsumer, IPushProducer)

    disconnecting = False

    producer = None
    streaming = None

    hostAddr = None
    peerAddr = None

    producerState = 'producing'

    def __init__(self, hostAddress=None, peerAddress=None):
        """
        @param hostAddress: If not C{None}, the value L{getHost} will return.
        @param peerAddress: If not C{None}, the value L{getPeer} will return.
        """
        self.clear()
        if hostAddress is not None:
            self.hostAddr = hostAddress
        if peerAddress is not None:
            self.peerAddr = peerAddress
        self.connected = True


    def clear(self):
        """
        Discard all data written to this transport so far.

        This is not a transport method.  It is intended for tests.  Do not use
        it in implementation code.
        """
        self.io = StringIO()


    def value(self):
        """
        Retrieve all data which has been buffered by this transport.

        This is not a transport method.  It is intended for tests.  Do not use
        it in implementation code.

        @return: A C{str} giving all data written to this transport since the
            last call to L{clear}.
        """
        return self.io.getvalue()


    # ITransport
    def write(self, data):
        """
        Append C{data} to the in-memory buffer.

        @raise TypeError: If C{data} is a C{unicode} object.
        """
        if isinstance(data, unicode): # no, really, I mean it
            raise TypeError("Data must not be unicode")
        self.io.write(data)


    def writeSequence(self, data):
        """
        Append the concatenation of the strings in C{data} to the buffer.
        """
        self.io.write(''.join(data))


    def loseConnection(self):
        """
        Close the connection.  Does nothing besides toggle the C{disconnecting}
        instance variable to C{True}.
        """
        self.disconnecting = True


    def getPeer(self):
        """
        Return C{peerAddr}, or a fixed placeholder L{IPv4Address} if it is
        C{None}.
        """
        if self.peerAddr is None:
            return address.IPv4Address('TCP', '192.168.1.1', 54321)
        return self.peerAddr


    def getHost(self):
        """
        Return C{hostAddr}, or a fixed placeholder L{IPv4Address} if it is
        C{None}.
        """
        if self.hostAddr is None:
            return address.IPv4Address('TCP', '10.0.0.1', 12345)
        return self.hostAddr


    # IConsumer
    def registerProducer(self, producer, streaming):
        """
        Remember C{producer} and C{streaming}.

        @raise RuntimeError: If a producer is already registered.
        """
        if self.producer is not None:
            raise RuntimeError("Cannot register two producers")
        self.producer = producer
        self.streaming = streaming


    def unregisterProducer(self):
        """
        Forget the registered producer and its streaming flag.

        @raise RuntimeError: If no producer is registered.
        """
        if self.producer is None:
            raise RuntimeError(
                "Cannot unregister a producer unless one is registered")
        self.producer = None
        self.streaming = None


    # IPushProducer
    def _checkState(self):
        """
        Verify that a pause or resume is still allowed.

        @raise RuntimeError: If L{loseConnection} has been called or if this
            producer has been stopped.
        """
        if self.disconnecting:
            raise RuntimeError(
                "Cannot resume producing after loseConnection")
        if self.producerState == 'stopped':
            raise RuntimeError("Cannot resume a stopped producer")


    def pauseProducing(self):
        """
        Set C{producerState} to C{'paused'}, after checking the state.
        """
        self._checkState()
        self.producerState = 'paused'


    def stopProducing(self):
        """
        Set C{producerState} to C{'stopped'}.
        """
        self.producerState = 'stopped'


    def resumeProducing(self):
        """
        Set C{producerState} to C{'producing'}, after checking the state.
        """
        self._checkState()
        self.producerState = 'producing'
class StringTransportWithDisconnection(StringTransport):
    """
    A L{StringTransport} which notifies its protocol when the connection is
    lost.

    The C{protocol} attribute is not set by this class; it must be assigned
    before L{loseConnection} is called.
    """
    def loseConnection(self):
        """
        Mark the transport as disconnected and call the protocol's
        C{connectionLost} with L{error.ConnectionDone}.

        Does nothing if the transport is already disconnected, so the protocol
        is notified at most once.
        """
        if self.connected:
            self.connected = False
            self.protocol.connectionLost(error.ConnectionDone("Bye."))
class StringIOWithoutClosing(StringIO):
    """
    A StringIO that can't be closed.
    """
    def close(self):
        """
        Do nothing, so the buffered contents stay readable after a caller
        closes this object.
        """
class _FakePort(object):
    """
    A fake L{IListeningPort} to be used in tests.

    @ivar _hostAddress: The L{IAddress} this L{IListeningPort} is pretending
        to be listening on.
    """
    implements(IListeningPort)

    def __init__(self, hostAddress):
        """
        @param hostAddress: An L{IAddress} this L{IListeningPort} should
            pretend to be listening on.
        """
        self._hostAddress = hostAddress


    def startListening(self):
        """
        Fake L{IListeningPort.startListening} that doesn't do anything.
        """


    def stopListening(self):
        """
        Fake L{IListeningPort.stopListening} that doesn't do anything.
        """


    def getHost(self):
        """
        Fake L{IListeningPort.getHost} that returns our L{IAddress}.
        """
        return self._hostAddress
class _FakeConnector(object):
    """
    A fake L{IConnector} that allows us to inspect if it has been told to stop
    connecting.

    @ivar stoppedConnecting: has this connector's
        L{_FakeConnector.stopConnecting} method been invoked yet?

    @ivar _address: An L{IAddress} provider that represents our destination.
    """
    implements(IConnector)

    stoppedConnecting = False

    def __init__(self, address):
        """
        @param address: An L{IAddress} provider that represents this
            connector's destination.
        """
        self._address = address


    def stopConnecting(self):
        """
        Implement L{IConnector.stopConnecting} and set
        L{_FakeConnector.stoppedConnecting} to C{True}
        """
        self.stoppedConnecting = True


    def disconnect(self):
        """
        Implement L{IConnector.disconnect} as a no-op.
        """


    def connect(self):
        """
        Implement L{IConnector.connect} as a no-op.
        """


    def getDestination(self):
        """
        Implement L{IConnector.getDestination} to return the C{address} passed
        to C{__init__}.
        """
        return self._address
class MemoryReactor(object):
    """
    A fake reactor to be used in tests.  This reactor doesn't actually do
    much that's useful yet.  It accepts TCP connection setup attempts, but
    they will never succeed.

    @ivar tcpClients: a list that keeps track of connection attempts (ie, calls
        to C{connectTCP}).
    @type tcpClients: C{list}

    @ivar tcpServers: a list that keeps track of server listen attempts (ie, calls
        to C{listenTCP}).
    @type tcpServers: C{list}

    @ivar sslClients: a list that keeps track of connection attempts (ie,
        calls to C{connectSSL}).
    @type sslClients: C{list}

    @ivar sslServers: a list that keeps track of server listen attempts (ie,
        calls to C{listenSSL}).
    @type sslServers: C{list}

    @ivar unixClients: a list that keeps track of connection attempts (ie,
        calls to C{connectUNIX}).
    @type unixClients: C{list}

    @ivar unixServers: a list that keeps track of server listen attempts (ie,
        calls to C{listenUNIX}).
    @type unixServers: C{list}

    @ivar adoptedPorts: a list that keeps track of server listen attempts (ie,
        calls to C{adoptStreamPort}).
    @type adoptedPorts: C{list}
    """
    implements(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)

    def __init__(self):
        """
        Initialize the tracking lists.
        """
        self.tcpClients = []
        self.tcpServers = []
        self.sslClients = []
        self.sslServers = []
        self.unixClients = []
        self.unixServers = []
        self.adoptedPorts = []


    def adoptStreamPort(self, fileno, addressFamily, factory):
        """
        Fake L{IReactorSocket.adoptStreamPort}, that logs the call and returns
        an L{IListeningPort}.

        @raise error.UnsupportedAddressFamily: If C{addressFamily} is neither
            C{AF_INET} nor C{AF_INET6}.  Nothing is logged in that case.
        """
        if addressFamily == AF_INET:
            addr = IPv4Address('TCP', '0.0.0.0', 1234)
        elif addressFamily == AF_INET6:
            # IPv6Address and UnsupportedAddressFamily are not imported by
            # name in this module; reach them through the imported
            # ``address`` and ``error`` modules to avoid a NameError.
            addr = address.IPv6Address('TCP', '::', 1234)
        else:
            raise error.UnsupportedAddressFamily()

        self.adoptedPorts.append((fileno, addressFamily, factory))
        return _FakePort(addr)


    def listenTCP(self, port, factory, backlog=50, interface=''):
        """
        Fake L{reactor.listenTCP}, that logs the call and returns an
        L{IListeningPort}.
        """
        self.tcpServers.append((port, factory, backlog, interface))
        return _FakePort(IPv4Address('TCP', '0.0.0.0', port))


    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
        """
        Fake L{reactor.connectTCP}, that logs the call and returns an
        L{IConnector}.
        """
        self.tcpClients.append((host, port, factory, timeout, bindAddress))
        conn = _FakeConnector(IPv4Address('TCP', host, port))
        factory.startedConnecting(conn)
        return conn


    def listenSSL(self, port, factory, contextFactory,
                  backlog=50, interface=''):
        """
        Fake L{reactor.listenSSL}, that logs the call and returns an
        L{IListeningPort}.
        """
        self.sslServers.append((port, factory, contextFactory,
                                backlog, interface))
        return _FakePort(IPv4Address('TCP', '0.0.0.0', port))


    def connectSSL(self, host, port, factory, contextFactory,
                   timeout=30, bindAddress=None):
        """
        Fake L{reactor.connectSSL}, that logs the call and returns an
        L{IConnector}.
        """
        self.sslClients.append((host, port, factory, contextFactory,
                                timeout, bindAddress))
        conn = _FakeConnector(IPv4Address('TCP', host, port))
        factory.startedConnecting(conn)
        return conn


    def listenUNIX(self, address, factory,
                   backlog=50, mode=0o666, wantPID=0):
        """
        Fake L{reactor.listenUNIX}, that logs the call and returns an
        L{IListeningPort}.
        """
        self.unixServers.append((address, factory, backlog, mode, wantPID))
        return _FakePort(UNIXAddress(address))


    def connectUNIX(self, address, factory, timeout=30, checkPID=0):
        """
        Fake L{reactor.connectUNIX}, that logs the call and returns an
        L{IConnector}.
        """
        self.unixClients.append((address, factory, timeout, checkPID))
        conn = _FakeConnector(UNIXAddress(address))
        factory.startedConnecting(conn)
        return conn
class RaisingMemoryReactor(object):
    """
    A fake reactor to be used in tests.  It accepts TCP connection setup
    attempts, but they will fail.

    @ivar _listenException: An instance of an L{Exception}
    @ivar _connectException: An instance of an L{Exception}
    """
    implements(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)

    def __init__(self, listenException=None, connectException=None):
        """
        @param listenException: An instance of an L{Exception} to raise when any
            C{listen} method is called.

        @param connectException: An instance of an L{Exception} to raise when
            any C{connect} method is called.
        """
        self._listenException = listenException
        self._connectException = connectException


    def adoptStreamPort(self, fileno, addressFamily, factory):
        """
        Fake L{IReactorSocket.adoptStreamPort}, that raises
        L{self._listenException}.
        """
        raise self._listenException


    def listenTCP(self, port, factory, backlog=50, interface=''):
        """
        Fake L{reactor.listenTCP}, that raises L{self._listenException}.
        """
        raise self._listenException


    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
        """
        Fake L{reactor.connectTCP}, that raises L{self._connectException}.
        """
        raise self._connectException


    def listenSSL(self, port, factory, contextFactory,
                  backlog=50, interface=''):
        """
        Fake L{reactor.listenSSL}, that raises L{self._listenException}.
        """
        raise self._listenException


    def connectSSL(self, host, port, factory, contextFactory,
                   timeout=30, bindAddress=None):
        """
        Fake L{reactor.connectSSL}, that raises L{self._connectException}.
        """
        raise self._connectException


    def listenUNIX(self, address, factory,
                   backlog=50, mode=0o666, wantPID=0):
        """
        Fake L{reactor.listenUNIX}, that raises L{self._listenException}.
        """
        raise self._listenException


    def connectUNIX(self, address, factory, timeout=30, checkPID=0):
        """
        Fake L{reactor.connectUNIX}, that raises L{self._connectException}.
        """
        raise self._connectException