1 # -*- test-case-name: twisted.test.test_stdio -*-
4 Windows-specific implementation of the L{twisted.internet.stdio} interface.
10 from zope.interface import implements
12 from twisted.internet.interfaces import IHalfCloseableProtocol, ITransport, IAddress
13 from twisted.internet.interfaces import IConsumer, IPushProducer
15 from twisted.internet import _pollingfile, main
16 from twisted.python.failure import Failure
class Win32PipeAddress(object):
    """
    Address object for the Windows standard IO pipes.

    Elsewhere in this module instances are created with no arguments and
    returned to callers that ask for an address.
    """
    # NOTE(review): IAddress is imported by this module, so this class
    # presumably declares that it implements it -- confirm.
24 class StandardIO(_pollingfile._PollingTimer):
26 implements(ITransport,
def __init__(self, proto):
    """
    Start talking to standard IO with the given protocol.

    Also puts stdin, stdout and stderr into binary mode.

    @param proto: the protocol to attach to standard IO.  It is stored as
        C{self.proto} and has C{makeConnection(self)} called on it once
        both pipes have been registered for polling.
    """
    from twisted.internet import reactor

    # File descriptors 0, 1 and 2 are stdin, stdout and stderr.  An explicit
    # tuple is required here: range(0, 1, 2) only ever yields 0, which would
    # leave stdout and stderr in text mode.
    for stdfd in (0, 1, 2):
        msvcrt.setmode(stdfd, os.O_BINARY)

    _pollingfile._PollingTimer.__init__(self, reactor)

    # Remember the protocol; the callbacks and makeConnection() below all
    # go through self.proto.
    self.proto = proto

    hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
    hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)

    # The read pipe gets our data and connection-lost callbacks; the write
    # pipe only needs the connection-lost callback.
    self.stdin = _pollingfile._PollableReadPipe(
        hstdin, self.dataReceived, self.readConnectionLost)

    self.stdout = _pollingfile._PollableWritePipe(
        hstdout, self.writeConnectionLost)

    self._addPollableResource(self.stdin)
    self._addPollableResource(self.stdout)

    self.proto.makeConnection(self)
def dataReceived(self, data):
    """
    Forward C{data} to the protocol's C{dataReceived}.

    @param data: the chunk handed to this transport by the stdin pipe.
    """
    deliver = self.proto.dataReceived
    deliver(data)
def readConnectionLost(self):
    """
    Tell the protocol that the read side is gone, if it wants to know.

    Only protocols providing L{IHalfCloseableProtocol} are notified, via
    their C{readConnectionLost} method; any other protocol is left alone.
    """
    # NOTE(review): registered with the stdin pipe as its connection-lost
    # callback; presumably fired when stdin closes -- confirm in
    # _pollingfile.
    proto = self.proto
    if not IHalfCloseableProtocol.providedBy(proto):
        return
    proto.readConnectionLost()
def writeConnectionLost(self):
    """
    Tell the protocol that the write side is gone, if it wants to know.

    Only protocols providing L{IHalfCloseableProtocol} are notified, via
    their C{writeConnectionLost} method; any other protocol is left alone.
    """
    # NOTE(review): registered with the stdout pipe as its connection-lost
    # callback; presumably fired when stdout closes -- confirm in
    # _pollingfile.
    proto = self.proto
    if not IHalfCloseableProtocol.providedBy(proto):
        return
    proto.writeConnectionLost()
def checkConnLost(self):
    """
    Report the connection as lost once C{self.connsLost} has reached 2.

    Below that threshold nothing happens.  At or above it the transport is
    flagged as both disconnecting and disconnected, and the protocol's
    C{connectionLost} is called with a L{Failure} wrapping
    C{main.CONNECTION_DONE}.
    """
    # NOTE(review): nothing visible in this method increments connsLost;
    # confirm where the counter is maintained.
    if self.connsLost < 2:
        return
    self.disconnecting = self.disconnected = True
    self.proto.connectionLost(Failure(main.CONNECTION_DONE))
def write(self, data):
    """
    Write C{data} to standard output.

    @param data: passed unchanged to the stdout pipe's C{write}.
    """
    outPipe = self.stdout
    outPipe.write(data)
def writeSequence(self, seq):
    """
    Write a sequence of strings to standard output as one chunk.

    @param seq: an iterable of strings; they are concatenated in order
        and handed to the stdout pipe in a single C{write} call.
    """
    joined = ''.join(seq)
    self.stdout.write(joined)
def loseConnection(self):
    """
    Mark this transport as disconnecting.
    """
    # NOTE(review): only the flag is set in the code visible here; confirm
    # whether the stdin/stdout pipes are expected to be closed as well.
    self.disconnecting = True
97 return Win32PipeAddress()
100 return Win32PipeAddress()
def registerProducer(self, producer, streaming):
    """
    Register a producer with the stdout pipe.

    @param producer: the producer to register.
    @param streaming: forwarded unchanged alongside C{producer}.
    @return: whatever the stdout pipe's C{registerProducer} returns.
    """
    outPipe = self.stdout
    return outPipe.registerProducer(producer, streaming)
def unregisterProducer(self):
    """
    Unregister the current producer from the stdout pipe.

    @return: whatever the stdout pipe's C{unregisterProducer} returns.
    """
    outPipe = self.stdout
    return outPipe.unregisterProducer()
def stopProducing(self):
    """
    Ask the stdin pipe to stop producing.
    """
    inPipe = self.stdin
    inPipe.stopProducing()
def pauseProducing(self):
    """
    Ask the stdin pipe to pause producing.
    """
    inPipe = self.stdin
    inPipe.pauseProducing()
def resumeProducing(self):
    """
    Ask the stdin pipe to resume producing.
    """
    inPipe = self.stdin
    inPipe.resumeProducing()