11 class TestSocket(unittest.TestCase):
13 self.sock = gio.Socket(gio.SOCKET_FAMILY_IPV4,
14 gio.SOCKET_TYPE_STREAM,
15 gio.SOCKET_PROTOCOL_TCP)
17 def test_socket_condition_check(self):
18 check = self.sock.condition_check(glib.IO_OUT)
19 self.failUnless(isinstance(check, gobject.GFlags))
20 self.failUnlessEqual(check, glib.IO_OUT | glib.IO_HUP)
22 def test_socket_condition_wait(self):
23 res = self.sock.condition_wait(glib.IO_OUT)
29 class TestSocketAddress(unittest.TestCase):
30 def test_socket_address_enumerator_next_async(self):
31 def callback(enumerator, result):
33 address = enumerator.next_finish(result)
34 self.failUnless(isinstance(address, gio.SocketAddress))
38 socket = gio.NetworkAddress("www.pygtk.org", 80)
39 enumerator = socket.enumerate()
40 enumerator.next_async(callback)
42 loop = glib.MainLoop()
45 class TestSocketListener(unittest.TestCase):
46 def test_socket_listener_add_address(self):
47 address = gio.inet_address_new_from_string("127.0.0.1")
48 inetsock = gio.InetSocketAddress(address, 1024)
50 listener = gio.SocketListener()
51 effective = listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
52 self.failUnless(isinstance(effective, gio.InetSocketAddress))
54 def test_socket_listener_accept(self):
55 address = gio.inet_address_new_from_string("127.0.0.1")
56 inetsock = gio.InetSocketAddress(address, 1024)
58 listener = gio.SocketListener()
59 listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
61 client = gio.SocketClient()
62 client.connect_to_host("127.0.0.1:1024", 1024)
64 connection, source = listener.accept(cancellable=None)
65 self.failUnless(isinstance(connection, gio.TcpConnection))
67 def test_socket_listener_accept_socket(self):
68 address = gio.inet_address_new_from_string("127.0.0.1")
69 inetsock = gio.InetSocketAddress(address, 1024)
71 listener = gio.SocketListener()
72 listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
74 client = gio.SocketClient()
75 client.connect_to_host("127.0.0.1:1024", 1024)
77 socket, source = listener.accept_socket(cancellable=None)
78 self.failUnless(isinstance(socket, gio.Socket))
80 def test_socket_listener_accept_async(self):
81 def callback(listener, result):
83 connection, source = listener.accept_finish(result)
84 self.failUnless(isinstance(connection, gio.TcpConnection))
88 address = gio.inet_address_new_from_string("127.0.0.1")
89 inetsock = gio.InetSocketAddress(address, 1024)
91 listener = gio.SocketListener()
92 listener.add_address(inetsock,
93 gio.SOCKET_TYPE_STREAM,
94 gio.SOCKET_PROTOCOL_TCP)
96 client = gio.SocketClient()
97 client.connect_to_host("127.0.0.1:1024", 1024)
99 listener.accept_async(callback)
101 loop = glib.MainLoop()
104 def test_socket_listener_accept_socket_async(self):
105 def callback(listener, result):
107 socket, source = listener.accept_socket_finish(result)
108 self.failUnless(isinstance(socket, gio.Socket))
112 address = gio.inet_address_new_from_string("127.0.0.1")
113 inetsock = gio.InetSocketAddress(address, 1024)
115 listener = gio.SocketListener()
116 listener.add_address(inetsock,
117 gio.SOCKET_TYPE_STREAM,
118 gio.SOCKET_PROTOCOL_TCP)
120 client = gio.SocketClient()
121 client.connect_to_host("127.0.0.1:1024", 1024)
123 listener.accept_socket_async(callback)
125 loop = glib.MainLoop()