Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / stdio_test_producer.py
1 # -*- test-case-name: twisted.test.test_stdio.StandardInputOutputTestCase.test_producer -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Main program for the child process run by
7 L{twisted.test.test_stdio.StandardInputOutputTestCase.test_producer} to test
8 that process transports implement IProducer properly.
9 """
10
11 import sys, _preamble
12
13 from twisted.internet import stdio, protocol
14 from twisted.python import log, reflect
15
class ProducerChild(protocol.Protocol):
    """
    Echo protocol that exercises C{pauseProducing} / C{resumeProducing} on
    the transport it is connected to.

    Each chunk received is written straight back to the transport, after
    which the transport is paused for a short interval.  If data is delivered
    while the transport is supposed to be paused, the problem is logged to
    stderr and the connection is dropped.  The connection is also dropped
    once the accumulated input ends with a newline, the character C{0} and
    another newline.

    @ivar _paused: C{True} between a call to L{pause} and the following call
        to L{unpause}.
    @ivar buf: All of the data received so far, as a byte string.
    """
    _paused = False
    # A bytes literal, because the transport delivers bytes; a native-str
    # literal here would make the concatenation in dataReceived fail on
    # Python 3.
    buf = b''

    def connectionLost(self, reason):
        """
        Log an end-of-run marker and schedule the reactor to stop one second
        later.

        @param reason: Why the connection was lost; not inspected.
        """
        log.msg("*****OVER*****")
        # ``reactor`` is the module global bound by the ``__main__`` section
        # of this file, so this class only works when run as a script.
        reactor.callLater(1, reactor.stop)


    def dataReceived(self, bytes):
        """
        Record and echo a chunk of input, then pause the transport.

        @param bytes: The chunk of data delivered by the transport.  The
            name shadows the builtin but is kept for interface stability.
        """
        self.buf += bytes
        if self._paused:
            # The transport ignored pauseProducing: report it on stderr and
            # give up.
            log.startLogging(sys.stderr)
            log.msg("dataReceived while transport paused!")
            self.transport.loseConnection()
        else:
            self.transport.write(bytes)
            if self.buf.endswith(b'\n0\n'):
                # End-of-input marker seen; nothing further to echo.
                self.transport.loseConnection()
            else:
                self.pause()


    def pause(self):
        """
        Pause the transport and schedule L{unpause} to run 0.01 seconds
        later.
        """
        self._paused = True
        self.transport.pauseProducing()
        reactor.callLater(0.01, self.unpause)


    def unpause(self):
        """
        Clear the paused flag and let the transport deliver data again.
        """
        self._paused = False
        self.transport.resumeProducing()
49
50
if __name__ == '__main__':
    # The first command-line argument is a fully qualified name; the object
    # it names is looked up and its install() is called before
    # twisted.internet.reactor is imported.
    reactorModule = reflect.namedAny(sys.argv[1])
    reactorModule.install()

    # Imported at module scope (rather than inside a function) because the
    # ProducerChild methods look up ``reactor`` as a module global.
    from twisted.internet import reactor

    # Attach the protocol to standard I/O, then run the reactor.
    childProtocol = ProducerChild()
    stdio.StandardIO(childProtocol)
    reactor.run()