1 # -*- test-case-name: twisted.test.test_loopback -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Testing support for protocols -- loopback between client and server.
11 from zope.interface import implements
14 from twisted.protocols import policies
15 from twisted.internet import interfaces, protocol, main, defer
16 from twisted.internet.task import deferLater
17 from twisted.python import failure
18 from twisted.internet.interfaces import IAddress
21 class _LoopbackQueue(object):
23 Trivial wrapper around a list to give it an interface like a queue, which
24 the addition of also sending notifications by way of a Deferred whenever
25 the list has something added to it.
28 _notificationDeferred = None
37 if self._notificationDeferred is not None:
38 d, self._notificationDeferred = self._notificationDeferred, None
42 def __nonzero__(self):
43 return bool(self._queue)
47 return self._queue.pop(0)
51 class _LoopbackAddress(object):
55 class _LoopbackTransport(object):
56 implements(interfaces.ITransport, interfaces.IConsumer)
62 def __init__(self, q):
65 def write(self, bytes):
68 def writeSequence(self, iovec):
69 self.q.put(''.join(iovec))
71 def loseConnection(self):
72 self.q.disconnect = True
76 return _LoopbackAddress()
79 return _LoopbackAddress()
82 def registerProducer(self, producer, streaming):
83 assert self.producer is None
84 self.producer = producer
85 self.streamingProducer = streaming
88 def unregisterProducer(self):
89 assert self.producer is not None
92 def _pollProducer(self):
93 if self.producer is not None and not self.streamingProducer:
94 self.producer.resumeProducing()
98 def identityPumpPolicy(queue, target):
100 L{identityPumpPolicy} is a policy which delivers each chunk of data written
101 to the given queue as-is to the target.
103 This isn't a particularly realistic policy.
105 @see: L{loopbackAsync}
111 target.dataReceived(bytes)
115 def collapsingPumpPolicy(queue, target):
117 L{collapsingPumpPolicy} is a policy which collapses all outstanding chunks
118 into a single string and delivers it to the target.
120 @see: L{loopbackAsync}
129 target.dataReceived(''.join(bytes))
133 def loopbackAsync(server, client, pumpPolicy=identityPumpPolicy):
135 Establish a connection between C{server} and C{client} then transfer data
136 between them until the connection is closed. This is often useful for
139 @param server: The protocol instance representing the server-side of this
142 @param client: The protocol instance representing the client-side of this
145 @param pumpPolicy: When either C{server} or C{client} writes to its
146 transport, the string passed in is added to a queue of data for the
147 other protocol. Eventually, C{pumpPolicy} will be called with one such
148 queue and the corresponding protocol object. The pump policy callable
149 is responsible for emptying the queue and passing the strings it
150 contains to the given protocol's C{dataReceived} method. The signature
151 of C{pumpPolicy} is C{(queue, protocol)}. C{queue} is an object with a
152 C{get} method which will return the next string written to the
153 transport, or C{None} if the transport has been disconnected, and which
154 evaluates to C{True} if and only if there are more items to be
155 retrieved via C{get}.
157 @return: A L{Deferred} which fires when the connection has been closed and
158 both sides have received notification of this.
160 serverToClient = _LoopbackQueue()
161 clientToServer = _LoopbackQueue()
163 server.makeConnection(_LoopbackTransport(serverToClient))
164 client.makeConnection(_LoopbackTransport(clientToServer))
166 return _loopbackAsyncBody(
167 server, serverToClient, client, clientToServer, pumpPolicy)
171 def _loopbackAsyncBody(server, serverToClient, client, clientToServer,
174 Transfer bytes from the output queue of each protocol to the input of the other.
176 @param server: The protocol instance representing the server-side of this
179 @param serverToClient: The L{_LoopbackQueue} holding the server's output.
181 @param client: The protocol instance representing the client-side of this
184 @param clientToServer: The L{_LoopbackQueue} holding the client's output.
186 @param pumpPolicy: See L{loopbackAsync}.
188 @return: A L{Deferred} which fires when the connection has been closed and
189 both sides have received notification of this.
191 def pump(source, q, target):
194 pumpPolicy(q, target)
197 # A write buffer has now been emptied. Give any producer on that
198 # side an opportunity to produce more data.
199 source.transport._pollProducer()
204 disconnect = clientSent = serverSent = False
206 # Deliver the data which has been written.
207 serverSent = pump(server, serverToClient, client)
208 clientSent = pump(client, clientToServer, server)
210 if not clientSent and not serverSent:
211 # Neither side wrote any data. Wait for some new data to be added
212 # before trying to do anything further.
214 clientToServer._notificationDeferred = d
215 serverToClient._notificationDeferred = d
217 _loopbackAsyncContinue,
218 server, serverToClient, client, clientToServer, pumpPolicy)
220 if serverToClient.disconnect:
221 # The server wants to drop the connection. Flush any remaining
224 pump(server, serverToClient, client)
225 elif clientToServer.disconnect:
226 # The client wants to drop the connection. Flush any remaining
229 pump(client, clientToServer, server)
231 # Someone wanted to disconnect, so okay, the connection is gone.
232 server.connectionLost(failure.Failure(main.CONNECTION_DONE))
233 client.connectionLost(failure.Failure(main.CONNECTION_DONE))
234 return defer.succeed(None)
238 def _loopbackAsyncContinue(ignored, server, serverToClient, client,
239 clientToServer, pumpPolicy):
240 # Clear the Deferred from each message queue, since it has already fired
241 # and cannot be used again.
242 clientToServer._notificationDeferred = None
243 serverToClient._notificationDeferred = None
245 # Schedule some more byte-pushing to happen. This isn't done
246 # synchronously because no actual transport can re-enter dataReceived as
247 # a result of calling write, and doing this synchronously could result
249 from twisted.internet import reactor
253 server, serverToClient, client, clientToServer, pumpPolicy)
259 implements(interfaces.ITransport, interfaces.IConsumer)
266 def __init__(self, target, logFile=None):
268 self.logFile = logFile
270 def write(self, data):
271 self.buffer = self.buffer + data
273 self.logFile.write("loopback writing %s\n" % repr(data))
275 def writeSequence(self, iovec):
276 self.write("".join(iovec))
278 def clearBuffer(self):
279 if self.shouldLose == -1:
283 self.producer.resumeProducing()
286 self.logFile.write("loopback receiving %s\n" % repr(self.buffer))
289 self.target.dataReceived(buffer)
290 if self.shouldLose == 1:
292 self.target.connectionLost(failure.Failure(main.CONNECTION_DONE))
294 def loseConnection(self):
295 if self.shouldLose != -1:
304 def registerProducer(self, producer, streaming):
305 self.producer = producer
307 def unregisterProducer(self):
311 return 'Loopback(%r)' % (self.target.__class__.__name__,)
315 class LoopbackClientFactory(protocol.ClientFactory):
317 def __init__(self, protocol):
318 self.disconnected = 0
319 self.deferred = defer.Deferred()
320 self.protocol = protocol
322 def buildProtocol(self, addr):
325 def clientConnectionLost(self, connector, reason):
326 self.disconnected = 1
327 self.deferred.callback(None)
330 class _FireOnClose(policies.ProtocolWrapper):
331 def __init__(self, protocol, factory):
332 policies.ProtocolWrapper.__init__(self, protocol, factory)
333 self.deferred = defer.Deferred()
335 def connectionLost(self, reason):
336 policies.ProtocolWrapper.connectionLost(self, reason)
337 self.deferred.callback(None)
340 def loopbackTCP(server, client, port=0, noisy=True):
341 """Run session between server and client protocol instances over TCP."""
342 from twisted.internet import reactor
343 f = policies.WrappingFactory(protocol.Factory())
344 serverWrapper = _FireOnClose(f, server)
346 f.buildProtocol = lambda addr: serverWrapper
347 serverPort = reactor.listenTCP(port, f, interface='127.0.0.1')
348 clientF = LoopbackClientFactory(client)
349 clientF.noisy = noisy
350 reactor.connectTCP('127.0.0.1', serverPort.getHost().port, clientF)
352 d.addCallback(lambda x: serverWrapper.deferred)
353 d.addCallback(lambda x: serverPort.stopListening())
357 def loopbackUNIX(server, client, noisy=True):
358 """Run session between server and client protocol instances over UNIX socket."""
359 path = tempfile.mktemp()
360 from twisted.internet import reactor
361 f = policies.WrappingFactory(protocol.Factory())
362 serverWrapper = _FireOnClose(f, server)
364 f.buildProtocol = lambda addr: serverWrapper
365 serverPort = reactor.listenUNIX(path, f)
366 clientF = LoopbackClientFactory(client)
367 clientF.noisy = noisy
368 reactor.connectUNIX(path, clientF)
370 d.addCallback(lambda x: serverWrapper.deferred)
371 d.addCallback(lambda x: serverPort.stopListening())