Branch and push for 2.0
[profile/ivi/pygobject2.git] / tests / test_gsocket.py
1 # -*- Mode: Python -*-
2
3 import os
4 import unittest
5
6 import glib
7 import gobject
8 import gio
9
10
11 class TestSocket(unittest.TestCase):
12     def setUp(self):
13         self.sock = gio.Socket(gio.SOCKET_FAMILY_IPV4,
14                                gio.SOCKET_TYPE_STREAM,
15                                gio.SOCKET_PROTOCOL_TCP)
16
17     def test_socket_condition_check(self):
18         check = self.sock.condition_check(glib.IO_OUT)
19         self.failUnless(isinstance(check, gobject.GFlags))
20         self.failUnlessEqual(check, glib.IO_OUT | glib.IO_HUP)
21
22     def test_socket_condition_wait(self):
23         res = self.sock.condition_wait(glib.IO_OUT)
24         self.failUnless(res)
25
26     def tearDown(self):
27         self.sock.close()
28
29 class TestSocketAddress(unittest.TestCase):
30     def test_socket_address_enumerator_next_async(self):
31         def callback(enumerator, result):
32             try:
33                 address = enumerator.next_finish(result)
34                 self.failUnless(isinstance(address, gio.SocketAddress))
35             finally:
36                 loop.quit()
37
38         socket = gio.NetworkAddress("www.pygtk.org", 80)
39         enumerator = socket.enumerate()
40         enumerator.next_async(callback)
41
42         loop = glib.MainLoop()
43         loop.run()
44
45 class TestSocketListener(unittest.TestCase):
46     def test_socket_listener_add_address(self):
47         address = gio.inet_address_new_from_string("127.0.0.1")
48         inetsock = gio.InetSocketAddress(address, 1024)
49         
50         listener = gio.SocketListener()
51         effective = listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
52         self.failUnless(isinstance(effective, gio.InetSocketAddress))
53
54     def test_socket_listener_accept(self):
55         address = gio.inet_address_new_from_string("127.0.0.1")
56         inetsock = gio.InetSocketAddress(address, 1024)
57
58         listener = gio.SocketListener()
59         listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
60
61         client = gio.SocketClient()
62         client.connect_to_host("127.0.0.1:1024", 1024)
63
64         connection, source = listener.accept(cancellable=None)
65         self.failUnless(isinstance(connection, gio.TcpConnection))
66
67     def test_socket_listener_accept_socket(self):
68         address = gio.inet_address_new_from_string("127.0.0.1")
69         inetsock = gio.InetSocketAddress(address, 1024)
70
71         listener = gio.SocketListener()
72         listener.add_address(inetsock, gio.SOCKET_TYPE_STREAM, gio.SOCKET_PROTOCOL_TCP)
73
74         client = gio.SocketClient()
75         client.connect_to_host("127.0.0.1:1024", 1024)
76
77         socket, source = listener.accept_socket(cancellable=None)
78         self.failUnless(isinstance(socket, gio.Socket))
79
80     def test_socket_listener_accept_async(self):
81         def callback(listener, result):
82             try:
83                 connection, source = listener.accept_finish(result)
84                 self.failUnless(isinstance(connection, gio.TcpConnection))
85             finally:
86                 loop.quit()
87
88         address = gio.inet_address_new_from_string("127.0.0.1")
89         inetsock = gio.InetSocketAddress(address, 1024)
90         
91         listener = gio.SocketListener()
92         listener.add_address(inetsock,
93                              gio.SOCKET_TYPE_STREAM,
94                              gio.SOCKET_PROTOCOL_TCP)
95
96         client = gio.SocketClient()
97         client.connect_to_host("127.0.0.1:1024", 1024)
98         
99         listener.accept_async(callback)
100
101         loop = glib.MainLoop()
102         loop.run()
103
104     def test_socket_listener_accept_socket_async(self):
105         def callback(listener, result):
106             try:
107                 socket, source = listener.accept_socket_finish(result)
108                 self.failUnless(isinstance(socket, gio.Socket))
109             finally:
110                 loop.quit()
111
112         address = gio.inet_address_new_from_string("127.0.0.1")
113         inetsock = gio.InetSocketAddress(address, 1024)
114         
115         listener = gio.SocketListener()
116         listener.add_address(inetsock,
117                              gio.SOCKET_TYPE_STREAM,
118                              gio.SOCKET_PROTOCOL_TCP)
119
120         client = gio.SocketClient()
121         client.connect_to_host("127.0.0.1:1024", 1024)
122         
123         listener.accept_socket_async(callback)
124
125         loop = glib.MainLoop()
126         loop.run()