1 # -*- test-case-name: twisted.test.test_stdio.StandardInputOutputTestCase.test_readConnectionLost -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
"""
Main program for the child process run by
L{twisted.test.test_stdio.StandardInputOutputTestCase.test_readConnectionLost}
to test that IHalfCloseableProtocol.readConnectionLost works for process
transports.
"""
14 from zope.interface import implements
16 from twisted.internet.interfaces import IHalfCloseableProtocol
17 from twisted.internet import stdio, protocol
18 from twisted.python import reflect, log
class HalfCloseProtocol(protocol.Protocol):
    """
    A protocol to hook up to stdio and observe its transport being
    half-closed.  If all goes as expected, C{exitCode} will be set to C{0};
    otherwise it will be set to C{1} to indicate failure.

    @ivar exitCode: C{None} until an outcome has been observed, then C{0} if
        the read side was half-closed first, or C{1} if the connection was
        lost without a preceding read half-close.
    """
    implements(IHalfCloseableProtocol)

    # No outcome observed yet.  connectionLost relies on this sentinel to
    # tell whether readConnectionLost has already run.
    exitCode = None

    def connectionMade(self):
        """
        Signal the parent process that we're ready.
        """
        self.transport.write("x")


    def readConnectionLost(self):
        """
        This is the desired event.  Once it has happened, record success and
        stop the reactor so the process will exit.
        """
        # Imported here rather than at module level: the script installs the
        # reactor named on its command line before the reactor module is
        # first imported, and a top-level import would pre-empt that.
        from twisted.internet import reactor

        self.exitCode = 0
        reactor.stop()


    def connectionLost(self, reason):
        """
        This may only be invoked after C{readConnectionLost}.  If it happens
        otherwise, mark it as an error and shut down.

        @param reason: The failure describing why the connection was lost.
            It is logged when the loss was unexpected.
        """
        if self.exitCode is None:
            # See readConnectionLost for why this import is deferred.
            from twisted.internet import reactor

            self.exitCode = 1
            log.err(reason, "Unexpected call to connectionLost")
            # Only stop on the unexpected path: when readConnectionLost ran
            # first it has already stopped the reactor, and stopping it a
            # second time would raise.
            reactor.stop()
if __name__ == '__main__':
    # Command line: argv[1] is the fully-qualified name of an object with an
    # install() method (the reactor to use); argv[2] is the log file path.
    import sys

    # Install the requested reactor before twisted.internet.reactor is
    # imported below; importing it first would install the default reactor.
    reflect.namedAny(sys.argv[1]).install()
    log.startLogging(open(sys.argv[2], 'w'))
    from twisted.internet import reactor

    # Named so that it does not shadow the imported ``protocol`` module.
    halfCloseProtocol = HalfCloseProtocol()
    stdio.StandardIO(halfCloseProtocol)

    # The protocol's callbacks only fire while the reactor is running; it
    # stops the reactor itself once it has decided on an exit code.
    reactor.run()
    sys.exit(halfCloseProtocol.exitCode)