d41bca51b588f5a8c16e5aea625dcf1a2a19b2ac
[profile/ivi/python.git] / Lib / test / test_socket.py
1 #!/usr/bin/env python
2
3 import unittest
4 from test import test_support
5
6 import errno
7 import socket
8 import select
9 import time
10 import traceback
11 import Queue
12 import sys
13 import os
14 import array
15 import contextlib
16 from weakref import proxy
17 import signal
18 import math
19
20 def try_address(host, port=0, family=socket.AF_INET):
21     """Try to bind a socket on the given host:port and return True
22     if that has been possible."""
23     try:
24         sock = socket.socket(family, socket.SOCK_STREAM)
25         sock.bind((host, port))
26     except (socket.error, socket.gaierror):
27         return False
28     else:
29         sock.close()
30         return True
31
32 HOST = test_support.HOST
33 MSG = b'Michael Gilfix was here\n'
34 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
35
36 try:
37     import thread
38     import threading
39 except ImportError:
40     thread = None
41     threading = None
42
43 HOST = test_support.HOST
44 MSG = 'Michael Gilfix was here\n'
45
46 class SocketTCPTest(unittest.TestCase):
47
48     def setUp(self):
49         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50         self.port = test_support.bind_port(self.serv)
51         self.serv.listen(1)
52
53     def tearDown(self):
54         self.serv.close()
55         self.serv = None
56
57 class SocketUDPTest(unittest.TestCase):
58
59     def setUp(self):
60         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
61         self.port = test_support.bind_port(self.serv)
62
63     def tearDown(self):
64         self.serv.close()
65         self.serv = None
66
67 class ThreadableTest:
68     """Threadable Test class
69
70     The ThreadableTest class makes it easy to create a threaded
71     client/server pair from an existing unit test. To create a
72     new threaded class from an existing unit test, use multiple
73     inheritance:
74
75         class NewClass (OldClass, ThreadableTest):
76             pass
77
78     This class defines two new fixture functions with obvious
79     purposes for overriding:
80
81         clientSetUp ()
82         clientTearDown ()
83
84     Any new test functions within the class must then define
85     tests in pairs, where the test name is preceeded with a
86     '_' to indicate the client portion of the test. Ex:
87
88         def testFoo(self):
89             # Server portion
90
91         def _testFoo(self):
92             # Client portion
93
94     Any exceptions raised by the clients during their tests
95     are caught and transferred to the main thread to alert
96     the testing framework.
97
98     Note, the server setup function cannot call any blocking
99     functions that rely on the client thread during setup,
100     unless serverExplicitReady() is called just before
101     the blocking call (such as in setting up a client/server
102     connection and performing the accept() in setUp().
103     """
104
105     def __init__(self):
106         # Swap the true setup function
107         self.__setUp = self.setUp
108         self.__tearDown = self.tearDown
109         self.setUp = self._setUp
110         self.tearDown = self._tearDown
111
112     def serverExplicitReady(self):
113         """This method allows the server to explicitly indicate that
114         it wants the client thread to proceed. This is useful if the
115         server is about to execute a blocking routine that is
116         dependent upon the client thread during its setup routine."""
117         self.server_ready.set()
118
119     def _setUp(self):
120         self.server_ready = threading.Event()
121         self.client_ready = threading.Event()
122         self.done = threading.Event()
123         self.queue = Queue.Queue(1)
124
125         # Do some munging to start the client test.
126         methodname = self.id()
127         i = methodname.rfind('.')
128         methodname = methodname[i+1:]
129         test_method = getattr(self, '_' + methodname)
130         self.client_thread = thread.start_new_thread(
131             self.clientRun, (test_method,))
132
133         self.__setUp()
134         if not self.server_ready.is_set():
135             self.server_ready.set()
136         self.client_ready.wait()
137
138     def _tearDown(self):
139         self.__tearDown()
140         self.done.wait()
141
142         if not self.queue.empty():
143             msg = self.queue.get()
144             self.fail(msg)
145
146     def clientRun(self, test_func):
147         self.server_ready.wait()
148         self.client_ready.set()
149         self.clientSetUp()
150         with test_support.check_py3k_warnings():
151             if not callable(test_func):
152                 raise TypeError("test_func must be a callable function.")
153         try:
154             test_func()
155         except Exception, strerror:
156             self.queue.put(strerror)
157         self.clientTearDown()
158
159     def clientSetUp(self):
160         raise NotImplementedError("clientSetUp must be implemented.")
161
162     def clientTearDown(self):
163         self.done.set()
164         thread.exit()
165
166 class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
167
168     def __init__(self, methodName='runTest'):
169         SocketTCPTest.__init__(self, methodName=methodName)
170         ThreadableTest.__init__(self)
171
172     def clientSetUp(self):
173         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174
175     def clientTearDown(self):
176         self.cli.close()
177         self.cli = None
178         ThreadableTest.clientTearDown(self)
179
180 class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
181
182     def __init__(self, methodName='runTest'):
183         SocketUDPTest.__init__(self, methodName=methodName)
184         ThreadableTest.__init__(self)
185
186     def clientSetUp(self):
187         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
188
189     def clientTearDown(self):
190         self.cli.close()
191         self.cli = None
192         ThreadableTest.clientTearDown(self)
193
194 class SocketConnectedTest(ThreadedTCPSocketTest):
195
196     def __init__(self, methodName='runTest'):
197         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
198
199     def setUp(self):
200         ThreadedTCPSocketTest.setUp(self)
201         # Indicate explicitly we're ready for the client thread to
202         # proceed and then perform the blocking call to accept
203         self.serverExplicitReady()
204         conn, addr = self.serv.accept()
205         self.cli_conn = conn
206
207     def tearDown(self):
208         self.cli_conn.close()
209         self.cli_conn = None
210         ThreadedTCPSocketTest.tearDown(self)
211
212     def clientSetUp(self):
213         ThreadedTCPSocketTest.clientSetUp(self)
214         self.cli.connect((HOST, self.port))
215         self.serv_conn = self.cli
216
217     def clientTearDown(self):
218         self.serv_conn.close()
219         self.serv_conn = None
220         ThreadedTCPSocketTest.clientTearDown(self)
221
222 class SocketPairTest(unittest.TestCase, ThreadableTest):
223
224     def __init__(self, methodName='runTest'):
225         unittest.TestCase.__init__(self, methodName=methodName)
226         ThreadableTest.__init__(self)
227
228     def setUp(self):
229         self.serv, self.cli = socket.socketpair()
230
231     def tearDown(self):
232         self.serv.close()
233         self.serv = None
234
235     def clientSetUp(self):
236         pass
237
238     def clientTearDown(self):
239         self.cli.close()
240         self.cli = None
241         ThreadableTest.clientTearDown(self)
242
243
244 #######################################################################
245 ## Begin Tests
246
247 class GeneralModuleTests(unittest.TestCase):
248
249     def test_weakref(self):
250         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
251         p = proxy(s)
252         self.assertEqual(p.fileno(), s.fileno())
253         s.close()
254         s = None
255         try:
256             p.fileno()
257         except ReferenceError:
258             pass
259         else:
260             self.fail('Socket proxy still exists')
261
262     def testSocketError(self):
263         # Testing socket module exceptions
264         def raise_error(*args, **kwargs):
265             raise socket.error
266         def raise_herror(*args, **kwargs):
267             raise socket.herror
268         def raise_gaierror(*args, **kwargs):
269             raise socket.gaierror
270         self.assertRaises(socket.error, raise_error,
271                               "Error raising socket exception.")
272         self.assertRaises(socket.error, raise_herror,
273                               "Error raising socket exception.")
274         self.assertRaises(socket.error, raise_gaierror,
275                               "Error raising socket exception.")
276
277     def testCrucialConstants(self):
278         # Testing for mission critical constants
279         socket.AF_INET
280         socket.SOCK_STREAM
281         socket.SOCK_DGRAM
282         socket.SOCK_RAW
283         socket.SOCK_RDM
284         socket.SOCK_SEQPACKET
285         socket.SOL_SOCKET
286         socket.SO_REUSEADDR
287
288     def testHostnameRes(self):
289         # Testing hostname resolution mechanisms
290         hostname = socket.gethostname()
291         try:
292             ip = socket.gethostbyname(hostname)
293         except socket.error:
294             # Probably name lookup wasn't set up right; skip this test
295             return
296         self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
297         try:
298             hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
299         except socket.error:
300             # Probably a similar problem as above; skip this test
301             return
302         all_host_names = [hostname, hname] + aliases
303         fqhn = socket.getfqdn(ip)
304         if not fqhn in all_host_names:
305             self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
306
307     def testRefCountGetNameInfo(self):
308         # Testing reference count for getnameinfo
309         if hasattr(sys, "getrefcount"):
310             try:
311                 # On some versions, this loses a reference
312                 orig = sys.getrefcount(__name__)
313                 socket.getnameinfo(__name__,0)
314             except TypeError:
315                 self.assertEqual(sys.getrefcount(__name__), orig,
316                                  "socket.getnameinfo loses a reference")
317
318     def testInterpreterCrash(self):
319         # Making sure getnameinfo doesn't crash the interpreter
320         try:
321             # On some versions, this crashes the interpreter.
322             socket.getnameinfo(('x', 0, 0, 0), 0)
323         except socket.error:
324             pass
325
326     def testNtoH(self):
327         # This just checks that htons etc. are their own inverse,
328         # when looking at the lower 16 or 32 bits.
329         sizes = {socket.htonl: 32, socket.ntohl: 32,
330                  socket.htons: 16, socket.ntohs: 16}
331         for func, size in sizes.items():
332             mask = (1L<<size) - 1
333             for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
334                 self.assertEqual(i & mask, func(func(i&mask)) & mask)
335
336             swapped = func(mask)
337             self.assertEqual(swapped & mask, mask)
338             self.assertRaises(OverflowError, func, 1L<<34)
339
340     def testNtoHErrors(self):
341         good_values = [ 1, 2, 3, 1L, 2L, 3L ]
342         bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
343         for k in good_values:
344             socket.ntohl(k)
345             socket.ntohs(k)
346             socket.htonl(k)
347             socket.htons(k)
348         for k in bad_values:
349             self.assertRaises(OverflowError, socket.ntohl, k)
350             self.assertRaises(OverflowError, socket.ntohs, k)
351             self.assertRaises(OverflowError, socket.htonl, k)
352             self.assertRaises(OverflowError, socket.htons, k)
353
354     def testGetServBy(self):
355         eq = self.assertEqual
356         # Find one service that exists, then check all the related interfaces.
357         # I've ordered this by protocols that have both a tcp and udp
358         # protocol, at least for modern Linuxes.
359         if (sys.platform.startswith('linux') or
360             sys.platform.startswith('freebsd') or
361             sys.platform.startswith('netbsd') or
362             sys.platform == 'darwin'):
363             # avoid the 'echo' service on this platform, as there is an
364             # assumption breaking non-standard port/protocol entry
365             services = ('daytime', 'qotd', 'domain')
366         else:
367             services = ('echo', 'daytime', 'domain')
368         for service in services:
369             try:
370                 port = socket.getservbyname(service, 'tcp')
371                 break
372             except socket.error:
373                 pass
374         else:
375             raise socket.error
376         # Try same call with optional protocol omitted
377         port2 = socket.getservbyname(service)
378         eq(port, port2)
379         # Try udp, but don't barf it it doesn't exist
380         try:
381             udpport = socket.getservbyname(service, 'udp')
382         except socket.error:
383             udpport = None
384         else:
385             eq(udpport, port)
386         # Now make sure the lookup by port returns the same service name
387         eq(socket.getservbyport(port2), service)
388         eq(socket.getservbyport(port, 'tcp'), service)
389         if udpport is not None:
390             eq(socket.getservbyport(udpport, 'udp'), service)
391         # Make sure getservbyport does not accept out of range ports.
392         self.assertRaises(OverflowError, socket.getservbyport, -1)
393         self.assertRaises(OverflowError, socket.getservbyport, 65536)
394
395     def testDefaultTimeout(self):
396         # Testing default timeout
397         # The default timeout should initially be None
398         self.assertEqual(socket.getdefaulttimeout(), None)
399         s = socket.socket()
400         self.assertEqual(s.gettimeout(), None)
401         s.close()
402
403         # Set the default timeout to 10, and see if it propagates
404         socket.setdefaulttimeout(10)
405         self.assertEqual(socket.getdefaulttimeout(), 10)
406         s = socket.socket()
407         self.assertEqual(s.gettimeout(), 10)
408         s.close()
409
410         # Reset the default timeout to None, and see if it propagates
411         socket.setdefaulttimeout(None)
412         self.assertEqual(socket.getdefaulttimeout(), None)
413         s = socket.socket()
414         self.assertEqual(s.gettimeout(), None)
415         s.close()
416
417         # Check that setting it to an invalid value raises ValueError
418         self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
419
420         # Check that setting it to an invalid type raises TypeError
421         self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
422
423     def testIPv4_inet_aton_fourbytes(self):
424         if not hasattr(socket, 'inet_aton'):
425             return  # No inet_aton, nothing to check
426         # Test that issue1008086 and issue767150 are fixed.
427         # It must return 4 bytes.
428         self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
429         self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
430
431     def testIPv4toString(self):
432         if not hasattr(socket, 'inet_pton'):
433             return # No inet_pton() on this platform
434         from socket import inet_aton as f, inet_pton, AF_INET
435         g = lambda a: inet_pton(AF_INET, a)
436
437         self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
438         self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
439         self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
440         self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
441         self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
442
443         self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
444         self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
445         self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
446         self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
447
448     def testIPv6toString(self):
449         if not hasattr(socket, 'inet_pton'):
450             return # No inet_pton() on this platform
451         try:
452             from socket import inet_pton, AF_INET6, has_ipv6
453             if not has_ipv6:
454                 return
455         except ImportError:
456             return
457         f = lambda a: inet_pton(AF_INET6, a)
458
459         self.assertEqual('\x00' * 16, f('::'))
460         self.assertEqual('\x00' * 16, f('0::0'))
461         self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
462         self.assertEqual(
463             '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
464             f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
465         )
466
467     def testStringToIPv4(self):
468         if not hasattr(socket, 'inet_ntop'):
469             return # No inet_ntop() on this platform
470         from socket import inet_ntoa as f, inet_ntop, AF_INET
471         g = lambda a: inet_ntop(AF_INET, a)
472
473         self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
474         self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
475         self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
476         self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
477
478         self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
479         self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
480         self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
481
482     def testStringToIPv6(self):
483         if not hasattr(socket, 'inet_ntop'):
484             return # No inet_ntop() on this platform
485         try:
486             from socket import inet_ntop, AF_INET6, has_ipv6
487             if not has_ipv6:
488                 return
489         except ImportError:
490             return
491         f = lambda a: inet_ntop(AF_INET6, a)
492
493         self.assertEqual('::', f('\x00' * 16))
494         self.assertEqual('::1', f('\x00' * 15 + '\x01'))
495         self.assertEqual(
496             'aef:b01:506:1001:ffff:9997:55:170',
497             f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
498         )
499
500     # XXX The following don't test module-level functionality...
501
502     def _get_unused_port(self, bind_address='0.0.0.0'):
503         """Use a temporary socket to elicit an unused ephemeral port.
504
505         Args:
506             bind_address: Hostname or IP address to search for a port on.
507
508         Returns: A most likely to be unused port.
509         """
510         tempsock = socket.socket()
511         tempsock.bind((bind_address, 0))
512         host, port = tempsock.getsockname()
513         tempsock.close()
514         return port
515
516     def testSockName(self):
517         # Testing getsockname()
518         port = self._get_unused_port()
519         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
520         self.addCleanup(sock.close)
521         sock.bind(("0.0.0.0", port))
522         name = sock.getsockname()
523         # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
524         # it reasonable to get the host's addr in addition to 0.0.0.0.
525         # At least for eCos.  This is required for the S/390 to pass.
526         try:
527             my_ip_addr = socket.gethostbyname(socket.gethostname())
528         except socket.error:
529             # Probably name lookup wasn't set up right; skip this test
530             return
531         self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
532         self.assertEqual(name[1], port)
533
534     def testGetSockOpt(self):
535         # Testing getsockopt()
536         # We know a socket should start without reuse==0
537         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
538         self.addCleanup(sock.close)
539         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
540         self.assertFalse(reuse != 0, "initial mode is reuse")
541
542     def testSetSockOpt(self):
543         # Testing setsockopt()
544         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
545         self.addCleanup(sock.close)
546         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
547         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
548         self.assertFalse(reuse == 0, "failed to set reuse mode")
549
550     def testSendAfterClose(self):
551         # testing send() after close() with timeout
552         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
553         sock.settimeout(1)
554         sock.close()
555         self.assertRaises(socket.error, sock.send, "spam")
556
557     def testNewAttributes(self):
558         # testing .family, .type and .protocol
559         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
560         self.assertEqual(sock.family, socket.AF_INET)
561         self.assertEqual(sock.type, socket.SOCK_STREAM)
562         self.assertEqual(sock.proto, 0)
563         sock.close()
564
565     def test_getsockaddrarg(self):
566         host = '0.0.0.0'
567         port = self._get_unused_port(bind_address=host)
568         big_port = port + 65536
569         neg_port = port - 65536
570         sock = socket.socket()
571         try:
572             self.assertRaises(OverflowError, sock.bind, (host, big_port))
573             self.assertRaises(OverflowError, sock.bind, (host, neg_port))
574             sock.bind((host, port))
575         finally:
576             sock.close()
577
578     @unittest.skipUnless(os.name == "nt", "Windows specific")
579     def test_sock_ioctl(self):
580         self.assertTrue(hasattr(socket.socket, 'ioctl'))
581         self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
582         self.assertTrue(hasattr(socket, 'RCVALL_ON'))
583         self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
584         self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
585         s = socket.socket()
586         self.addCleanup(s.close)
587         self.assertRaises(ValueError, s.ioctl, -1, None)
588         s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
589
590     def testGetaddrinfo(self):
591         try:
592             socket.getaddrinfo('localhost', 80)
593         except socket.gaierror as err:
594             if err.errno == socket.EAI_SERVICE:
595                 # see http://bugs.python.org/issue1282647
596                 self.skipTest("buggy libc version")
597             raise
598         # len of every sequence is supposed to be == 5
599         for info in socket.getaddrinfo(HOST, None):
600             self.assertEqual(len(info), 5)
601         # host can be a domain name, a string representation of an
602         # IPv4/v6 address or None
603         socket.getaddrinfo('localhost', 80)
604         socket.getaddrinfo('127.0.0.1', 80)
605         socket.getaddrinfo(None, 80)
606         if SUPPORTS_IPV6:
607             socket.getaddrinfo('::1', 80)
608         # port can be a string service name such as "http", a numeric
609         # port number or None
610         socket.getaddrinfo(HOST, "http")
611         socket.getaddrinfo(HOST, 80)
612         socket.getaddrinfo(HOST, None)
613         # test family and socktype filters
614         infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
615         for family, _, _, _, _ in infos:
616             self.assertEqual(family, socket.AF_INET)
617         infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
618         for _, socktype, _, _, _ in infos:
619             self.assertEqual(socktype, socket.SOCK_STREAM)
620         # test proto and flags arguments
621         socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
622         socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
623         # a server willing to support both IPv4 and IPv6 will
624         # usually do this
625         socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
626                            socket.AI_PASSIVE)
627
628
629     def check_sendall_interrupted(self, with_timeout):
630         # socketpair() is not stricly required, but it makes things easier.
631         if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
632             self.skipTest("signal.alarm and socket.socketpair required for this test")
633         # Our signal handlers clobber the C errno by calling a math function
634         # with an invalid domain value.
635         def ok_handler(*args):
636             self.assertRaises(ValueError, math.acosh, 0)
637         def raising_handler(*args):
638             self.assertRaises(ValueError, math.acosh, 0)
639             1 // 0
640         c, s = socket.socketpair()
641         old_alarm = signal.signal(signal.SIGALRM, raising_handler)
642         try:
643             if with_timeout:
644                 # Just above the one second minimum for signal.alarm
645                 c.settimeout(1.5)
646             with self.assertRaises(ZeroDivisionError):
647                 signal.alarm(1)
648                 c.sendall(b"x" * (1024**2))
649             if with_timeout:
650                 signal.signal(signal.SIGALRM, ok_handler)
651                 signal.alarm(1)
652                 self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
653         finally:
654             signal.signal(signal.SIGALRM, old_alarm)
655             c.close()
656             s.close()
657
658     def test_sendall_interrupted(self):
659         self.check_sendall_interrupted(False)
660
661     def test_sendall_interrupted_with_timeout(self):
662         self.check_sendall_interrupted(True)
663
664
665 @unittest.skipUnless(thread, 'Threading required for this test.')
666 class BasicTCPTest(SocketConnectedTest):
667
668     def __init__(self, methodName='runTest'):
669         SocketConnectedTest.__init__(self, methodName=methodName)
670
671     def testRecv(self):
672         # Testing large receive over TCP
673         msg = self.cli_conn.recv(1024)
674         self.assertEqual(msg, MSG)
675
676     def _testRecv(self):
677         self.serv_conn.send(MSG)
678
679     def testOverFlowRecv(self):
680         # Testing receive in chunks over TCP
681         seg1 = self.cli_conn.recv(len(MSG) - 3)
682         seg2 = self.cli_conn.recv(1024)
683         msg = seg1 + seg2
684         self.assertEqual(msg, MSG)
685
686     def _testOverFlowRecv(self):
687         self.serv_conn.send(MSG)
688
689     def testRecvFrom(self):
690         # Testing large recvfrom() over TCP
691         msg, addr = self.cli_conn.recvfrom(1024)
692         self.assertEqual(msg, MSG)
693
694     def _testRecvFrom(self):
695         self.serv_conn.send(MSG)
696
697     def testOverFlowRecvFrom(self):
698         # Testing recvfrom() in chunks over TCP
699         seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
700         seg2, addr = self.cli_conn.recvfrom(1024)
701         msg = seg1 + seg2
702         self.assertEqual(msg, MSG)
703
704     def _testOverFlowRecvFrom(self):
705         self.serv_conn.send(MSG)
706
707     def testSendAll(self):
708         # Testing sendall() with a 2048 byte string over TCP
709         msg = ''
710         while 1:
711             read = self.cli_conn.recv(1024)
712             if not read:
713                 break
714             msg += read
715         self.assertEqual(msg, 'f' * 2048)
716
717     def _testSendAll(self):
718         big_chunk = 'f' * 2048
719         self.serv_conn.sendall(big_chunk)
720
721     def testFromFd(self):
722         # Testing fromfd()
723         if not hasattr(socket, "fromfd"):
724             return # On Windows, this doesn't exist
725         fd = self.cli_conn.fileno()
726         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
727         self.addCleanup(sock.close)
728         msg = sock.recv(1024)
729         self.assertEqual(msg, MSG)
730
731     def _testFromFd(self):
732         self.serv_conn.send(MSG)
733
734     def testDup(self):
735         # Testing dup()
736         sock = self.cli_conn.dup()
737         self.addCleanup(sock.close)
738         msg = sock.recv(1024)
739         self.assertEqual(msg, MSG)
740
741     def _testDup(self):
742         self.serv_conn.send(MSG)
743
744     def testShutdown(self):
745         # Testing shutdown()
746         msg = self.cli_conn.recv(1024)
747         self.assertEqual(msg, MSG)
748         # wait for _testShutdown to finish: on OS X, when the server
749         # closes the connection the client also becomes disconnected,
750         # and the client's shutdown call will fail. (Issue #4397.)
751         self.done.wait()
752
753     def _testShutdown(self):
754         self.serv_conn.send(MSG)
755         self.serv_conn.shutdown(2)
756
757 @unittest.skipUnless(thread, 'Threading required for this test.')
758 class BasicUDPTest(ThreadedUDPSocketTest):
759
760     def __init__(self, methodName='runTest'):
761         ThreadedUDPSocketTest.__init__(self, methodName=methodName)
762
763     def testSendtoAndRecv(self):
764         # Testing sendto() and Recv() over UDP
765         msg = self.serv.recv(len(MSG))
766         self.assertEqual(msg, MSG)
767
768     def _testSendtoAndRecv(self):
769         self.cli.sendto(MSG, 0, (HOST, self.port))
770
771     def testRecvFrom(self):
772         # Testing recvfrom() over UDP
773         msg, addr = self.serv.recvfrom(len(MSG))
774         self.assertEqual(msg, MSG)
775
776     def _testRecvFrom(self):
777         self.cli.sendto(MSG, 0, (HOST, self.port))
778
779     def testRecvFromNegative(self):
780         # Negative lengths passed to recvfrom should give ValueError.
781         self.assertRaises(ValueError, self.serv.recvfrom, -1)
782
783     def _testRecvFromNegative(self):
784         self.cli.sendto(MSG, 0, (HOST, self.port))
785
786 @unittest.skipUnless(thread, 'Threading required for this test.')
787 class TCPCloserTest(ThreadedTCPSocketTest):
788
789     def testClose(self):
790         conn, addr = self.serv.accept()
791         conn.close()
792
793         sd = self.cli
794         read, write, err = select.select([sd], [], [], 1.0)
795         self.assertEqual(read, [sd])
796         self.assertEqual(sd.recv(1), '')
797
798     def _testClose(self):
799         self.cli.connect((HOST, self.port))
800         time.sleep(1.0)
801
802 @unittest.skipUnless(thread, 'Threading required for this test.')
803 class BasicSocketPairTest(SocketPairTest):
804
805     def __init__(self, methodName='runTest'):
806         SocketPairTest.__init__(self, methodName=methodName)
807
808     def testRecv(self):
809         msg = self.serv.recv(1024)
810         self.assertEqual(msg, MSG)
811
812     def _testRecv(self):
813         self.cli.send(MSG)
814
815     def testSend(self):
816         self.serv.send(MSG)
817
818     def _testSend(self):
819         msg = self.cli.recv(1024)
820         self.assertEqual(msg, MSG)
821
822 @unittest.skipUnless(thread, 'Threading required for this test.')
823 class NonBlockingTCPTests(ThreadedTCPSocketTest):
824
825     def __init__(self, methodName='runTest'):
826         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
827
828     def testSetBlocking(self):
829         # Testing whether set blocking works
830         self.serv.setblocking(0)
831         start = time.time()
832         try:
833             self.serv.accept()
834         except socket.error:
835             pass
836         end = time.time()
837         self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
838
839     def _testSetBlocking(self):
840         pass
841
842     def testAccept(self):
843         # Testing non-blocking accept
844         self.serv.setblocking(0)
845         try:
846             conn, addr = self.serv.accept()
847         except socket.error:
848             pass
849         else:
850             self.fail("Error trying to do non-blocking accept.")
851         read, write, err = select.select([self.serv], [], [])
852         if self.serv in read:
853             conn, addr = self.serv.accept()
854             conn.close()
855         else:
856             self.fail("Error trying to do accept after select.")
857
858     def _testAccept(self):
859         time.sleep(0.1)
860         self.cli.connect((HOST, self.port))
861
862     def testConnect(self):
863         # Testing non-blocking connect
864         conn, addr = self.serv.accept()
865         conn.close()
866
867     def _testConnect(self):
868         self.cli.settimeout(10)
869         self.cli.connect((HOST, self.port))
870
871     def testRecv(self):
872         # Testing non-blocking recv
873         conn, addr = self.serv.accept()
874         conn.setblocking(0)
875         try:
876             msg = conn.recv(len(MSG))
877         except socket.error:
878             pass
879         else:
880             self.fail("Error trying to do non-blocking recv.")
881         read, write, err = select.select([conn], [], [])
882         if conn in read:
883             msg = conn.recv(len(MSG))
884             conn.close()
885             self.assertEqual(msg, MSG)
886         else:
887             self.fail("Error during select call to non-blocking socket.")
888
889     def _testRecv(self):
890         self.cli.connect((HOST, self.port))
891         time.sleep(0.1)
892         self.cli.send(MSG)
893
894 @unittest.skipUnless(thread, 'Threading required for this test.')
895 class FileObjectClassTestCase(SocketConnectedTest):
896
897     bufsize = -1 # Use default buffer size
898
899     def __init__(self, methodName='runTest'):
900         SocketConnectedTest.__init__(self, methodName=methodName)
901
902     def setUp(self):
903         SocketConnectedTest.setUp(self)
904         self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
905
906     def tearDown(self):
907         self.serv_file.close()
908         self.assertTrue(self.serv_file.closed)
909         self.serv_file = None
910         SocketConnectedTest.tearDown(self)
911
912     def clientSetUp(self):
913         SocketConnectedTest.clientSetUp(self)
914         self.cli_file = self.serv_conn.makefile('wb')
915
916     def clientTearDown(self):
917         self.cli_file.close()
918         self.assertTrue(self.cli_file.closed)
919         self.cli_file = None
920         SocketConnectedTest.clientTearDown(self)
921
922     def testSmallRead(self):
923         # Performing small file read test
924         first_seg = self.serv_file.read(len(MSG)-3)
925         second_seg = self.serv_file.read(3)
926         msg = first_seg + second_seg
927         self.assertEqual(msg, MSG)
928
929     def _testSmallRead(self):
930         self.cli_file.write(MSG)
931         self.cli_file.flush()
932
933     def testFullRead(self):
934         # read until EOF
935         msg = self.serv_file.read()
936         self.assertEqual(msg, MSG)
937
938     def _testFullRead(self):
939         self.cli_file.write(MSG)
940         self.cli_file.close()
941
942     def testUnbufferedRead(self):
943         # Performing unbuffered file read test
944         buf = ''
945         while 1:
946             char = self.serv_file.read(1)
947             if not char:
948                 break
949             buf += char
950         self.assertEqual(buf, MSG)
951
952     def _testUnbufferedRead(self):
953         self.cli_file.write(MSG)
954         self.cli_file.flush()
955
956     def testReadline(self):
957         # Performing file readline test
958         line = self.serv_file.readline()
959         self.assertEqual(line, MSG)
960
961     def _testReadline(self):
962         self.cli_file.write(MSG)
963         self.cli_file.flush()
964
965     def testReadlineAfterRead(self):
966         a_baloo_is = self.serv_file.read(len("A baloo is"))
967         self.assertEqual("A baloo is", a_baloo_is)
968         _a_bear = self.serv_file.read(len(" a bear"))
969         self.assertEqual(" a bear", _a_bear)
970         line = self.serv_file.readline()
971         self.assertEqual("\n", line)
972         line = self.serv_file.readline()
973         self.assertEqual("A BALOO IS A BEAR.\n", line)
974         line = self.serv_file.readline()
975         self.assertEqual(MSG, line)
976
977     def _testReadlineAfterRead(self):
978         self.cli_file.write("A baloo is a bear\n")
979         self.cli_file.write("A BALOO IS A BEAR.\n")
980         self.cli_file.write(MSG)
981         self.cli_file.flush()
982
983     def testReadlineAfterReadNoNewline(self):
984         end_of_ = self.serv_file.read(len("End Of "))
985         self.assertEqual("End Of ", end_of_)
986         line = self.serv_file.readline()
987         self.assertEqual("Line", line)
988
989     def _testReadlineAfterReadNoNewline(self):
990         self.cli_file.write("End Of Line")
991
992     def testClosedAttr(self):
993         self.assertTrue(not self.serv_file.closed)
994
995     def _testClosedAttr(self):
996         self.assertTrue(not self.cli_file.closed)
997
998
999 class FileObjectInterruptedTestCase(unittest.TestCase):
1000     """Test that the file object correctly handles EINTR internally."""
1001
1002     class MockSocket(object):
1003         def __init__(self, recv_funcs=()):
1004             # A generator that returns callables that we'll call for each
1005             # call to recv().
1006             self._recv_step = iter(recv_funcs)
1007
1008         def recv(self, size):
1009             return self._recv_step.next()()
1010
1011     @staticmethod
1012     def _raise_eintr():
1013         raise socket.error(errno.EINTR)
1014
1015     def _test_readline(self, size=-1, **kwargs):
1016         mock_sock = self.MockSocket(recv_funcs=[
1017                 lambda : "This is the first line\nAnd the sec",
1018                 self._raise_eintr,
1019                 lambda : "ond line is here\n",
1020                 lambda : "",
1021             ])
1022         fo = socket._fileobject(mock_sock, **kwargs)
1023         self.assertEqual(fo.readline(size), "This is the first line\n")
1024         self.assertEqual(fo.readline(size), "And the second line is here\n")
1025
1026     def _test_read(self, size=-1, **kwargs):
1027         mock_sock = self.MockSocket(recv_funcs=[
1028                 lambda : "This is the first line\nAnd the sec",
1029                 self._raise_eintr,
1030                 lambda : "ond line is here\n",
1031                 lambda : "",
1032             ])
1033         fo = socket._fileobject(mock_sock, **kwargs)
1034         self.assertEqual(fo.read(size), "This is the first line\n"
1035                           "And the second line is here\n")
1036
1037     def test_default(self):
1038         self._test_readline()
1039         self._test_readline(size=100)
1040         self._test_read()
1041         self._test_read(size=100)
1042
1043     def test_with_1k_buffer(self):
1044         self._test_readline(bufsize=1024)
1045         self._test_readline(size=100, bufsize=1024)
1046         self._test_read(bufsize=1024)
1047         self._test_read(size=100, bufsize=1024)
1048
1049     def _test_readline_no_buffer(self, size=-1):
1050         mock_sock = self.MockSocket(recv_funcs=[
1051                 lambda : "aa",
1052                 lambda : "\n",
1053                 lambda : "BB",
1054                 self._raise_eintr,
1055                 lambda : "bb",
1056                 lambda : "",
1057             ])
1058         fo = socket._fileobject(mock_sock, bufsize=0)
1059         self.assertEqual(fo.readline(size), "aa\n")
1060         self.assertEqual(fo.readline(size), "BBbb")
1061
1062     def test_no_buffer(self):
1063         self._test_readline_no_buffer()
1064         self._test_readline_no_buffer(size=4)
1065         self._test_read(bufsize=0)
1066         self._test_read(size=100, bufsize=0)
1067
1068
1069 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
1070
1071     """Repeat the tests from FileObjectClassTestCase with bufsize==0.
1072
1073     In this case (and in this case only), it should be possible to
1074     create a file object, read a line from it, create another file
1075     object, read another line from it, without loss of data in the
1076     first file object's buffer.  Note that httplib relies on this
1077     when reading multiple requests from the same socket."""
1078
1079     bufsize = 0 # Use unbuffered mode
1080
1081     def testUnbufferedReadline(self):
1082         # Read a line, create a new file object, read another line with it
1083         line = self.serv_file.readline() # first line
1084         self.assertEqual(line, "A. " + MSG) # first line
1085         self.serv_file = self.cli_conn.makefile('rb', 0)
1086         line = self.serv_file.readline() # second line
1087         self.assertEqual(line, "B. " + MSG) # second line
1088
1089     def _testUnbufferedReadline(self):
1090         self.cli_file.write("A. " + MSG)
1091         self.cli_file.write("B. " + MSG)
1092         self.cli_file.flush()
1093
1094 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1095
1096     bufsize = 1 # Default-buffered for reading; line-buffered for writing
1097
1098
1099 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1100
1101     bufsize = 2 # Exercise the buffering code
1102
1103
1104 class NetworkConnectionTest(object):
1105     """Prove network connection."""
1106     def clientSetUp(self):
1107         # We're inherited below by BasicTCPTest2, which also inherits
1108         # BasicTCPTest, which defines self.port referenced below.
1109         self.cli = socket.create_connection((HOST, self.port))
1110         self.serv_conn = self.cli
1111
1112 class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
1113     """Tests that NetworkConnection does not break existing TCP functionality.
1114     """
1115
1116 class NetworkConnectionNoServer(unittest.TestCase):
1117     class MockSocket(socket.socket):
1118         def connect(self, *args):
1119             raise socket.timeout('timed out')
1120
1121     @contextlib.contextmanager
1122     def mocked_socket_module(self):
1123         """Return a socket which times out on connect"""
1124         old_socket = socket.socket
1125         socket.socket = self.MockSocket
1126         try:
1127             yield
1128         finally:
1129             socket.socket = old_socket
1130
1131     def test_connect(self):
1132         port = test_support.find_unused_port()
1133         cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1134         self.addCleanup(cli.close)
1135         with self.assertRaises(socket.error) as cm:
1136             cli.connect((HOST, port))
1137         self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1138
1139     def test_create_connection(self):
1140         # Issue #9792: errors raised by create_connection() should have
1141         # a proper errno attribute.
1142         port = test_support.find_unused_port()
1143         with self.assertRaises(socket.error) as cm:
1144             socket.create_connection((HOST, port))
1145         self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1146
1147     def test_create_connection_timeout(self):
1148         # Issue #9792: create_connection() should not recast timeout errors
1149         # as generic socket errors.
1150         with self.mocked_socket_module():
1151             with self.assertRaises(socket.timeout):
1152                 socket.create_connection((HOST, 1234))
1153
1154
1155 @unittest.skipUnless(thread, 'Threading required for this test.')
1156 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
1157
1158     def __init__(self, methodName='runTest'):
1159         SocketTCPTest.__init__(self, methodName=methodName)
1160         ThreadableTest.__init__(self)
1161
1162     def clientSetUp(self):
1163         self.source_port = test_support.find_unused_port()
1164
1165     def clientTearDown(self):
1166         self.cli.close()
1167         self.cli = None
1168         ThreadableTest.clientTearDown(self)
1169
1170     def _justAccept(self):
1171         conn, addr = self.serv.accept()
1172         conn.close()
1173
1174     testFamily = _justAccept
1175     def _testFamily(self):
1176         self.cli = socket.create_connection((HOST, self.port), timeout=30)
1177         self.addCleanup(self.cli.close)
1178         self.assertEqual(self.cli.family, 2)
1179
1180     testSourceAddress = _justAccept
1181     def _testSourceAddress(self):
1182         self.cli = socket.create_connection((HOST, self.port), timeout=30,
1183                 source_address=('', self.source_port))
1184         self.addCleanup(self.cli.close)
1185         self.assertEqual(self.cli.getsockname()[1], self.source_port)
1186         # The port number being used is sufficient to show that the bind()
1187         # call happened.
1188
1189     testTimeoutDefault = _justAccept
1190     def _testTimeoutDefault(self):
1191         # passing no explicit timeout uses socket's global default
1192         self.assertTrue(socket.getdefaulttimeout() is None)
1193         socket.setdefaulttimeout(42)
1194         try:
1195             self.cli = socket.create_connection((HOST, self.port))
1196             self.addCleanup(self.cli.close)
1197         finally:
1198             socket.setdefaulttimeout(None)
1199         self.assertEqual(self.cli.gettimeout(), 42)
1200
1201     testTimeoutNone = _justAccept
1202     def _testTimeoutNone(self):
1203         # None timeout means the same as sock.settimeout(None)
1204         self.assertTrue(socket.getdefaulttimeout() is None)
1205         socket.setdefaulttimeout(30)
1206         try:
1207             self.cli = socket.create_connection((HOST, self.port), timeout=None)
1208             self.addCleanup(self.cli.close)
1209         finally:
1210             socket.setdefaulttimeout(None)
1211         self.assertEqual(self.cli.gettimeout(), None)
1212
1213     testTimeoutValueNamed = _justAccept
1214     def _testTimeoutValueNamed(self):
1215         self.cli = socket.create_connection((HOST, self.port), timeout=30)
1216         self.assertEqual(self.cli.gettimeout(), 30)
1217
1218     testTimeoutValueNonamed = _justAccept
1219     def _testTimeoutValueNonamed(self):
1220         self.cli = socket.create_connection((HOST, self.port), 30)
1221         self.addCleanup(self.cli.close)
1222         self.assertEqual(self.cli.gettimeout(), 30)
1223
1224 @unittest.skipUnless(thread, 'Threading required for this test.')
1225 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
1226
1227     def __init__(self, methodName='runTest'):
1228         SocketTCPTest.__init__(self, methodName=methodName)
1229         ThreadableTest.__init__(self)
1230
1231     def clientSetUp(self):
1232         pass
1233
1234     def clientTearDown(self):
1235         self.cli.close()
1236         self.cli = None
1237         ThreadableTest.clientTearDown(self)
1238
1239     def testInsideTimeout(self):
1240         conn, addr = self.serv.accept()
1241         self.addCleanup(conn.close)
1242         time.sleep(3)
1243         conn.send("done!")
1244     testOutsideTimeout = testInsideTimeout
1245
1246     def _testInsideTimeout(self):
1247         self.cli = sock = socket.create_connection((HOST, self.port))
1248         data = sock.recv(5)
1249         self.assertEqual(data, "done!")
1250
1251     def _testOutsideTimeout(self):
1252         self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
1253         self.assertRaises(socket.timeout, lambda: sock.recv(5))
1254
1255
1256 class Urllib2FileobjectTest(unittest.TestCase):
1257
1258     # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1259     # it close the socket if the close c'tor argument is true
1260
1261     def testClose(self):
1262         class MockSocket:
1263             closed = False
1264             def flush(self): pass
1265             def close(self): self.closed = True
1266
1267         # must not close unless we request it: the original use of _fileobject
1268         # by module socket requires that the underlying socket not be closed until
1269         # the _socketobject that created the _fileobject is closed
1270         s = MockSocket()
1271         f = socket._fileobject(s)
1272         f.close()
1273         self.assertTrue(not s.closed)
1274
1275         s = MockSocket()
1276         f = socket._fileobject(s, close=True)
1277         f.close()
1278         self.assertTrue(s.closed)
1279
1280 class TCPTimeoutTest(SocketTCPTest):
1281
1282     def testTCPTimeout(self):
1283         def raise_timeout(*args, **kwargs):
1284             self.serv.settimeout(1.0)
1285             self.serv.accept()
1286         self.assertRaises(socket.timeout, raise_timeout,
1287                               "Error generating a timeout exception (TCP)")
1288
1289     def testTimeoutZero(self):
1290         ok = False
1291         try:
1292             self.serv.settimeout(0.0)
1293             foo = self.serv.accept()
1294         except socket.timeout:
1295             self.fail("caught timeout instead of error (TCP)")
1296         except socket.error:
1297             ok = True
1298         except:
1299             self.fail("caught unexpected exception (TCP)")
1300         if not ok:
1301             self.fail("accept() returned success when we did not expect it")
1302
1303     def testInterruptedTimeout(self):
1304         # XXX I don't know how to do this test on MSWindows or any other
1305         # plaform that doesn't support signal.alarm() or os.kill(), though
1306         # the bug should have existed on all platforms.
1307         if not hasattr(signal, "alarm"):
1308             return                  # can only test on *nix
1309         self.serv.settimeout(5.0)   # must be longer than alarm
1310         class Alarm(Exception):
1311             pass
1312         def alarm_handler(signal, frame):
1313             raise Alarm
1314         old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
1315         try:
1316             signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
1317             try:
1318                 foo = self.serv.accept()
1319             except socket.timeout:
1320                 self.fail("caught timeout instead of Alarm")
1321             except Alarm:
1322                 pass
1323             except:
1324                 self.fail("caught other exception instead of Alarm:"
1325                           " %s(%s):\n%s" %
1326                           (sys.exc_info()[:2] + (traceback.format_exc(),)))
1327             else:
1328                 self.fail("nothing caught")
1329             finally:
1330                 signal.alarm(0)         # shut off alarm
1331         except Alarm:
1332             self.fail("got Alarm in wrong place")
1333         finally:
1334             # no alarm can be pending.  Safe to restore old handler.
1335             signal.signal(signal.SIGALRM, old_alarm)
1336
1337 class UDPTimeoutTest(SocketTCPTest):
1338
1339     def testUDPTimeout(self):
1340         def raise_timeout(*args, **kwargs):
1341             self.serv.settimeout(1.0)
1342             self.serv.recv(1024)
1343         self.assertRaises(socket.timeout, raise_timeout,
1344                               "Error generating a timeout exception (UDP)")
1345
1346     def testTimeoutZero(self):
1347         ok = False
1348         try:
1349             self.serv.settimeout(0.0)
1350             foo = self.serv.recv(1024)
1351         except socket.timeout:
1352             self.fail("caught timeout instead of error (UDP)")
1353         except socket.error:
1354             ok = True
1355         except:
1356             self.fail("caught unexpected exception (UDP)")
1357         if not ok:
1358             self.fail("recv() returned success when we did not expect it")
1359
1360 class TestExceptions(unittest.TestCase):
1361
1362     def testExceptionTree(self):
1363         self.assertTrue(issubclass(socket.error, Exception))
1364         self.assertTrue(issubclass(socket.herror, socket.error))
1365         self.assertTrue(issubclass(socket.gaierror, socket.error))
1366         self.assertTrue(issubclass(socket.timeout, socket.error))
1367
1368 class TestLinuxAbstractNamespace(unittest.TestCase):
1369
1370     UNIX_PATH_MAX = 108
1371
1372     def testLinuxAbstractNamespace(self):
1373         address = "\x00python-test-hello\x00\xff"
1374         s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1375         s1.bind(address)
1376         s1.listen(1)
1377         s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1378         s2.connect(s1.getsockname())
1379         s1.accept()
1380         self.assertEqual(s1.getsockname(), address)
1381         self.assertEqual(s2.getpeername(), address)
1382
1383     def testMaxName(self):
1384         address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1385         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1386         s.bind(address)
1387         self.assertEqual(s.getsockname(), address)
1388
1389     def testNameOverflow(self):
1390         address = "\x00" + "h" * self.UNIX_PATH_MAX
1391         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1392         self.assertRaises(socket.error, s.bind, address)
1393
1394
1395 @unittest.skipUnless(thread, 'Threading required for this test.')
1396 class BufferIOTest(SocketConnectedTest):
1397     """
1398     Test the buffer versions of socket.recv() and socket.send().
1399     """
1400     def __init__(self, methodName='runTest'):
1401         SocketConnectedTest.__init__(self, methodName=methodName)
1402
1403     def testRecvIntoArray(self):
1404         buf = array.array('c', ' '*1024)
1405         nbytes = self.cli_conn.recv_into(buf)
1406         self.assertEqual(nbytes, len(MSG))
1407         msg = buf.tostring()[:len(MSG)]
1408         self.assertEqual(msg, MSG)
1409
1410     def _testRecvIntoArray(self):
1411         with test_support.check_py3k_warnings():
1412             buf = buffer(MSG)
1413         self.serv_conn.send(buf)
1414
1415     def testRecvIntoBytearray(self):
1416         buf = bytearray(1024)
1417         nbytes = self.cli_conn.recv_into(buf)
1418         self.assertEqual(nbytes, len(MSG))
1419         msg = buf[:len(MSG)]
1420         self.assertEqual(msg, MSG)
1421
1422     _testRecvIntoBytearray = _testRecvIntoArray
1423
1424     def testRecvIntoMemoryview(self):
1425         buf = bytearray(1024)
1426         nbytes = self.cli_conn.recv_into(memoryview(buf))
1427         self.assertEqual(nbytes, len(MSG))
1428         msg = buf[:len(MSG)]
1429         self.assertEqual(msg, MSG)
1430
1431     _testRecvIntoMemoryview = _testRecvIntoArray
1432
1433     def testRecvFromIntoArray(self):
1434         buf = array.array('c', ' '*1024)
1435         nbytes, addr = self.cli_conn.recvfrom_into(buf)
1436         self.assertEqual(nbytes, len(MSG))
1437         msg = buf.tostring()[:len(MSG)]
1438         self.assertEqual(msg, MSG)
1439
1440     def _testRecvFromIntoArray(self):
1441         with test_support.check_py3k_warnings():
1442             buf = buffer(MSG)
1443         self.serv_conn.send(buf)
1444
1445     def testRecvFromIntoBytearray(self):
1446         buf = bytearray(1024)
1447         nbytes, addr = self.cli_conn.recvfrom_into(buf)
1448         self.assertEqual(nbytes, len(MSG))
1449         msg = buf[:len(MSG)]
1450         self.assertEqual(msg, MSG)
1451
1452     _testRecvFromIntoBytearray = _testRecvFromIntoArray
1453
1454     def testRecvFromIntoMemoryview(self):
1455         buf = bytearray(1024)
1456         nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
1457         self.assertEqual(nbytes, len(MSG))
1458         msg = buf[:len(MSG)]
1459         self.assertEqual(msg, MSG)
1460
1461     _testRecvFromIntoMemoryview = _testRecvFromIntoArray
1462
1463
1464 TIPC_STYPE = 2000
1465 TIPC_LOWER = 200
1466 TIPC_UPPER = 210
1467
1468 def isTipcAvailable():
1469     """Check if the TIPC module is loaded
1470
1471     The TIPC module is not loaded automatically on Ubuntu and probably
1472     other Linux distros.
1473     """
1474     if not hasattr(socket, "AF_TIPC"):
1475         return False
1476     if not os.path.isfile("/proc/modules"):
1477         return False
1478     with open("/proc/modules") as f:
1479         for line in f:
1480             if line.startswith("tipc "):
1481                 return True
1482     if test_support.verbose:
1483         print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1484     return False
1485
1486 class TIPCTest (unittest.TestCase):
1487     def testRDM(self):
1488         srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1489         cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1490
1491         srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1492         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1493                 TIPC_LOWER, TIPC_UPPER)
1494         srv.bind(srvaddr)
1495
1496         sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1497                 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1498         cli.sendto(MSG, sendaddr)
1499
1500         msg, recvaddr = srv.recvfrom(1024)
1501
1502         self.assertEqual(cli.getsockname(), recvaddr)
1503         self.assertEqual(msg, MSG)
1504
1505
1506 class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
1507     def __init__(self, methodName = 'runTest'):
1508         unittest.TestCase.__init__(self, methodName = methodName)
1509         ThreadableTest.__init__(self)
1510
1511     def setUp(self):
1512         self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1513         self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1514         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1515                 TIPC_LOWER, TIPC_UPPER)
1516         self.srv.bind(srvaddr)
1517         self.srv.listen(5)
1518         self.serverExplicitReady()
1519         self.conn, self.connaddr = self.srv.accept()
1520
1521     def clientSetUp(self):
1522         # The is a hittable race between serverExplicitReady() and the
1523         # accept() call; sleep a little while to avoid it, otherwise
1524         # we could get an exception
1525         time.sleep(0.1)
1526         self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1527         addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1528                 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1529         self.cli.connect(addr)
1530         self.cliaddr = self.cli.getsockname()
1531
1532     def testStream(self):
1533         msg = self.conn.recv(1024)
1534         self.assertEqual(msg, MSG)
1535         self.assertEqual(self.cliaddr, self.connaddr)
1536
1537     def _testStream(self):
1538         self.cli.send(MSG)
1539         self.cli.close()
1540
1541
1542 def test_main():
1543     tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
1544              TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
1545              UDPTimeoutTest ]
1546
1547     tests.extend([
1548         NonBlockingTCPTests,
1549         FileObjectClassTestCase,
1550         FileObjectInterruptedTestCase,
1551         UnbufferedFileObjectClassTestCase,
1552         LineBufferedFileObjectClassTestCase,
1553         SmallBufferedFileObjectClassTestCase,
1554         Urllib2FileobjectTest,
1555         NetworkConnectionNoServer,
1556         NetworkConnectionAttributesTest,
1557         NetworkConnectionBehaviourTest,
1558     ])
1559     if hasattr(socket, "socketpair"):
1560         tests.append(BasicSocketPairTest)
1561     if sys.platform == 'linux2':
1562         tests.append(TestLinuxAbstractNamespace)
1563     if isTipcAvailable():
1564         tests.append(TIPCTest)
1565         tests.append(TIPCThreadableTest)
1566
1567     thread_info = test_support.threading_setup()
1568     test_support.run_unittest(*tests)
1569     test_support.threading_cleanup(*thread_info)
1570
1571 if __name__ == "__main__":
1572     test_main()