621733402994a7accef1a7e5968d1514777ea209
[profile/ivi/python.git] / Lib / test / test_asyncore.py
1 import asyncore
2 import unittest
3 import select
4 import os
5 import socket
6 import sys
7 import time
8 import warnings
9 import errno
10
11 from test import test_support
12 from test.test_support import TESTFN, run_unittest, unlink
13 from StringIO import StringIO
14
15 try:
16     import threading
17 except ImportError:
18     threading = None
19
20 HOST = test_support.HOST
21
class dummysocket:
    """Minimal socket stand-in that only remembers whether it was closed."""

    def __init__(self):
        # flipped to True by close(); tests inspect it directly
        self.closed = False

    def fileno(self):
        """Return a fixed, made-up file descriptor number."""
        return 42

    def close(self):
        """Record that the fake socket has been closed."""
        self.closed = True
31
class dummychannel:
    """Channel stand-in that owns exactly one dummysocket."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        """Close the channel by closing the socket it wraps."""
        wrapped = self.socket
        wrapped.close()
38
class exitingdummy:
    """Fake channel whose every event handler raises asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        """Raise ExitNow unconditionally."""
        raise asyncore.ExitNow()

    # every other handler is the very same function under another name
    handle_expt_event = handle_close = handle_write_event = handle_read_event
49
class crashingdummy:
    """Fake channel whose event handlers raise a plain Exception.

    handle_error() records that it ran by setting ``error_handled``.
    """

    def __init__(self):
        # becomes True once handle_error() has been invoked
        self.error_handled = False

    def handle_error(self):
        """Note that the error hook was called."""
        self.error_handled = True

    def handle_read_event(self):
        """Raise a generic Exception unconditionally."""
        raise Exception()

    # every other handler is the very same function under another name
    handle_expt_event = handle_close = handle_write_event = handle_read_event
63
64 # used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and collect what the peer sends.

    Received chunks are written to *buf* with newline characters removed.
    Reading stops when a chunk containing a newline arrives, when the peer
    closes the connection, or after at most 200 select/recv rounds.

    Arguments:
    evt  -- object with a set() method; always set before returning so a
            waiter is released even when something goes wrong
    buf  -- object with a write() method that receives the captured data
    serv -- bound socket; listen() and accept() are called on it here and
            it is always closed before returning

    A socket.timeout raised by listen()/accept() is swallowed: in that case
    nothing is captured.  Any other exception propagates after cleanup.
    """
    try:
        try:
            serv.listen(5)
            conn, addr = serv.accept()
        except socket.timeout:
            # nobody connected in time; nothing to capture
            return

        # The accepted connection must be closed even if select(), recv()
        # or buf.write() raises, otherwise the socket is leaked.
        try:
            for _ in range(200):
                r, w, e = select.select([conn], [], [])
                if r:
                    data = conn.recv(10)
                    if not data:
                        # peer closed the connection; no more data will come
                        break
                    # keep everything except for the newline terminator
                    buf.write(data.replace('\n', ''))
                    if '\n' in data:
                        break
                time.sleep(0.01)
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
88
89
class HelperFunctionTests(unittest.TestCase):
    """Tests for asyncore's module-level helper functions."""

    def test_readwriteexc(self):
        """read(), write() and _exception() let ExitNow propagate and send
        any other handler exception to the object's handle_error()."""
        helpers = (asyncore.read, asyncore.write, asyncore._exception)

        # check that ExitNow exceptions in the object handler method
        # bubble all the way up through asyncore read/write/_exception calls
        exiting = exitingdummy()
        for helper in helpers:
            self.assertRaises(asyncore.ExitNow, helper, exiting)

        # check that an exception other than ExitNow in the object handler
        # method causes the handle_error method to get called
        for helper in helpers:
            crashing = crashingdummy()
            helper(crashing)
            self.assertEqual(crashing.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        """readwrite() dispatches each poll flag to the matching handler."""

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        # (poll flag, name of the attribute the triggered handler sets)
        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class testobj:
            """Records which handler was invoked in a boolean attribute."""

            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            tobj = testobj()
            self.assertEqual(getattr(tobj, expectedattr), False)
            asyncore.readwrite(tobj, flag)

            # Only the attribute modified by the routine we expect to be
            # called should be True.
            for attr in attributes:
                self.assertEqual(getattr(tobj, attr), attr == expectedattr)

            # check that ExitNow exceptions in the object handler method
            # bubble all the way up through asyncore readwrite call
            tr1 = exitingdummy()
            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)

            # check that an exception other than ExitNow in the object handler
            # method causes the handle_error method to get called
            tr2 = crashingdummy()
            self.assertEqual(tr2.error_handled, False)
            asyncore.readwrite(tr2, flag)
            self.assertEqual(tr2.error_handled, True)

    def test_closeall(self):
        """close_all() with an explicitly supplied map."""
        self.closeall_check(False)

    def test_closeall_default(self):
        """close_all() operating on asyncore.socket_map."""
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        """Check that close_all() closes everything in a given map.

        usedefault -- when true the map is temporarily installed as
                      asyncore.socket_map and close_all() is called without
                      arguments; otherwise the map is passed explicitly.
        """
        channels = []
        testmap = {}
        for i in range(10):
            c = dummychannel()
            channels.append(c)
            self.assertEqual(c.socket.closed, False)
            testmap[i] = c

        if usedefault:
            socketmap = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                # pick up whatever map is installed now and always restore
                # the real one so other tests are unaffected
                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
        else:
            asyncore.close_all(testmap)

        self.assertEqual(len(testmap), 0)

        for c in channels:
            self.assertEqual(c.socket.closed, True)

    def test_compact_traceback(self):
        """compact_traceback() reports the file, function, type and value
        of the exception currently being handled."""
        try:
            raise Exception("I don't like spam!")
        except Exception:
            # a bare "except:" is not needed: only Exception is raised above
            real_t, real_v = sys.exc_info()[:2]
            r = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        (f, function, line), t, v, info = r
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
227
228
class DispatcherTests(unittest.TestCase):
    """Tests for the basic behaviour of asyncore.dispatcher."""

    def setUp(self):
        pass

    def tearDown(self):
        # get rid of every channel still registered in the default map
        asyncore.close_all()

    def _capture_lines(self, stream_name, action):
        """Call action() with sys.<stream_name> redirected to a StringIO
        and return what was written, split into lines."""
        sink = StringIO()
        original = getattr(sys, stream_name)
        try:
            setattr(sys, stream_name, sink)
            action()
        finally:
            setattr(sys, stream_name, original)
        return sink.getvalue().splitlines()

    def test_basic(self):
        """A fresh dispatcher is both readable and writable."""
        disp = asyncore.dispatcher()
        self.assertEqual(disp.readable(), True)
        self.assertEqual(disp.writable(), True)

    def test_repr(self):
        """repr() of a socket-less dispatcher shows its id in hex."""
        disp = asyncore.dispatcher()
        self.assertEqual(repr(disp), '<asyncore.dispatcher at %#x>' % id(disp))

    def test_log(self):
        """dispatcher.log() writes 'log: <message>' lines to stderr."""
        disp = asyncore.dispatcher()
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"

        def emit():
            disp.log(l1)
            disp.log(l2)

        # capture output of dispatcher.log() (to stderr)
        lines = self._capture_lines('stderr', emit)
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        """dispatcher.log_info() prefixes messages with their type,
        'info' when no type is given."""
        disp = asyncore.dispatcher()
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"

        def emit():
            disp.log_info(l1, 'EGGS')
            disp.log_info(l2)
            disp.log_info(l3, 'SPAM')

        # capture output of dispatcher.log_info() (to stdout via print)
        lines = self._capture_lines('stdout', emit)
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]

        self.assertEqual(lines, expected)

    def test_unhandled(self):
        """The default handle_* methods each log an 'unhandled' warning."""
        disp = asyncore.dispatcher()
        # make sure no log type is filtered out
        disp.ignore_log_types = ()

        def fire_all():
            disp.handle_expt()
            disp.handle_read()
            disp.handle_write()
            disp.handle_connect()
            disp.handle_accept()

        # capture output of dispatcher.log_info() (to stdout via print)
        lines = self._capture_lines('stdout', fire_all)
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event',
                    'warning: unhandled accept event']
        self.assertEqual(lines, expected)

    def test_issue_8594(self):
        """Attribute errors mention the dispatcher, and falling back to
        the wrapped socket's attributes emits a DeprecationWarning."""
        # XXX - this test is supposed to be removed in next major Python
        # version
        disp = asyncore.dispatcher(socket.socket())
        # make sure the error message no longer refers to the socket
        # object but the dispatcher instance instead
        self.assertRaisesRegexp(AttributeError, 'dispatcher instance',
                                getattr, disp, 'foo')
        # cheap inheritance with the underlying socket is supposed
        # to still work but a DeprecationWarning is expected
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            family = disp.family
            self.assertEqual(family, socket.AF_INET)
            self.assertEqual(len(caught), 1)
            first = caught[0]
            self.assertTrue(issubclass(first.category, DeprecationWarning))

    def test_strerror(self):
        """_strerror() matches os.strerror() and copes with bad codes."""
        # refers to bug #8573
        message = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(message, os.strerror(errno.EPERM))
        message = asyncore._strerror(-1)
        self.assertIn("unknown error", message.lower())
334
335
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send variant that never asks to read and ignores
    the connect event."""

    def handle_connect(self):
        """Do nothing when the connection is established."""
        pass

    def readable(self):
        """Always report that this channel does not want to read."""
        return False
342
class DispatcherWithSendTests(unittest.TestCase):
    """Check that dispatcher_with_send delivers its buffered data to a peer.

    Subclasses override ``usepoll`` to choose the polling function used to
    drain the output buffer.
    """
    # When true, the buffer is drained with asyncore.poll2() (built on
    # select.poll) if the platform provides select.poll; otherwise
    # asyncore.poll() (built on select.select) is used.
    usepoll = False

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def _poll_function(self):
        """Return the asyncore polling function selected by ``usepoll``."""
        # Previously ``usepoll`` was never consulted, so the _UsePoll
        # subclass silently repeated the select()-based run.
        if self.usepoll and hasattr(select, 'poll'):
            return asyncore.poll2
        return asyncore.poll

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @test_support.reap_threads
    def test_send(self):
        """Data passed to send() reaches a capturing server intact."""
        evt = threading.Event()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        port = test_support.bind_port(sock)

        cap = StringIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = "Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            d.connect((HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send('\n')

            # pump the event loop until everything buffered has been sent,
            # giving up after a bounded number of rounds
            poll = self._poll_function()
            n = 1000
            while d.out_buffer and n > 0:
                poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            # never leave the server thread running past the test
            t.join()
391
392
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
    """Same tests as DispatcherWithSendTests, with ``usepoll`` enabled."""
    usepoll = True
395
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper and asyncore.file_dispatcher."""

    def setUp(self):
        # Known content written to TESTFN before every test.  The file is
        # closed explicitly instead of relying on garbage collection.
        self.d = "It's not dead, it's sleeping!"
        with open(TESTFN, 'w') as f:
            f.write(self.d)

    def tearDown(self):
        unlink(TESTFN)

    def test_recv(self):
        """recv()/read() return file data through a duplicated descriptor,
        and reading after close() raises OSError."""
        fd = os.open(TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        # the wrapper must keep working after the original fd is closed
        os.close(fd)

        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), "It's not dead")
        self.assertEqual(w.read(6), ", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)

    def test_send(self):
        """write() and send() both append their data to the file."""
        d1 = "Come again?"
        d2 = "I want to buy some cheese."
        fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        with open(TESTFN) as f:
            content = f.read()
        self.assertEqual(content, self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        """A file_dispatcher driven by asyncore.loop() reads the file."""
        fd = os.open(TESTFN, os.O_RDONLY)
        data = []
        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))
        # instantiating the dispatcher is what registers it with asyncore
        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)
443
444
class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose accept/connect/expt/close handlers all fail loudly.

    Tests subclass it and override only the handler they expect to be
    called; ``flag`` is the attribute such overrides set to signal success.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        # set to True by the handler a given test is waiting for
        self.flag = False

    def handle_error(self):
        # re-raise whatever exception is currently being handled
        raise

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")
465
466
class TCPServer(asyncore.dispatcher):
    """Listening dispatcher that hands each accepted connection to a
    handler callable.
    """

    def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
        asyncore.dispatcher.__init__(self)
        # called with the accepted socket for every incoming connection
        self.handler = handler
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        listen_on = (host, port)
        self.bind(listen_on)
        self.listen(5)

    @property
    def address(self):
        """First two items of the listening socket's getsockname()."""
        bound = self.socket.getsockname()
        return bound[:2]

    def handle_accept(self):
        """Accept the pending connection and wrap it with the handler."""
        accepted = self.accept()
        sock, addr = accepted
        self.handler(sock)

    def handle_error(self):
        # re-raise whatever exception is currently being handled
        raise
490
491
class BaseClient(BaseTestHandler):
    """BaseTestHandler that connects to an address on creation and
    tolerates the resulting connect event."""

    def __init__(self, address):
        BaseTestHandler.__init__(self)
        family, kind = socket.AF_INET, socket.SOCK_STREAM
        self.create_socket(family, kind)
        self.connect(address)

    def handle_connect(self):
        """Accept the connect event silently instead of raising."""
        pass
501
502
class BaseTestAPI(unittest.TestCase):
    """Tests of the dispatcher API, shared by the select and poll variants.

    Concrete subclasses must define the ``use_poll`` class attribute, which
    is passed on to asyncore.loop().
    """

    def tearDown(self):
        asyncore.close_all()

    def loop_waiting_for_flag(self, instance, timeout=5):
        """Run the asyncore loop until ``instance.flag`` becomes true.

        The loop is run one iteration at a time, at most 100 times, pausing
        ``timeout / 100`` seconds between iterations.  The test fails if
        the flag is still unset when the iterations run out or when no
        channel is left in asyncore.socket_map.
        """
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self):
                BaseTestHandler.__init__(self)
                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
                self.bind((HOST, 0))
                self.listen(5)
                self.address = self.socket.getsockname()[:2]

            def handle_accept(self):
                self.flag = True

        server = TestListener()
        client = BaseClient(server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send('x' * 1024)

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.

        class TestClient(BaseClient):
            def handle_expt(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(chr(244), socket.MSG_OOB)

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):
        # make sure handle_error is called when a handler raises, and that
        # the original exception is still the one being handled

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0
            def handle_error(self):
                self.flag = True
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_connection_attributes(self):
        # follow the connected/accepting attributes of a server and a
        # client through connect, client close and server close
        server = TCPServer()
        client = BaseClient(server.address)

        # we start disconnected
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        # this can't be taken for granted across all platforms
        #self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # execute some loops so that client connects to server
        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertTrue(client.connected)
        self.assertFalse(client.accepting)

        # disconnect the client
        client.close()
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # stop serving
        server.close()
        self.assertFalse(server.connected)
        self.assertFalse(server.accepting)

    def test_create_socket(self):
        # create_socket() must create a socket of the requested kind
        s = asyncore.dispatcher()
        s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.assertEqual(s.socket.family, socket.AF_INET)
        self.assertEqual(s.socket.type, socket.SOCK_STREAM)

    def test_bind(self):
        # binding a second dispatcher to an address in use must fail
        s1 = asyncore.dispatcher()
        s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        s1.bind((HOST, 0))
        s1.listen(5)
        port = s1.socket.getsockname()[1]

        s2 = asyncore.dispatcher()
        s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # EADDRINUSE indicates the socket was correctly bound
        self.assertRaises(socket.error, s2.bind, (HOST, port))

    def test_set_reuse_addr(self):
        # set_reuse_addr() must turn SO_REUSEADDR on, on platforms where
        # the option can be set at all
        sock = socket.socket()
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except socket.error:
            # unittest.skip() merely builds a decorator and skips nothing
            # when called here; skipTest() is what marks the running test
            # as skipped.
            self.skipTest("SO_REUSEADDR not supported on this platform")
        else:
            # if SO_REUSEADDR succeeded for sock we expect asyncore
            # to do the same
            s = asyncore.dispatcher(socket.socket())
            self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
                                                 socket.SO_REUSEADDR))
            s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            s.set_reuse_addr()
            self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
                                                 socket.SO_REUSEADDR))
        finally:
            sock.close()
704
705
class TestAPI_UseSelect(BaseTestAPI):
    """Run the BaseTestAPI tests with ``use_poll`` disabled."""
    use_poll = False
708
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UsePoll(BaseTestAPI):
    """Run the BaseTestAPI tests with ``use_poll`` enabled."""
    use_poll = True
712
713
def test_main():
    """Run every test case class of this module through run_unittest()."""
    run_unittest(
        HelperFunctionTests,
        DispatcherTests,
        DispatcherWithSendTests,
        DispatcherWithSendTests_UsePoll,
        TestAPI_UseSelect,
        TestAPI_UsePoll,
        FileWrapperTest,
    )

# allow running this file directly as a script
if __name__ == "__main__":
    test_main()