Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / protocols / pcp.py
1 # -*- test-case-name: twisted.test.test_pcp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Producer-Consumer Proxy.
7 """
8
9 from zope.interface import implements
10
11 from twisted.internet import interfaces
12
13
14 class BasicProducerConsumerProxy:
15     """
16     I can act as a man in the middle between any Producer and Consumer.
17
18     @ivar producer: the Producer I subscribe to.
19     @type producer: L{IProducer<interfaces.IProducer>}
20     @ivar consumer: the Consumer I publish to.
21     @type consumer: L{IConsumer<interfaces.IConsumer>}
22     @ivar paused: As a Producer, am I paused?
23     @type paused: bool
24     """
25     implements(interfaces.IProducer, interfaces.IConsumer)
26
27     consumer = None
28     producer = None
29     producerIsStreaming = None
30     iAmStreaming = True
31     outstandingPull = False
32     paused = False
33     stopped = False
34
35     def __init__(self, consumer):
36         self._buffer = []
37         if consumer is not None:
38             self.consumer = consumer
39             consumer.registerProducer(self, self.iAmStreaming)
40
41     # Producer methods:
42
43     def pauseProducing(self):
44         self.paused = True
45         if self.producer:
46             self.producer.pauseProducing()
47
48     def resumeProducing(self):
49         self.paused = False
50         if self._buffer:
51             # TODO: Check to see if consumer supports writeSeq.
52             self.consumer.write(''.join(self._buffer))
53             self._buffer[:] = []
54         else:
55             if not self.iAmStreaming:
56                 self.outstandingPull = True
57
58         if self.producer is not None:
59             self.producer.resumeProducing()
60
61     def stopProducing(self):
62         if self.producer is not None:
63             self.producer.stopProducing()
64         if self.consumer is not None:
65             del self.consumer
66
67     # Consumer methods:
68
69     def write(self, data):
70         if self.paused or (not self.iAmStreaming and not self.outstandingPull):
71             # We could use that fifo queue here.
72             self._buffer.append(data)
73
74         elif self.consumer is not None:
75             self.consumer.write(data)
76             self.outstandingPull = False
77
78     def finish(self):
79         if self.consumer is not None:
80             self.consumer.finish()
81         self.unregisterProducer()
82
83     def registerProducer(self, producer, streaming):
84         self.producer = producer
85         self.producerIsStreaming = streaming
86
87     def unregisterProducer(self):
88         if self.producer is not None:
89             del self.producer
90             del self.producerIsStreaming
91         if self.consumer:
92             self.consumer.unregisterProducer()
93
94     def __repr__(self):
95         return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
96
97
98 class ProducerConsumerProxy(BasicProducerConsumerProxy):
99     """ProducerConsumerProxy with a finite buffer.
100
101     When my buffer fills up, I have my parent Producer pause until my buffer
102     has room in it again.
103     """
104     # Copies much from abstract.FileDescriptor
105     bufferSize = 2**2**2**2
106
107     producerPaused = False
108     unregistered = False
109
110     def pauseProducing(self):
111         # Does *not* call up to ProducerConsumerProxy to relay the pause
112         # message through to my parent Producer.
113         self.paused = True
114
115     def resumeProducing(self):
116         self.paused = False
117         if self._buffer:
118             data = ''.join(self._buffer)
119             bytesSent = self._writeSomeData(data)
120             if bytesSent < len(data):
121                 unsent = data[bytesSent:]
122                 assert not self.iAmStreaming, (
123                     "Streaming producer did not write all its data.")
124                 self._buffer[:] = [unsent]
125             else:
126                 self._buffer[:] = []
127         else:
128             bytesSent = 0
129
130         if (self.unregistered and bytesSent and not self._buffer and
131             self.consumer is not None):
132             self.consumer.unregisterProducer()
133
134         if not self.iAmStreaming:
135             self.outstandingPull = not bytesSent
136
137         if self.producer is not None:
138             bytesBuffered = sum([len(s) for s in self._buffer])
139             # TODO: You can see here the potential for high and low
140             # watermarks, where bufferSize would be the high mark when we
141             # ask the upstream producer to pause, and we wouldn't have
142             # it resume again until it hit the low mark.  Or if producer
143             # is Pull, maybe we'd like to pull from it as much as necessary
144             # to keep our buffer full to the low mark, so we're never caught
145             # without something to send.
146             if self.producerPaused and (bytesBuffered < self.bufferSize):
147                 # Now that our buffer is empty,
148                 self.producerPaused = False
149                 self.producer.resumeProducing()
150             elif self.outstandingPull:
151                 # I did not have any data to write in response to a pull,
152                 # so I'd better pull some myself.
153                 self.producer.resumeProducing()
154
155     def write(self, data):
156         if self.paused or (not self.iAmStreaming and not self.outstandingPull):
157             # We could use that fifo queue here.
158             self._buffer.append(data)
159
160         elif self.consumer is not None:
161             assert not self._buffer, (
162                 "Writing fresh data to consumer before my buffer is empty!")
163             # I'm going to use _writeSomeData here so that there is only one
164             # path to self.consumer.write.  But it doesn't actually make sense,
165             # if I am streaming, for some data to not be all data.  But maybe I
166             # am not streaming, but I am writing here anyway, because there was
167             # an earlier request for data which was not answered.
168             bytesSent = self._writeSomeData(data)
169             self.outstandingPull = False
170             if not bytesSent == len(data):
171                 assert not self.iAmStreaming, (
172                     "Streaming producer did not write all its data.")
173                 self._buffer.append(data[bytesSent:])
174
175         if (self.producer is not None) and self.producerIsStreaming:
176             bytesBuffered = sum([len(s) for s in self._buffer])
177             if bytesBuffered >= self.bufferSize:
178
179                 self.producer.pauseProducing()
180                 self.producerPaused = True
181
182     def registerProducer(self, producer, streaming):
183         self.unregistered = False
184         BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
185         if not streaming:
186             producer.resumeProducing()
187
188     def unregisterProducer(self):
189         if self.producer is not None:
190             del self.producer
191             del self.producerIsStreaming
192         self.unregistered = True
193         if self.consumer and not self._buffer:
194             self.consumer.unregisterProducer()
195
196     def _writeSomeData(self, data):
197         """Write as much of this data as possible.
198
199         @returns: The number of bytes written.
200         """
201         if self.consumer is None:
202             return 0
203         self.consumer.write(data)
204         return len(data)