4 from test import test_support
16 from weakref import proxy
20 def try_address(host, port=0, family=socket.AF_INET):
21 """Try to bind a socket on the given host:port and return True
22 if that has been possible."""
24 sock = socket.socket(family, socket.SOCK_STREAM)
25 sock.bind((host, port))
26 except (socket.error, socket.gaierror):
32 HOST = test_support.HOST
33 MSG = b'Michael Gilfix was here\n'
34 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
43 HOST = test_support.HOST
44 MSG = 'Michael Gilfix was here\n'
46 class SocketTCPTest(unittest.TestCase):
49 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50 self.port = test_support.bind_port(self.serv)
57 class SocketUDPTest(unittest.TestCase):
60 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
61 self.port = test_support.bind_port(self.serv)
68 """Threadable Test class
70 The ThreadableTest class makes it easy to create a threaded
71 client/server pair from an existing unit test. To create a
72 new threaded class from an existing unit test, use multiple
75 class NewClass (OldClass, ThreadableTest):
78 This class defines two new fixture functions with obvious
79 purposes for overriding:
84 Any new test functions within the class must then define
85 tests in pairs, where the test name is preceeded with a
86 '_' to indicate the client portion of the test. Ex:
94 Any exceptions raised by the clients during their tests
95 are caught and transferred to the main thread to alert
96 the testing framework.
98 Note, the server setup function cannot call any blocking
99 functions that rely on the client thread during setup,
100 unless serverExplicitReady() is called just before
101 the blocking call (such as in setting up a client/server
102 connection and performing the accept() in setUp().
106 # Swap the true setup function
107 self.__setUp = self.setUp
108 self.__tearDown = self.tearDown
109 self.setUp = self._setUp
110 self.tearDown = self._tearDown
112 def serverExplicitReady(self):
113 """This method allows the server to explicitly indicate that
114 it wants the client thread to proceed. This is useful if the
115 server is about to execute a blocking routine that is
116 dependent upon the client thread during its setup routine."""
117 self.server_ready.set()
120 self.server_ready = threading.Event()
121 self.client_ready = threading.Event()
122 self.done = threading.Event()
123 self.queue = Queue.Queue(1)
125 # Do some munging to start the client test.
126 methodname = self.id()
127 i = methodname.rfind('.')
128 methodname = methodname[i+1:]
129 test_method = getattr(self, '_' + methodname)
130 self.client_thread = thread.start_new_thread(
131 self.clientRun, (test_method,))
134 if not self.server_ready.is_set():
135 self.server_ready.set()
136 self.client_ready.wait()
142 if not self.queue.empty():
143 msg = self.queue.get()
146 def clientRun(self, test_func):
147 self.server_ready.wait()
148 self.client_ready.set()
150 with test_support.check_py3k_warnings():
151 if not callable(test_func):
152 raise TypeError("test_func must be a callable function.")
155 except Exception, strerror:
156 self.queue.put(strerror)
157 self.clientTearDown()
159 def clientSetUp(self):
160 raise NotImplementedError("clientSetUp must be implemented.")
162 def clientTearDown(self):
166 class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
168 def __init__(self, methodName='runTest'):
169 SocketTCPTest.__init__(self, methodName=methodName)
170 ThreadableTest.__init__(self)
172 def clientSetUp(self):
173 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
175 def clientTearDown(self):
178 ThreadableTest.clientTearDown(self)
180 class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
182 def __init__(self, methodName='runTest'):
183 SocketUDPTest.__init__(self, methodName=methodName)
184 ThreadableTest.__init__(self)
186 def clientSetUp(self):
187 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
189 def clientTearDown(self):
192 ThreadableTest.clientTearDown(self)
194 class SocketConnectedTest(ThreadedTCPSocketTest):
196 def __init__(self, methodName='runTest'):
197 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
200 ThreadedTCPSocketTest.setUp(self)
201 # Indicate explicitly we're ready for the client thread to
202 # proceed and then perform the blocking call to accept
203 self.serverExplicitReady()
204 conn, addr = self.serv.accept()
208 self.cli_conn.close()
210 ThreadedTCPSocketTest.tearDown(self)
212 def clientSetUp(self):
213 ThreadedTCPSocketTest.clientSetUp(self)
214 self.cli.connect((HOST, self.port))
215 self.serv_conn = self.cli
217 def clientTearDown(self):
218 self.serv_conn.close()
219 self.serv_conn = None
220 ThreadedTCPSocketTest.clientTearDown(self)
222 class SocketPairTest(unittest.TestCase, ThreadableTest):
224 def __init__(self, methodName='runTest'):
225 unittest.TestCase.__init__(self, methodName=methodName)
226 ThreadableTest.__init__(self)
229 self.serv, self.cli = socket.socketpair()
235 def clientSetUp(self):
238 def clientTearDown(self):
241 ThreadableTest.clientTearDown(self)
244 #######################################################################
247 class GeneralModuleTests(unittest.TestCase):
249 def test_weakref(self):
250 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
252 self.assertEqual(p.fileno(), s.fileno())
257 except ReferenceError:
260 self.fail('Socket proxy still exists')
262 def testSocketError(self):
263 # Testing socket module exceptions
264 def raise_error(*args, **kwargs):
266 def raise_herror(*args, **kwargs):
268 def raise_gaierror(*args, **kwargs):
269 raise socket.gaierror
270 self.assertRaises(socket.error, raise_error,
271 "Error raising socket exception.")
272 self.assertRaises(socket.error, raise_herror,
273 "Error raising socket exception.")
274 self.assertRaises(socket.error, raise_gaierror,
275 "Error raising socket exception.")
277 def testCrucialConstants(self):
278 # Testing for mission critical constants
284 socket.SOCK_SEQPACKET
288 def testHostnameRes(self):
289 # Testing hostname resolution mechanisms
290 hostname = socket.gethostname()
292 ip = socket.gethostbyname(hostname)
294 # Probably name lookup wasn't set up right; skip this test
296 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
298 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
300 # Probably a similar problem as above; skip this test
302 all_host_names = [hostname, hname] + aliases
303 fqhn = socket.getfqdn(ip)
304 if not fqhn in all_host_names:
305 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
307 def testRefCountGetNameInfo(self):
308 # Testing reference count for getnameinfo
309 if hasattr(sys, "getrefcount"):
311 # On some versions, this loses a reference
312 orig = sys.getrefcount(__name__)
313 socket.getnameinfo(__name__,0)
315 self.assertEqual(sys.getrefcount(__name__), orig,
316 "socket.getnameinfo loses a reference")
318 def testInterpreterCrash(self):
319 # Making sure getnameinfo doesn't crash the interpreter
321 # On some versions, this crashes the interpreter.
322 socket.getnameinfo(('x', 0, 0, 0), 0)
327 # This just checks that htons etc. are their own inverse,
328 # when looking at the lower 16 or 32 bits.
329 sizes = {socket.htonl: 32, socket.ntohl: 32,
330 socket.htons: 16, socket.ntohs: 16}
331 for func, size in sizes.items():
332 mask = (1L<<size) - 1
333 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
334 self.assertEqual(i & mask, func(func(i&mask)) & mask)
337 self.assertEqual(swapped & mask, mask)
338 self.assertRaises(OverflowError, func, 1L<<34)
340 def testNtoHErrors(self):
341 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
342 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
343 for k in good_values:
349 self.assertRaises(OverflowError, socket.ntohl, k)
350 self.assertRaises(OverflowError, socket.ntohs, k)
351 self.assertRaises(OverflowError, socket.htonl, k)
352 self.assertRaises(OverflowError, socket.htons, k)
354 def testGetServBy(self):
355 eq = self.assertEqual
356 # Find one service that exists, then check all the related interfaces.
357 # I've ordered this by protocols that have both a tcp and udp
358 # protocol, at least for modern Linuxes.
359 if (sys.platform.startswith('linux') or
360 sys.platform.startswith('freebsd') or
361 sys.platform.startswith('netbsd') or
362 sys.platform == 'darwin'):
363 # avoid the 'echo' service on this platform, as there is an
364 # assumption breaking non-standard port/protocol entry
365 services = ('daytime', 'qotd', 'domain')
367 services = ('echo', 'daytime', 'domain')
368 for service in services:
370 port = socket.getservbyname(service, 'tcp')
376 # Try same call with optional protocol omitted
377 port2 = socket.getservbyname(service)
379 # Try udp, but don't barf it it doesn't exist
381 udpport = socket.getservbyname(service, 'udp')
386 # Now make sure the lookup by port returns the same service name
387 eq(socket.getservbyport(port2), service)
388 eq(socket.getservbyport(port, 'tcp'), service)
389 if udpport is not None:
390 eq(socket.getservbyport(udpport, 'udp'), service)
391 # Make sure getservbyport does not accept out of range ports.
392 self.assertRaises(OverflowError, socket.getservbyport, -1)
393 self.assertRaises(OverflowError, socket.getservbyport, 65536)
395 def testDefaultTimeout(self):
396 # Testing default timeout
397 # The default timeout should initially be None
398 self.assertEqual(socket.getdefaulttimeout(), None)
400 self.assertEqual(s.gettimeout(), None)
403 # Set the default timeout to 10, and see if it propagates
404 socket.setdefaulttimeout(10)
405 self.assertEqual(socket.getdefaulttimeout(), 10)
407 self.assertEqual(s.gettimeout(), 10)
410 # Reset the default timeout to None, and see if it propagates
411 socket.setdefaulttimeout(None)
412 self.assertEqual(socket.getdefaulttimeout(), None)
414 self.assertEqual(s.gettimeout(), None)
417 # Check that setting it to an invalid value raises ValueError
418 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
420 # Check that setting it to an invalid type raises TypeError
421 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
423 def testIPv4_inet_aton_fourbytes(self):
424 if not hasattr(socket, 'inet_aton'):
425 return # No inet_aton, nothing to check
426 # Test that issue1008086 and issue767150 are fixed.
427 # It must return 4 bytes.
428 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
429 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
431 def testIPv4toString(self):
432 if not hasattr(socket, 'inet_pton'):
433 return # No inet_pton() on this platform
434 from socket import inet_aton as f, inet_pton, AF_INET
435 g = lambda a: inet_pton(AF_INET, a)
437 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
438 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
439 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
440 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
441 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
443 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
444 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
445 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
446 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
448 def testIPv6toString(self):
449 if not hasattr(socket, 'inet_pton'):
450 return # No inet_pton() on this platform
452 from socket import inet_pton, AF_INET6, has_ipv6
457 f = lambda a: inet_pton(AF_INET6, a)
459 self.assertEqual('\x00' * 16, f('::'))
460 self.assertEqual('\x00' * 16, f('0::0'))
461 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
463 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
464 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
467 def testStringToIPv4(self):
468 if not hasattr(socket, 'inet_ntop'):
469 return # No inet_ntop() on this platform
470 from socket import inet_ntoa as f, inet_ntop, AF_INET
471 g = lambda a: inet_ntop(AF_INET, a)
473 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
474 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
475 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
476 self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
478 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
479 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
480 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
482 def testStringToIPv6(self):
483 if not hasattr(socket, 'inet_ntop'):
484 return # No inet_ntop() on this platform
486 from socket import inet_ntop, AF_INET6, has_ipv6
491 f = lambda a: inet_ntop(AF_INET6, a)
493 self.assertEqual('::', f('\x00' * 16))
494 self.assertEqual('::1', f('\x00' * 15 + '\x01'))
496 'aef:b01:506:1001:ffff:9997:55:170',
497 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
500 # XXX The following don't test module-level functionality...
502 def _get_unused_port(self, bind_address='0.0.0.0'):
503 """Use a temporary socket to elicit an unused ephemeral port.
506 bind_address: Hostname or IP address to search for a port on.
508 Returns: A most likely to be unused port.
510 tempsock = socket.socket()
511 tempsock.bind((bind_address, 0))
512 host, port = tempsock.getsockname()
516 def testSockName(self):
517 # Testing getsockname()
518 port = self._get_unused_port()
519 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
520 self.addCleanup(sock.close)
521 sock.bind(("0.0.0.0", port))
522 name = sock.getsockname()
523 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
524 # it reasonable to get the host's addr in addition to 0.0.0.0.
525 # At least for eCos. This is required for the S/390 to pass.
527 my_ip_addr = socket.gethostbyname(socket.gethostname())
529 # Probably name lookup wasn't set up right; skip this test
531 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
532 self.assertEqual(name[1], port)
534 def testGetSockOpt(self):
535 # Testing getsockopt()
536 # We know a socket should start without reuse==0
537 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
538 self.addCleanup(sock.close)
539 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
540 self.assertFalse(reuse != 0, "initial mode is reuse")
542 def testSetSockOpt(self):
543 # Testing setsockopt()
544 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
545 self.addCleanup(sock.close)
546 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
547 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
548 self.assertFalse(reuse == 0, "failed to set reuse mode")
550 def testSendAfterClose(self):
551 # testing send() after close() with timeout
552 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
555 self.assertRaises(socket.error, sock.send, "spam")
557 def testNewAttributes(self):
558 # testing .family, .type and .protocol
559 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
560 self.assertEqual(sock.family, socket.AF_INET)
561 self.assertEqual(sock.type, socket.SOCK_STREAM)
562 self.assertEqual(sock.proto, 0)
565 def test_getsockaddrarg(self):
567 port = self._get_unused_port(bind_address=host)
568 big_port = port + 65536
569 neg_port = port - 65536
570 sock = socket.socket()
572 self.assertRaises(OverflowError, sock.bind, (host, big_port))
573 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
574 sock.bind((host, port))
578 @unittest.skipUnless(os.name == "nt", "Windows specific")
579 def test_sock_ioctl(self):
580 self.assertTrue(hasattr(socket.socket, 'ioctl'))
581 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
582 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
583 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
584 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
586 self.addCleanup(s.close)
587 self.assertRaises(ValueError, s.ioctl, -1, None)
588 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
590 def testGetaddrinfo(self):
592 socket.getaddrinfo('localhost', 80)
593 except socket.gaierror as err:
594 if err.errno == socket.EAI_SERVICE:
595 # see http://bugs.python.org/issue1282647
596 self.skipTest("buggy libc version")
598 # len of every sequence is supposed to be == 5
599 for info in socket.getaddrinfo(HOST, None):
600 self.assertEqual(len(info), 5)
601 # host can be a domain name, a string representation of an
602 # IPv4/v6 address or None
603 socket.getaddrinfo('localhost', 80)
604 socket.getaddrinfo('127.0.0.1', 80)
605 socket.getaddrinfo(None, 80)
607 socket.getaddrinfo('::1', 80)
608 # port can be a string service name such as "http", a numeric
609 # port number or None
610 socket.getaddrinfo(HOST, "http")
611 socket.getaddrinfo(HOST, 80)
612 socket.getaddrinfo(HOST, None)
613 # test family and socktype filters
614 infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
615 for family, _, _, _, _ in infos:
616 self.assertEqual(family, socket.AF_INET)
617 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
618 for _, socktype, _, _, _ in infos:
619 self.assertEqual(socktype, socket.SOCK_STREAM)
620 # test proto and flags arguments
621 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
622 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
623 # a server willing to support both IPv4 and IPv6 will
625 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
629 def check_sendall_interrupted(self, with_timeout):
630 # socketpair() is not stricly required, but it makes things easier.
631 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
632 self.skipTest("signal.alarm and socket.socketpair required for this test")
633 # Our signal handlers clobber the C errno by calling a math function
634 # with an invalid domain value.
635 def ok_handler(*args):
636 self.assertRaises(ValueError, math.acosh, 0)
637 def raising_handler(*args):
638 self.assertRaises(ValueError, math.acosh, 0)
640 c, s = socket.socketpair()
641 old_alarm = signal.signal(signal.SIGALRM, raising_handler)
644 # Just above the one second minimum for signal.alarm
646 with self.assertRaises(ZeroDivisionError):
648 c.sendall(b"x" * (1024**2))
650 signal.signal(signal.SIGALRM, ok_handler)
652 self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
654 signal.signal(signal.SIGALRM, old_alarm)
658 def test_sendall_interrupted(self):
659 self.check_sendall_interrupted(False)
661 def test_sendall_interrupted_with_timeout(self):
662 self.check_sendall_interrupted(True)
665 @unittest.skipUnless(thread, 'Threading required for this test.')
666 class BasicTCPTest(SocketConnectedTest):
668 def __init__(self, methodName='runTest'):
669 SocketConnectedTest.__init__(self, methodName=methodName)
672 # Testing large receive over TCP
673 msg = self.cli_conn.recv(1024)
674 self.assertEqual(msg, MSG)
677 self.serv_conn.send(MSG)
679 def testOverFlowRecv(self):
680 # Testing receive in chunks over TCP
681 seg1 = self.cli_conn.recv(len(MSG) - 3)
682 seg2 = self.cli_conn.recv(1024)
684 self.assertEqual(msg, MSG)
686 def _testOverFlowRecv(self):
687 self.serv_conn.send(MSG)
689 def testRecvFrom(self):
690 # Testing large recvfrom() over TCP
691 msg, addr = self.cli_conn.recvfrom(1024)
692 self.assertEqual(msg, MSG)
694 def _testRecvFrom(self):
695 self.serv_conn.send(MSG)
697 def testOverFlowRecvFrom(self):
698 # Testing recvfrom() in chunks over TCP
699 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
700 seg2, addr = self.cli_conn.recvfrom(1024)
702 self.assertEqual(msg, MSG)
704 def _testOverFlowRecvFrom(self):
705 self.serv_conn.send(MSG)
707 def testSendAll(self):
708 # Testing sendall() with a 2048 byte string over TCP
711 read = self.cli_conn.recv(1024)
715 self.assertEqual(msg, 'f' * 2048)
717 def _testSendAll(self):
718 big_chunk = 'f' * 2048
719 self.serv_conn.sendall(big_chunk)
721 def testFromFd(self):
723 if not hasattr(socket, "fromfd"):
724 return # On Windows, this doesn't exist
725 fd = self.cli_conn.fileno()
726 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
727 self.addCleanup(sock.close)
728 msg = sock.recv(1024)
729 self.assertEqual(msg, MSG)
731 def _testFromFd(self):
732 self.serv_conn.send(MSG)
736 sock = self.cli_conn.dup()
737 self.addCleanup(sock.close)
738 msg = sock.recv(1024)
739 self.assertEqual(msg, MSG)
742 self.serv_conn.send(MSG)
744 def testShutdown(self):
746 msg = self.cli_conn.recv(1024)
747 self.assertEqual(msg, MSG)
748 # wait for _testShutdown to finish: on OS X, when the server
749 # closes the connection the client also becomes disconnected,
750 # and the client's shutdown call will fail. (Issue #4397.)
753 def _testShutdown(self):
754 self.serv_conn.send(MSG)
755 self.serv_conn.shutdown(2)
757 @unittest.skipUnless(thread, 'Threading required for this test.')
758 class BasicUDPTest(ThreadedUDPSocketTest):
760 def __init__(self, methodName='runTest'):
761 ThreadedUDPSocketTest.__init__(self, methodName=methodName)
763 def testSendtoAndRecv(self):
764 # Testing sendto() and Recv() over UDP
765 msg = self.serv.recv(len(MSG))
766 self.assertEqual(msg, MSG)
768 def _testSendtoAndRecv(self):
769 self.cli.sendto(MSG, 0, (HOST, self.port))
771 def testRecvFrom(self):
772 # Testing recvfrom() over UDP
773 msg, addr = self.serv.recvfrom(len(MSG))
774 self.assertEqual(msg, MSG)
776 def _testRecvFrom(self):
777 self.cli.sendto(MSG, 0, (HOST, self.port))
779 def testRecvFromNegative(self):
780 # Negative lengths passed to recvfrom should give ValueError.
781 self.assertRaises(ValueError, self.serv.recvfrom, -1)
783 def _testRecvFromNegative(self):
784 self.cli.sendto(MSG, 0, (HOST, self.port))
786 @unittest.skipUnless(thread, 'Threading required for this test.')
787 class TCPCloserTest(ThreadedTCPSocketTest):
790 conn, addr = self.serv.accept()
794 read, write, err = select.select([sd], [], [], 1.0)
795 self.assertEqual(read, [sd])
796 self.assertEqual(sd.recv(1), '')
798 def _testClose(self):
799 self.cli.connect((HOST, self.port))
802 @unittest.skipUnless(thread, 'Threading required for this test.')
803 class BasicSocketPairTest(SocketPairTest):
805 def __init__(self, methodName='runTest'):
806 SocketPairTest.__init__(self, methodName=methodName)
809 msg = self.serv.recv(1024)
810 self.assertEqual(msg, MSG)
819 msg = self.cli.recv(1024)
820 self.assertEqual(msg, MSG)
822 @unittest.skipUnless(thread, 'Threading required for this test.')
823 class NonBlockingTCPTests(ThreadedTCPSocketTest):
825 def __init__(self, methodName='runTest'):
826 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
828 def testSetBlocking(self):
829 # Testing whether set blocking works
830 self.serv.setblocking(0)
837 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
839 def _testSetBlocking(self):
842 def testAccept(self):
843 # Testing non-blocking accept
844 self.serv.setblocking(0)
846 conn, addr = self.serv.accept()
850 self.fail("Error trying to do non-blocking accept.")
851 read, write, err = select.select([self.serv], [], [])
852 if self.serv in read:
853 conn, addr = self.serv.accept()
856 self.fail("Error trying to do accept after select.")
858 def _testAccept(self):
860 self.cli.connect((HOST, self.port))
862 def testConnect(self):
863 # Testing non-blocking connect
864 conn, addr = self.serv.accept()
867 def _testConnect(self):
868 self.cli.settimeout(10)
869 self.cli.connect((HOST, self.port))
872 # Testing non-blocking recv
873 conn, addr = self.serv.accept()
876 msg = conn.recv(len(MSG))
880 self.fail("Error trying to do non-blocking recv.")
881 read, write, err = select.select([conn], [], [])
883 msg = conn.recv(len(MSG))
885 self.assertEqual(msg, MSG)
887 self.fail("Error during select call to non-blocking socket.")
890 self.cli.connect((HOST, self.port))
894 @unittest.skipUnless(thread, 'Threading required for this test.')
895 class FileObjectClassTestCase(SocketConnectedTest):
897 bufsize = -1 # Use default buffer size
899 def __init__(self, methodName='runTest'):
900 SocketConnectedTest.__init__(self, methodName=methodName)
903 SocketConnectedTest.setUp(self)
904 self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
907 self.serv_file.close()
908 self.assertTrue(self.serv_file.closed)
909 self.serv_file = None
910 SocketConnectedTest.tearDown(self)
912 def clientSetUp(self):
913 SocketConnectedTest.clientSetUp(self)
914 self.cli_file = self.serv_conn.makefile('wb')
916 def clientTearDown(self):
917 self.cli_file.close()
918 self.assertTrue(self.cli_file.closed)
920 SocketConnectedTest.clientTearDown(self)
922 def testSmallRead(self):
923 # Performing small file read test
924 first_seg = self.serv_file.read(len(MSG)-3)
925 second_seg = self.serv_file.read(3)
926 msg = first_seg + second_seg
927 self.assertEqual(msg, MSG)
929 def _testSmallRead(self):
930 self.cli_file.write(MSG)
931 self.cli_file.flush()
933 def testFullRead(self):
935 msg = self.serv_file.read()
936 self.assertEqual(msg, MSG)
938 def _testFullRead(self):
939 self.cli_file.write(MSG)
940 self.cli_file.close()
942 def testUnbufferedRead(self):
943 # Performing unbuffered file read test
946 char = self.serv_file.read(1)
950 self.assertEqual(buf, MSG)
952 def _testUnbufferedRead(self):
953 self.cli_file.write(MSG)
954 self.cli_file.flush()
956 def testReadline(self):
957 # Performing file readline test
958 line = self.serv_file.readline()
959 self.assertEqual(line, MSG)
961 def _testReadline(self):
962 self.cli_file.write(MSG)
963 self.cli_file.flush()
965 def testReadlineAfterRead(self):
966 a_baloo_is = self.serv_file.read(len("A baloo is"))
967 self.assertEqual("A baloo is", a_baloo_is)
968 _a_bear = self.serv_file.read(len(" a bear"))
969 self.assertEqual(" a bear", _a_bear)
970 line = self.serv_file.readline()
971 self.assertEqual("\n", line)
972 line = self.serv_file.readline()
973 self.assertEqual("A BALOO IS A BEAR.\n", line)
974 line = self.serv_file.readline()
975 self.assertEqual(MSG, line)
977 def _testReadlineAfterRead(self):
978 self.cli_file.write("A baloo is a bear\n")
979 self.cli_file.write("A BALOO IS A BEAR.\n")
980 self.cli_file.write(MSG)
981 self.cli_file.flush()
983 def testReadlineAfterReadNoNewline(self):
984 end_of_ = self.serv_file.read(len("End Of "))
985 self.assertEqual("End Of ", end_of_)
986 line = self.serv_file.readline()
987 self.assertEqual("Line", line)
989 def _testReadlineAfterReadNoNewline(self):
990 self.cli_file.write("End Of Line")
992 def testClosedAttr(self):
993 self.assertTrue(not self.serv_file.closed)
995 def _testClosedAttr(self):
996 self.assertTrue(not self.cli_file.closed)
999 class FileObjectInterruptedTestCase(unittest.TestCase):
1000 """Test that the file object correctly handles EINTR internally."""
1002 class MockSocket(object):
1003 def __init__(self, recv_funcs=()):
1004 # A generator that returns callables that we'll call for each
1006 self._recv_step = iter(recv_funcs)
1008 def recv(self, size):
1009 return self._recv_step.next()()
1013 raise socket.error(errno.EINTR)
1015 def _test_readline(self, size=-1, **kwargs):
1016 mock_sock = self.MockSocket(recv_funcs=[
1017 lambda : "This is the first line\nAnd the sec",
1019 lambda : "ond line is here\n",
1022 fo = socket._fileobject(mock_sock, **kwargs)
1023 self.assertEqual(fo.readline(size), "This is the first line\n")
1024 self.assertEqual(fo.readline(size), "And the second line is here\n")
1026 def _test_read(self, size=-1, **kwargs):
1027 mock_sock = self.MockSocket(recv_funcs=[
1028 lambda : "This is the first line\nAnd the sec",
1030 lambda : "ond line is here\n",
1033 fo = socket._fileobject(mock_sock, **kwargs)
1034 self.assertEqual(fo.read(size), "This is the first line\n"
1035 "And the second line is here\n")
1037 def test_default(self):
1038 self._test_readline()
1039 self._test_readline(size=100)
1041 self._test_read(size=100)
1043 def test_with_1k_buffer(self):
1044 self._test_readline(bufsize=1024)
1045 self._test_readline(size=100, bufsize=1024)
1046 self._test_read(bufsize=1024)
1047 self._test_read(size=100, bufsize=1024)
1049 def _test_readline_no_buffer(self, size=-1):
1050 mock_sock = self.MockSocket(recv_funcs=[
1058 fo = socket._fileobject(mock_sock, bufsize=0)
1059 self.assertEqual(fo.readline(size), "aa\n")
1060 self.assertEqual(fo.readline(size), "BBbb")
1062 def test_no_buffer(self):
1063 self._test_readline_no_buffer()
1064 self._test_readline_no_buffer(size=4)
1065 self._test_read(bufsize=0)
1066 self._test_read(size=100, bufsize=0)
1069 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
1071 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
1073 In this case (and in this case only), it should be possible to
1074 create a file object, read a line from it, create another file
1075 object, read another line from it, without loss of data in the
1076 first file object's buffer. Note that httplib relies on this
1077 when reading multiple requests from the same socket."""
1079 bufsize = 0 # Use unbuffered mode
1081 def testUnbufferedReadline(self):
1082 # Read a line, create a new file object, read another line with it
1083 line = self.serv_file.readline() # first line
1084 self.assertEqual(line, "A. " + MSG) # first line
1085 self.serv_file = self.cli_conn.makefile('rb', 0)
1086 line = self.serv_file.readline() # second line
1087 self.assertEqual(line, "B. " + MSG) # second line
1089 def _testUnbufferedReadline(self):
1090 self.cli_file.write("A. " + MSG)
1091 self.cli_file.write("B. " + MSG)
1092 self.cli_file.flush()
1094 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1096 bufsize = 1 # Default-buffered for reading; line-buffered for writing
1099 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1101 bufsize = 2 # Exercise the buffering code
1104 class NetworkConnectionTest(object):
1105 """Prove network connection."""
1106 def clientSetUp(self):
1107 # We're inherited below by BasicTCPTest2, which also inherits
1108 # BasicTCPTest, which defines self.port referenced below.
1109 self.cli = socket.create_connection((HOST, self.port))
1110 self.serv_conn = self.cli
1112 class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
1113 """Tests that NetworkConnection does not break existing TCP functionality.
1116 class NetworkConnectionNoServer(unittest.TestCase):
1117 class MockSocket(socket.socket):
1118 def connect(self, *args):
1119 raise socket.timeout('timed out')
1121 @contextlib.contextmanager
1122 def mocked_socket_module(self):
1123 """Return a socket which times out on connect"""
1124 old_socket = socket.socket
1125 socket.socket = self.MockSocket
1129 socket.socket = old_socket
1131 def test_connect(self):
1132 port = test_support.find_unused_port()
1133 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1134 self.addCleanup(cli.close)
1135 with self.assertRaises(socket.error) as cm:
1136 cli.connect((HOST, port))
1137 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1139 def test_create_connection(self):
1140 # Issue #9792: errors raised by create_connection() should have
1141 # a proper errno attribute.
1142 port = test_support.find_unused_port()
1143 with self.assertRaises(socket.error) as cm:
1144 socket.create_connection((HOST, port))
1145 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1147 def test_create_connection_timeout(self):
1148 # Issue #9792: create_connection() should not recast timeout errors
1149 # as generic socket errors.
1150 with self.mocked_socket_module():
1151 with self.assertRaises(socket.timeout):
1152 socket.create_connection((HOST, 1234))
1155 @unittest.skipUnless(thread, 'Threading required for this test.')
1156 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
1158 def __init__(self, methodName='runTest'):
1159 SocketTCPTest.__init__(self, methodName=methodName)
1160 ThreadableTest.__init__(self)
1162 def clientSetUp(self):
1163 self.source_port = test_support.find_unused_port()
1165 def clientTearDown(self):
1168 ThreadableTest.clientTearDown(self)
1170 def _justAccept(self):
1171 conn, addr = self.serv.accept()
1174 testFamily = _justAccept
1175 def _testFamily(self):
1176 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1177 self.addCleanup(self.cli.close)
1178 self.assertEqual(self.cli.family, 2)
1180 testSourceAddress = _justAccept
1181 def _testSourceAddress(self):
1182 self.cli = socket.create_connection((HOST, self.port), timeout=30,
1183 source_address=('', self.source_port))
1184 self.addCleanup(self.cli.close)
1185 self.assertEqual(self.cli.getsockname()[1], self.source_port)
1186 # The port number being used is sufficient to show that the bind()
1189 testTimeoutDefault = _justAccept
1190 def _testTimeoutDefault(self):
1191 # passing no explicit timeout uses socket's global default
1192 self.assertTrue(socket.getdefaulttimeout() is None)
1193 socket.setdefaulttimeout(42)
1195 self.cli = socket.create_connection((HOST, self.port))
1196 self.addCleanup(self.cli.close)
1198 socket.setdefaulttimeout(None)
1199 self.assertEqual(self.cli.gettimeout(), 42)
1201 testTimeoutNone = _justAccept
1202 def _testTimeoutNone(self):
1203 # None timeout means the same as sock.settimeout(None)
1204 self.assertTrue(socket.getdefaulttimeout() is None)
1205 socket.setdefaulttimeout(30)
1207 self.cli = socket.create_connection((HOST, self.port), timeout=None)
1208 self.addCleanup(self.cli.close)
1210 socket.setdefaulttimeout(None)
1211 self.assertEqual(self.cli.gettimeout(), None)
1213 testTimeoutValueNamed = _justAccept
1214 def _testTimeoutValueNamed(self):
1215 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1216 self.assertEqual(self.cli.gettimeout(), 30)
1218 testTimeoutValueNonamed = _justAccept
1219 def _testTimeoutValueNonamed(self):
1220 self.cli = socket.create_connection((HOST, self.port), 30)
1221 self.addCleanup(self.cli.close)
1222 self.assertEqual(self.cli.gettimeout(), 30)
1224 @unittest.skipUnless(thread, 'Threading required for this test.')
1225 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
1227 def __init__(self, methodName='runTest'):
1228 SocketTCPTest.__init__(self, methodName=methodName)
1229 ThreadableTest.__init__(self)
1231 def clientSetUp(self):
1234 def clientTearDown(self):
1237 ThreadableTest.clientTearDown(self)
1239 def testInsideTimeout(self):
1240 conn, addr = self.serv.accept()
1241 self.addCleanup(conn.close)
1244 testOutsideTimeout = testInsideTimeout
1246 def _testInsideTimeout(self):
1247 self.cli = sock = socket.create_connection((HOST, self.port))
1249 self.assertEqual(data, "done!")
1251 def _testOutsideTimeout(self):
1252 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
1253 self.assertRaises(socket.timeout, lambda: sock.recv(5))
1256 class Urllib2FileobjectTest(unittest.TestCase):
1258 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1259 # it close the socket if the close c'tor argument is true
1261 def testClose(self):
1264 def flush(self): pass
1265 def close(self): self.closed = True
1267 # must not close unless we request it: the original use of _fileobject
1268 # by module socket requires that the underlying socket not be closed until
1269 # the _socketobject that created the _fileobject is closed
1271 f = socket._fileobject(s)
1273 self.assertTrue(not s.closed)
1276 f = socket._fileobject(s, close=True)
1278 self.assertTrue(s.closed)
1280 class TCPTimeoutTest(SocketTCPTest):
1282 def testTCPTimeout(self):
1283 def raise_timeout(*args, **kwargs):
1284 self.serv.settimeout(1.0)
1286 self.assertRaises(socket.timeout, raise_timeout,
1287 "Error generating a timeout exception (TCP)")
1289 def testTimeoutZero(self):
1292 self.serv.settimeout(0.0)
1293 foo = self.serv.accept()
1294 except socket.timeout:
1295 self.fail("caught timeout instead of error (TCP)")
1296 except socket.error:
1299 self.fail("caught unexpected exception (TCP)")
1301 self.fail("accept() returned success when we did not expect it")
1303 def testInterruptedTimeout(self):
1304 # XXX I don't know how to do this test on MSWindows or any other
1305 # plaform that doesn't support signal.alarm() or os.kill(), though
1306 # the bug should have existed on all platforms.
1307 if not hasattr(signal, "alarm"):
1308 return # can only test on *nix
1309 self.serv.settimeout(5.0) # must be longer than alarm
1310 class Alarm(Exception):
1312 def alarm_handler(signal, frame):
1314 old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
1316 signal.alarm(2) # POSIX allows alarm to be up to 1 second early
1318 foo = self.serv.accept()
1319 except socket.timeout:
1320 self.fail("caught timeout instead of Alarm")
1324 self.fail("caught other exception instead of Alarm:"
1326 (sys.exc_info()[:2] + (traceback.format_exc(),)))
1328 self.fail("nothing caught")
1330 signal.alarm(0) # shut off alarm
1332 self.fail("got Alarm in wrong place")
1334 # no alarm can be pending. Safe to restore old handler.
1335 signal.signal(signal.SIGALRM, old_alarm)
1337 class UDPTimeoutTest(SocketTCPTest):
1339 def testUDPTimeout(self):
1340 def raise_timeout(*args, **kwargs):
1341 self.serv.settimeout(1.0)
1342 self.serv.recv(1024)
1343 self.assertRaises(socket.timeout, raise_timeout,
1344 "Error generating a timeout exception (UDP)")
1346 def testTimeoutZero(self):
1349 self.serv.settimeout(0.0)
1350 foo = self.serv.recv(1024)
1351 except socket.timeout:
1352 self.fail("caught timeout instead of error (UDP)")
1353 except socket.error:
1356 self.fail("caught unexpected exception (UDP)")
1358 self.fail("recv() returned success when we did not expect it")
1360 class TestExceptions(unittest.TestCase):
1362 def testExceptionTree(self):
1363 self.assertTrue(issubclass(socket.error, Exception))
1364 self.assertTrue(issubclass(socket.herror, socket.error))
1365 self.assertTrue(issubclass(socket.gaierror, socket.error))
1366 self.assertTrue(issubclass(socket.timeout, socket.error))
1368 class TestLinuxAbstractNamespace(unittest.TestCase):
1372 def testLinuxAbstractNamespace(self):
1373 address = "\x00python-test-hello\x00\xff"
1374 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1377 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1378 s2.connect(s1.getsockname())
1380 self.assertEqual(s1.getsockname(), address)
1381 self.assertEqual(s2.getpeername(), address)
1383 def testMaxName(self):
1384 address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1385 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1387 self.assertEqual(s.getsockname(), address)
1389 def testNameOverflow(self):
1390 address = "\x00" + "h" * self.UNIX_PATH_MAX
1391 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1392 self.assertRaises(socket.error, s.bind, address)
1395 @unittest.skipUnless(thread, 'Threading required for this test.')
1396 class BufferIOTest(SocketConnectedTest):
1398 Test the buffer versions of socket.recv() and socket.send().
1400 def __init__(self, methodName='runTest'):
1401 SocketConnectedTest.__init__(self, methodName=methodName)
1403 def testRecvIntoArray(self):
1404 buf = array.array('c', ' '*1024)
1405 nbytes = self.cli_conn.recv_into(buf)
1406 self.assertEqual(nbytes, len(MSG))
1407 msg = buf.tostring()[:len(MSG)]
1408 self.assertEqual(msg, MSG)
1410 def _testRecvIntoArray(self):
1411 with test_support.check_py3k_warnings():
1413 self.serv_conn.send(buf)
1415 def testRecvIntoBytearray(self):
1416 buf = bytearray(1024)
1417 nbytes = self.cli_conn.recv_into(buf)
1418 self.assertEqual(nbytes, len(MSG))
1419 msg = buf[:len(MSG)]
1420 self.assertEqual(msg, MSG)
1422 _testRecvIntoBytearray = _testRecvIntoArray
1424 def testRecvIntoMemoryview(self):
1425 buf = bytearray(1024)
1426 nbytes = self.cli_conn.recv_into(memoryview(buf))
1427 self.assertEqual(nbytes, len(MSG))
1428 msg = buf[:len(MSG)]
1429 self.assertEqual(msg, MSG)
1431 _testRecvIntoMemoryview = _testRecvIntoArray
1433 def testRecvFromIntoArray(self):
1434 buf = array.array('c', ' '*1024)
1435 nbytes, addr = self.cli_conn.recvfrom_into(buf)
1436 self.assertEqual(nbytes, len(MSG))
1437 msg = buf.tostring()[:len(MSG)]
1438 self.assertEqual(msg, MSG)
1440 def _testRecvFromIntoArray(self):
1441 with test_support.check_py3k_warnings():
1443 self.serv_conn.send(buf)
1445 def testRecvFromIntoBytearray(self):
1446 buf = bytearray(1024)
1447 nbytes, addr = self.cli_conn.recvfrom_into(buf)
1448 self.assertEqual(nbytes, len(MSG))
1449 msg = buf[:len(MSG)]
1450 self.assertEqual(msg, MSG)
1452 _testRecvFromIntoBytearray = _testRecvFromIntoArray
1454 def testRecvFromIntoMemoryview(self):
1455 buf = bytearray(1024)
1456 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
1457 self.assertEqual(nbytes, len(MSG))
1458 msg = buf[:len(MSG)]
1459 self.assertEqual(msg, MSG)
1461 _testRecvFromIntoMemoryview = _testRecvFromIntoArray
1468 def isTipcAvailable():
1469 """Check if the TIPC module is loaded
1471 The TIPC module is not loaded automatically on Ubuntu and probably
1472 other Linux distros.
1474 if not hasattr(socket, "AF_TIPC"):
1476 if not os.path.isfile("/proc/modules"):
1478 with open("/proc/modules") as f:
1480 if line.startswith("tipc "):
1482 if test_support.verbose:
1483 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1486 class TIPCTest (unittest.TestCase):
1488 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1489 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1491 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1492 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1493 TIPC_LOWER, TIPC_UPPER)
1496 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1497 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1498 cli.sendto(MSG, sendaddr)
1500 msg, recvaddr = srv.recvfrom(1024)
1502 self.assertEqual(cli.getsockname(), recvaddr)
1503 self.assertEqual(msg, MSG)
1506 class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
1507 def __init__(self, methodName = 'runTest'):
1508 unittest.TestCase.__init__(self, methodName = methodName)
1509 ThreadableTest.__init__(self)
1512 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1513 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1514 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1515 TIPC_LOWER, TIPC_UPPER)
1516 self.srv.bind(srvaddr)
1518 self.serverExplicitReady()
1519 self.conn, self.connaddr = self.srv.accept()
1521 def clientSetUp(self):
1522 # The is a hittable race between serverExplicitReady() and the
1523 # accept() call; sleep a little while to avoid it, otherwise
1524 # we could get an exception
1526 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1527 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1528 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1529 self.cli.connect(addr)
1530 self.cliaddr = self.cli.getsockname()
1532 def testStream(self):
1533 msg = self.conn.recv(1024)
1534 self.assertEqual(msg, MSG)
1535 self.assertEqual(self.cliaddr, self.connaddr)
1537 def _testStream(self):
1543 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
1544 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
1548 NonBlockingTCPTests,
1549 FileObjectClassTestCase,
1550 FileObjectInterruptedTestCase,
1551 UnbufferedFileObjectClassTestCase,
1552 LineBufferedFileObjectClassTestCase,
1553 SmallBufferedFileObjectClassTestCase,
1554 Urllib2FileobjectTest,
1555 NetworkConnectionNoServer,
1556 NetworkConnectionAttributesTest,
1557 NetworkConnectionBehaviourTest,
1559 if hasattr(socket, "socketpair"):
1560 tests.append(BasicSocketPairTest)
1561 if sys.platform == 'linux2':
1562 tests.append(TestLinuxAbstractNamespace)
1563 if isTipcAvailable():
1564 tests.append(TIPCTest)
1565 tests.append(TIPCThreadableTest)
1567 thread_info = test_support.threading_setup()
1568 test_support.run_unittest(*tests)
1569 test_support.threading_cleanup(*thread_info)
1571 if __name__ == "__main__":