Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_pcp.py
1 # -*- Python -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 __version__ = '$Revision: 1.5 $'[11:-2]
7
8 from StringIO import StringIO
9 from twisted.trial import unittest
10 from twisted.protocols import pcp
11
12 # Goal:
13
14 # Take a Protocol instance.  Own all outgoing data - anything that
15 # would go to p.transport.write.  Own all incoming data - anything
16 # that comes to p.dataReceived.
17
18 # I need:
19 # Something with the AbstractFileDescriptor interface.
20 # That is:
21 #  - acts as a Transport
22 #    - has a method write()
23 #    - which buffers
24 #  - acts as a Consumer
#    - has a registerProducer, unregisterProducer
26 #    - tells the Producer to back off (pauseProducing) when its buffer is full.
27 #    - tells the Producer to resumeProducing when its buffer is not so full.
28 #  - acts as a Producer
29 #    - calls registerProducer
30 #    - calls write() on consumers
31 #    - honors requests to pause/resume producing
32 #    - honors stopProducing, and passes it along to upstream Producers
33
34
class DummyTransport:
    """Minimal stand-in transport that records every chunk written to it."""

    def __init__(self):
        # One entry per write() call, kept in call order so tests can
        # count individual writes as well as inspect the combined data.
        self._writes = list()

    def write(self, data):
        """Record data as one separate write."""
        self._writes += [data]

    def getvalue(self):
        """Return everything written so far, joined into a single string."""
        separator = ''
        return separator.join(self._writes)
46
class DummyProducer:
    """Producer stub that only remembers which control calls it received."""

    # Class-level defaults; an instance attribute shadows each one the
    # first time the corresponding method is called.
    paused = False
    resumed = False
    stopped = False

    def __init__(self, consumer):
        self.consumer = consumer

    def pauseProducing(self):
        """Note that a pause was requested."""
        self.paused = True

    def resumeProducing(self):
        """Note that a resume was requested; this also clears the paused flag."""
        self.resumed, self.paused = True, False

    def stopProducing(self):
        """Note that a stop was requested."""
        self.stopped = True
64
65
class DummyConsumer(DummyTransport):
    """A DummyTransport that also records the consumer-side calls it gets.

    producer holds the (producer, streaming) pair from the most recent
    registerProducer() call, or None if there has been none.  finished and
    unregistered flip to True once finish() / unregisterProducer() has
    been called.
    """
    producer = None
    finished = False
    # Starts out False so that a test asserting on this flag can only pass
    # if unregisterProducer() was really called on this object.
    unregistered = False

    def registerProducer(self, producer, streaming):
        """Remember the producer together with its streaming flag."""
        self.producer = (producer, streaming)

    def unregisterProducer(self):
        """Record that the producer was unregistered."""
        self.unregistered = True

    def finish(self):
        """Record that the consumer was told to finish."""
        self.finished = True
79
class TransportInterfaceTest(unittest.TestCase):
    """Smoke-test the transport-facing side of the proxy."""

    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        consumer = DummyConsumer()
        self.underlying = consumer
        self.transport = self.proxyClass(consumer)

    def testWrite(self):
        # No assertion here: this only checks that write() accepts data
        # without raising.
        payload = "some bytes"
        self.transport.write(payload)
88         self.transport.write("some bytes")
89
class ConsumerInterfaceTest:
    """Test ProducerConsumerProxy as a Consumer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Consumer convincingly for the ProducingServer.

    This is a mixin: a concrete test class combines it with a TestCase
    and supplies the proxyClass attribute that setUp instantiates.
    """

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)
        self.producer = DummyProducer(self.consumer)

    def testRegisterPush(self):
        self.consumer.registerProducer(self.producer, True)
        ## Consumer should NOT have called PushProducer.resumeProducing
        self.assertFalse(self.producer.resumed)

    ## If I'm just a proxy, should I only do resumeProducing when
    ## I get poked myself?
    #def testRegisterPull(self):
    #    self.consumer.registerProducer(self.producer, False)
    #    ## Consumer SHOULD have called PushProducer.resumeProducing
    #    self.assertTrue(self.producer.resumed)

    def testUnregister(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.unregisterProducer()
        # Now when the consumer would ordinarily want more data, it
        # shouldn't ask producer for it.
        # The most succinct way to trigger "want more data" is to proxy for
        # a PullProducer and have someone ask me for data.
        # Clear the flag first so that only a resume triggered below can
        # set it again.
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.assertFalse(self.producer.resumed)

    def testFinish(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.finish()
        # I guess finish should behave like unregister?
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.assertFalse(self.producer.resumed)
134
135
class ProducerInterfaceTest:
    """Test ProducerConsumerProxy as a Producer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Producer convincingly for the ConsumingTransport.

    This is a mixin: a concrete test class combines it with a TestCase
    and supplies the proxyClass attribute that setUp instantiates.
    """

    def setUp(self):
        self.consumer = DummyConsumer()
        self.producer = self.proxyClass(self.consumer)

    def testRegistersProducer(self):
        # DummyConsumer stores (producer, streaming); element 0 is the
        # producer that registered itself.
        self.assertEqual(self.consumer.producer[0], self.producer)

    def testPause(self):
        self.producer.pauseProducing()
        self.producer.write("yakkity yak")
        self.assertFalse(self.consumer.getvalue(),
                         "Paused producer should not have sent data.")

    def testResume(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.producer.write("yakkity yak")
        self.assertEqual(self.consumer.getvalue(), "yakkity yak")

    def testResumeNoEmptyWrite(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.assertEqual(len(self.consumer._writes), 0,
                         "Resume triggered an empty write.")

    def testResumeBuffer(self):
        self.producer.pauseProducing()
        self.producer.write("buffer this")
        self.producer.resumeProducing()
        self.assertEqual(self.consumer.getvalue(), "buffer this")

    def testStop(self):
        self.producer.stopProducing()
        self.producer.write("yakkity yak")
        self.assertFalse(self.consumer.getvalue(),
                         "Stopped producer should not have sent data.")
181
182
class PCP_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    """Run the consumer-interface tests against BasicProducerConsumerProxy."""

    proxyClass = pcp.BasicProducerConsumerProxy
185
class PCPII_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    """Run the consumer-interface tests against ProducerConsumerProxy."""

    proxyClass = pcp.ProducerConsumerProxy
188
class PCP_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    """Run the producer-interface tests against BasicProducerConsumerProxy."""

    proxyClass = pcp.BasicProducerConsumerProxy
191
class PCPII_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    """Run the producer-interface tests against ProducerConsumerProxy."""

    proxyClass = pcp.ProducerConsumerProxy
194
class ProducerProxyTest(unittest.TestCase):
    """Producer methods on me should be relayed to the Producer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        # The proxy is built with None in place of a consumer; only the
        # upstream (producer) side is exercised here.
        self.proxy = self.proxyClass(None)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testStop(self):
        self.proxy.stopProducing()
        self.assertTrue(self.parentProducer.stopped)
208
209
class ConsumerProxyTest(unittest.TestCase):
    """Consumer methods on me should be relayed to the Consumer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)

    def testWrite(self):
        # NOTE: This test only valid for streaming (Push) systems.
        self.consumer.write("some bytes")
        self.assertEqual(self.underlying.getvalue(), "some bytes")

    def testFinish(self):
        self.consumer.finish()
        self.assertTrue(self.underlying.finished)

    def testUnregister(self):
        # Start from a known-clear flag so the assertion below can only
        # pass if the proxy really relayed the call to the underlying
        # consumer.
        self.underlying.unregistered = False
        self.consumer.unregisterProducer()
        self.assertTrue(self.underlying.unregistered)
231
232
class PullProducerTest:
    """Test the proxy acting as a pull (non-streaming) producer.

    This is a mixin: a concrete test class combines it with a TestCase
    and supplies the proxyClass attribute that setUp instantiates.
    """

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testHoldWrites(self):
        self.proxy.write("hello")
        # Consumer should get no data before it says resumeProducing.
        self.assertFalse(self.underlying.getvalue(),
                         "Pulling Consumer got data before it pulled.")

    def testPull(self):
        self.proxy.write("hello")
        self.proxy.resumeProducing()
        self.assertEqual(self.underlying.getvalue(), "hello")

    def testMergeWrites(self):
        self.proxy.write("hello ")
        self.proxy.write("sunshine")
        self.proxy.resumeProducing()
        nwrites = len(self.underlying._writes)
        self.assertEqual(nwrites, 1, "Pull resulted in %d writes instead "
                         "of 1." % (nwrites,))
        self.assertEqual(self.underlying.getvalue(), "hello sunshine")

    def testLateWrite(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        self.proxy.write("data")
        # This data should answer that pull request.
        self.assertEqual(self.underlying.getvalue(), "data")
267
class PCP_PullProducerTest(PullProducerTest, unittest.TestCase):
    """Run the pull-producer tests against BasicProducerConsumerProxy."""

    class proxyClass(pcp.BasicProducerConsumerProxy):
        # Override the flag so the proxy acts as a non-streaming producer.
        iAmStreaming = False
271
class PCPII_PullProducerTest(PullProducerTest, unittest.TestCase):
    """Run the pull-producer tests against ProducerConsumerProxy."""

    class proxyClass(pcp.ProducerConsumerProxy):
        # Override the flag so the proxy acts as a non-streaming producer.
        iAmStreaming = False
275
276 # Buffering!
277
class BufferedConsumerTest(unittest.TestCase):
    """As a consumer, ask the producer to pause after too much data."""

    proxyClass = pcp.ProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        # Small buffer so the tests can overflow it with short writes.
        self.proxy.bufferSize = 100

        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testRegisterPull(self):
        self.proxy.registerProducer(self.parentProducer, False)
        ## Consumer SHOULD have called PushProducer.resumeProducing
        self.assertTrue(self.parentProducer.resumed)

    def testPauseIntercept(self):
        self.proxy.pauseProducing()
        self.assertFalse(self.parentProducer.paused)

    def testResumeIntercept(self):
        self.proxy.pauseProducing()
        self.proxy.resumeProducing()
        # With a streaming producer, just because the proxy was resumed is
        # not necessarily a reason to resume the parent producer.  The state
        # of the buffer should decide that.
        self.assertFalse(self.parentProducer.resumed)

    def testTriggerPause(self):
        """Make sure I say 'when.'"""

        # Pause the proxy so data sent to it builds up in its buffer.
        self.proxy.pauseProducing()
        self.assertFalse(self.parentProducer.paused, "don't pause yet")
        # 51 bytes is still under the 100-byte bufferSize set in setUp.
        self.proxy.write("x" * 51)
        self.assertFalse(self.parentProducer.paused, "don't pause yet")
        # A second 51 bytes takes the total to 102, over the limit.
        self.proxy.write("x" * 51)
        self.assertTrue(self.parentProducer.paused)

    def testTriggerResume(self):
        """Make sure I resumeProducing when my buffer empties."""
        self.proxy.pauseProducing()
        self.proxy.write("x" * 102)
        self.assertTrue(self.parentProducer.paused, "should be paused")
        self.proxy.resumeProducing()
        # Resuming should have emptied my buffer, so I should tell my
        # parent to resume too.
        self.assertFalse(self.parentProducer.paused,
                         "Producer should have resumed.")
        self.assertFalse(self.proxy.producerPaused)
330
class BufferedPullTests(unittest.TestCase):
    """Test a buffering proxy that sits between pull producer and consumer."""

    class proxyClass(pcp.ProducerConsumerProxy):
        iAmStreaming = False

        def _writeSomeData(self, data):
            # Pass along at most the first 100 bytes per call and report
            # how many bytes that was, so longer writes leave a remainder.
            pcp.ProducerConsumerProxy._writeSomeData(self, data[:100])
            return min(len(data), 100)

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.proxy.bufferSize = 100

        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, False)

    def testResumePull(self):
        # If proxy has no data to send on resumeProducing, it had better pull
        # some from its PullProducer.
        # Clear the flag first so that only the resumeProducing() call
        # below can set it.
        self.parentProducer.resumed = False
        self.proxy.resumeProducing()
        self.assertTrue(self.parentProducer.resumed)

    def testLateWriteBuffering(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        # 21 * 5 = 105 bytes: five more than one _writeSomeData call takes.
        self.proxy.write("datum" * 21)
        # This data should answer that pull request.
        self.assertEqual(self.underlying.getvalue(), "datum" * 20)
        # but there should be some left over
        self.assertEqual(self.proxy._buffer, ["datum"])
362
363
364 # TODO:
365 #  test that web request finishing bug (when we weren't proxying
366 #    unregisterProducer but were proxying finish, web file transfers
367 #    would hang on the last block.)
#  test what happens if _writeSomeData decided to write zero bytes.