Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / protocols / loopback.py
1 # -*- test-case-name: twisted.test.test_loopback -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Testing support for protocols -- loopback between client and server.
7 """
8
9 # system imports
10 import tempfile
11 from zope.interface import implements
12
13 # Twisted Imports
14 from twisted.protocols import policies
15 from twisted.internet import interfaces, protocol, main, defer
16 from twisted.internet.task import deferLater
17 from twisted.python import failure
18 from twisted.internet.interfaces import IAddress
19
20
21 class _LoopbackQueue(object):
22     """
23     Trivial wrapper around a list to give it an interface like a queue, which
24     the addition of also sending notifications by way of a Deferred whenever
25     the list has something added to it.
26     """
27
28     _notificationDeferred = None
29     disconnect = False
30
31     def __init__(self):
32         self._queue = []
33
34
35     def put(self, v):
36         self._queue.append(v)
37         if self._notificationDeferred is not None:
38             d, self._notificationDeferred = self._notificationDeferred, None
39             d.callback(None)
40
41
42     def __nonzero__(self):
43         return bool(self._queue)
44
45
46     def get(self):
47         return self._queue.pop(0)
48
49
50
51 class _LoopbackAddress(object):
52     implements(IAddress)
53
54
55 class _LoopbackTransport(object):
56     implements(interfaces.ITransport, interfaces.IConsumer)
57
58     disconnecting = False
59     producer = None
60
61     # ITransport
62     def __init__(self, q):
63         self.q = q
64
65     def write(self, bytes):
66         self.q.put(bytes)
67
68     def writeSequence(self, iovec):
69         self.q.put(''.join(iovec))
70
71     def loseConnection(self):
72         self.q.disconnect = True
73         self.q.put(None)
74
75     def getPeer(self):
76         return _LoopbackAddress()
77
78     def getHost(self):
79         return _LoopbackAddress()
80
81     # IConsumer
82     def registerProducer(self, producer, streaming):
83         assert self.producer is None
84         self.producer = producer
85         self.streamingProducer = streaming
86         self._pollProducer()
87
88     def unregisterProducer(self):
89         assert self.producer is not None
90         self.producer = None
91
92     def _pollProducer(self):
93         if self.producer is not None and not self.streamingProducer:
94             self.producer.resumeProducing()
95
96
97
98 def identityPumpPolicy(queue, target):
99     """
100     L{identityPumpPolicy} is a policy which delivers each chunk of data written
101     to the given queue as-is to the target.
102
103     This isn't a particularly realistic policy.
104
105     @see: L{loopbackAsync}
106     """
107     while queue:
108         bytes = queue.get()
109         if bytes is None:
110             break
111         target.dataReceived(bytes)
112
113
114
115 def collapsingPumpPolicy(queue, target):
116     """
117     L{collapsingPumpPolicy} is a policy which collapses all outstanding chunks
118     into a single string and delivers it to the target.
119
120     @see: L{loopbackAsync}
121     """
122     bytes = []
123     while queue:
124         chunk = queue.get()
125         if chunk is None:
126             break
127         bytes.append(chunk)
128     if bytes:
129         target.dataReceived(''.join(bytes))
130
131
132
133 def loopbackAsync(server, client, pumpPolicy=identityPumpPolicy):
134     """
135     Establish a connection between C{server} and C{client} then transfer data
136     between them until the connection is closed. This is often useful for
137     testing a protocol.
138
139     @param server: The protocol instance representing the server-side of this
140         connection.
141
142     @param client: The protocol instance representing the client-side of this
143         connection.
144
145     @param pumpPolicy: When either C{server} or C{client} writes to its
146         transport, the string passed in is added to a queue of data for the
147         other protocol.  Eventually, C{pumpPolicy} will be called with one such
148         queue and the corresponding protocol object.  The pump policy callable
149         is responsible for emptying the queue and passing the strings it
150         contains to the given protocol's C{dataReceived} method.  The signature
151         of C{pumpPolicy} is C{(queue, protocol)}.  C{queue} is an object with a
152         C{get} method which will return the next string written to the
153         transport, or C{None} if the transport has been disconnected, and which
154         evaluates to C{True} if and only if there are more items to be
155         retrieved via C{get}.
156
157     @return: A L{Deferred} which fires when the connection has been closed and
158         both sides have received notification of this.
159     """
160     serverToClient = _LoopbackQueue()
161     clientToServer = _LoopbackQueue()
162
163     server.makeConnection(_LoopbackTransport(serverToClient))
164     client.makeConnection(_LoopbackTransport(clientToServer))
165
166     return _loopbackAsyncBody(
167         server, serverToClient, client, clientToServer, pumpPolicy)
168
169
170
171 def _loopbackAsyncBody(server, serverToClient, client, clientToServer,
172                        pumpPolicy):
173     """
174     Transfer bytes from the output queue of each protocol to the input of the other.
175
176     @param server: The protocol instance representing the server-side of this
177     connection.
178
179     @param serverToClient: The L{_LoopbackQueue} holding the server's output.
180
181     @param client: The protocol instance representing the client-side of this
182     connection.
183
184     @param clientToServer: The L{_LoopbackQueue} holding the client's output.
185
186     @param pumpPolicy: See L{loopbackAsync}.
187
188     @return: A L{Deferred} which fires when the connection has been closed and
189         both sides have received notification of this.
190     """
191     def pump(source, q, target):
192         sent = False
193         if q:
194             pumpPolicy(q, target)
195             sent = True
196         if sent and not q:
197             # A write buffer has now been emptied.  Give any producer on that
198             # side an opportunity to produce more data.
199             source.transport._pollProducer()
200
201         return sent
202
203     while 1:
204         disconnect = clientSent = serverSent = False
205
206         # Deliver the data which has been written.
207         serverSent = pump(server, serverToClient, client)
208         clientSent = pump(client, clientToServer, server)
209
210         if not clientSent and not serverSent:
211             # Neither side wrote any data.  Wait for some new data to be added
212             # before trying to do anything further.
213             d = defer.Deferred()
214             clientToServer._notificationDeferred = d
215             serverToClient._notificationDeferred = d
216             d.addCallback(
217                 _loopbackAsyncContinue,
218                 server, serverToClient, client, clientToServer, pumpPolicy)
219             return d
220         if serverToClient.disconnect:
221             # The server wants to drop the connection.  Flush any remaining
222             # data it has.
223             disconnect = True
224             pump(server, serverToClient, client)
225         elif clientToServer.disconnect:
226             # The client wants to drop the connection.  Flush any remaining
227             # data it has.
228             disconnect = True
229             pump(client, clientToServer, server)
230         if disconnect:
231             # Someone wanted to disconnect, so okay, the connection is gone.
232             server.connectionLost(failure.Failure(main.CONNECTION_DONE))
233             client.connectionLost(failure.Failure(main.CONNECTION_DONE))
234             return defer.succeed(None)
235
236
237
238 def _loopbackAsyncContinue(ignored, server, serverToClient, client,
239                            clientToServer, pumpPolicy):
240     # Clear the Deferred from each message queue, since it has already fired
241     # and cannot be used again.
242     clientToServer._notificationDeferred = None
243     serverToClient._notificationDeferred = None
244
245     # Schedule some more byte-pushing to happen.  This isn't done
246     # synchronously because no actual transport can re-enter dataReceived as
247     # a result of calling write, and doing this synchronously could result
248     # in that.
249     from twisted.internet import reactor
250     return deferLater(
251         reactor, 0,
252         _loopbackAsyncBody,
253         server, serverToClient, client, clientToServer, pumpPolicy)
254
255
256
257 class LoopbackRelay:
258
259     implements(interfaces.ITransport, interfaces.IConsumer)
260
261     buffer = ''
262     shouldLose = 0
263     disconnecting = 0
264     producer = None
265
266     def __init__(self, target, logFile=None):
267         self.target = target
268         self.logFile = logFile
269
270     def write(self, data):
271         self.buffer = self.buffer + data
272         if self.logFile:
273             self.logFile.write("loopback writing %s\n" % repr(data))
274
275     def writeSequence(self, iovec):
276         self.write("".join(iovec))
277
278     def clearBuffer(self):
279         if self.shouldLose == -1:
280             return
281
282         if self.producer:
283             self.producer.resumeProducing()
284         if self.buffer:
285             if self.logFile:
286                 self.logFile.write("loopback receiving %s\n" % repr(self.buffer))
287             buffer = self.buffer
288             self.buffer = ''
289             self.target.dataReceived(buffer)
290         if self.shouldLose == 1:
291             self.shouldLose = -1
292             self.target.connectionLost(failure.Failure(main.CONNECTION_DONE))
293
294     def loseConnection(self):
295         if self.shouldLose != -1:
296             self.shouldLose = 1
297
298     def getHost(self):
299         return 'loopback'
300
301     def getPeer(self):
302         return 'loopback'
303
304     def registerProducer(self, producer, streaming):
305         self.producer = producer
306
307     def unregisterProducer(self):
308         self.producer = None
309
310     def logPrefix(self):
311         return 'Loopback(%r)' % (self.target.__class__.__name__,)
312
313
314
315 class LoopbackClientFactory(protocol.ClientFactory):
316
317     def __init__(self, protocol):
318         self.disconnected = 0
319         self.deferred = defer.Deferred()
320         self.protocol = protocol
321
322     def buildProtocol(self, addr):
323         return self.protocol
324
325     def clientConnectionLost(self, connector, reason):
326         self.disconnected = 1
327         self.deferred.callback(None)
328
329
330 class _FireOnClose(policies.ProtocolWrapper):
331     def __init__(self, protocol, factory):
332         policies.ProtocolWrapper.__init__(self, protocol, factory)
333         self.deferred = defer.Deferred()
334
335     def connectionLost(self, reason):
336         policies.ProtocolWrapper.connectionLost(self, reason)
337         self.deferred.callback(None)
338
339
340 def loopbackTCP(server, client, port=0, noisy=True):
341     """Run session between server and client protocol instances over TCP."""
342     from twisted.internet import reactor
343     f = policies.WrappingFactory(protocol.Factory())
344     serverWrapper = _FireOnClose(f, server)
345     f.noisy = noisy
346     f.buildProtocol = lambda addr: serverWrapper
347     serverPort = reactor.listenTCP(port, f, interface='127.0.0.1')
348     clientF = LoopbackClientFactory(client)
349     clientF.noisy = noisy
350     reactor.connectTCP('127.0.0.1', serverPort.getHost().port, clientF)
351     d = clientF.deferred
352     d.addCallback(lambda x: serverWrapper.deferred)
353     d.addCallback(lambda x: serverPort.stopListening())
354     return d
355
356
357 def loopbackUNIX(server, client, noisy=True):
358     """Run session between server and client protocol instances over UNIX socket."""
359     path = tempfile.mktemp()
360     from twisted.internet import reactor
361     f = policies.WrappingFactory(protocol.Factory())
362     serverWrapper = _FireOnClose(f, server)
363     f.noisy = noisy
364     f.buildProtocol = lambda addr: serverWrapper
365     serverPort = reactor.listenUNIX(path, f)
366     clientF = LoopbackClientFactory(client)
367     clientF.noisy = noisy
368     reactor.connectUNIX(path, clientF)
369     d = clientF.deferred
370     d.addCallback(lambda x: serverWrapper.deferred)
371     d.addCallback(lambda x: serverPort.stopListening())
372     return d