Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / testutils.py
1 from cStringIO import StringIO
2 from twisted.internet.protocol import FileWrapper
3
4 class IOPump:
5     """Utility to pump data between clients and servers for protocol testing.
6
7     Perhaps this is a utility worthy of being in protocol.py?
8     """
9     def __init__(self, client, server, clientIO, serverIO):
10         self.client = client
11         self.server = server
12         self.clientIO = clientIO
13         self.serverIO = serverIO
14
15     def flush(self):
16         "Pump until there is no more input or output."
17         while self.pump():
18             pass
19
20     def pump(self):
21         """Move data back and forth.
22
23         Returns whether any data was moved.
24         """
25         self.clientIO.seek(0)
26         self.serverIO.seek(0)
27         cData = self.clientIO.read()
28         sData = self.serverIO.read()
29         self.clientIO.seek(0)
30         self.serverIO.seek(0)
31         self.clientIO.truncate()
32         self.serverIO.truncate()
33         for byte in cData:
34             self.server.dataReceived(byte)
35         for byte in sData:
36             self.client.dataReceived(byte)
37         if cData or sData:
38             return 1
39         else:
40             return 0
41
42
43 def returnConnected(server, client):
44     """Take two Protocol instances and connect them.
45     """
46     cio = StringIO()
47     sio = StringIO()
48     client.makeConnection(FileWrapper(cio))
49     server.makeConnection(FileWrapper(sio))
50     pump = IOPump(client, server, cio, sio)
51     # Challenge-response authentication:
52     pump.flush()
53     # Uh...
54     pump.flush()
55     return pump