Update to 2.7.3
[profile/ivi/python.git] / Lib / asyncore.py
1 # -*- Mode: Python -*-
2 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 #   Author: Sam Rushing <rushing@nightmare.com>
4
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
7 #
8 #                         All Rights Reserved
9 #
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
18 #
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
27
28 """Basic infrastructure for asynchronous socket service clients and servers.
29
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time".  Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
38
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background."  Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
48
49 import select
50 import socket
51 import sys
52 import time
53 import warnings
54
55 import os
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
58      errorcode
59
60 _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
61                            EBADF))
62
63 try:
64     socket_map
65 except NameError:
66     socket_map = {}
67
68 def _strerror(err):
69     try:
70         return os.strerror(err)
71     except (ValueError, OverflowError, NameError):
72         if err in errorcode:
73             return errorcode[err]
74         return "Unknown error %s" %err
75
76 class ExitNow(Exception):
77     pass
78
79 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
80
81 def read(obj):
82     try:
83         obj.handle_read_event()
84     except _reraised_exceptions:
85         raise
86     except:
87         obj.handle_error()
88
89 def write(obj):
90     try:
91         obj.handle_write_event()
92     except _reraised_exceptions:
93         raise
94     except:
95         obj.handle_error()
96
97 def _exception(obj):
98     try:
99         obj.handle_expt_event()
100     except _reraised_exceptions:
101         raise
102     except:
103         obj.handle_error()
104
105 def readwrite(obj, flags):
106     try:
107         if flags & select.POLLIN:
108             obj.handle_read_event()
109         if flags & select.POLLOUT:
110             obj.handle_write_event()
111         if flags & select.POLLPRI:
112             obj.handle_expt_event()
113         if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
114             obj.handle_close()
115     except socket.error, e:
116         if e.args[0] not in _DISCONNECTED:
117             obj.handle_error()
118         else:
119             obj.handle_close()
120     except _reraised_exceptions:
121         raise
122     except:
123         obj.handle_error()
124
125 def poll(timeout=0.0, map=None):
126     if map is None:
127         map = socket_map
128     if map:
129         r = []; w = []; e = []
130         for fd, obj in map.items():
131             is_r = obj.readable()
132             is_w = obj.writable()
133             if is_r:
134                 r.append(fd)
135             # accepting sockets should not be writable
136             if is_w and not obj.accepting:
137                 w.append(fd)
138             if is_r or is_w:
139                 e.append(fd)
140         if [] == r == w == e:
141             time.sleep(timeout)
142             return
143
144         try:
145             r, w, e = select.select(r, w, e, timeout)
146         except select.error, err:
147             if err.args[0] != EINTR:
148                 raise
149             else:
150                 return
151
152         for fd in r:
153             obj = map.get(fd)
154             if obj is None:
155                 continue
156             read(obj)
157
158         for fd in w:
159             obj = map.get(fd)
160             if obj is None:
161                 continue
162             write(obj)
163
164         for fd in e:
165             obj = map.get(fd)
166             if obj is None:
167                 continue
168             _exception(obj)
169
170 def poll2(timeout=0.0, map=None):
171     # Use the poll() support added to the select module in Python 2.0
172     if map is None:
173         map = socket_map
174     if timeout is not None:
175         # timeout is in milliseconds
176         timeout = int(timeout*1000)
177     pollster = select.poll()
178     if map:
179         for fd, obj in map.items():
180             flags = 0
181             if obj.readable():
182                 flags |= select.POLLIN | select.POLLPRI
183             # accepting sockets should not be writable
184             if obj.writable() and not obj.accepting:
185                 flags |= select.POLLOUT
186             if flags:
187                 # Only check for exceptions if object was either readable
188                 # or writable.
189                 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
190                 pollster.register(fd, flags)
191         try:
192             r = pollster.poll(timeout)
193         except select.error, err:
194             if err.args[0] != EINTR:
195                 raise
196             r = []
197         for fd, flags in r:
198             obj = map.get(fd)
199             if obj is None:
200                 continue
201             readwrite(obj, flags)
202
203 poll3 = poll2                           # Alias for backward compatibility
204
205 def loop(timeout=30.0, use_poll=False, map=None, count=None):
206     if map is None:
207         map = socket_map
208
209     if use_poll and hasattr(select, 'poll'):
210         poll_fun = poll2
211     else:
212         poll_fun = poll
213
214     if count is None:
215         while map:
216             poll_fun(timeout, map)
217
218     else:
219         while map and count > 0:
220             poll_fun(timeout, map)
221             count = count - 1
222
223 class dispatcher:
224
225     debug = False
226     connected = False
227     accepting = False
228     closing = False
229     addr = None
230     ignore_log_types = frozenset(['warning'])
231
232     def __init__(self, sock=None, map=None):
233         if map is None:
234             self._map = socket_map
235         else:
236             self._map = map
237
238         self._fileno = None
239
240         if sock:
241             # Set to nonblocking just to make sure for cases where we
242             # get a socket from a blocking source.
243             sock.setblocking(0)
244             self.set_socket(sock, map)
245             self.connected = True
246             # The constructor no longer requires that the socket
247             # passed be connected.
248             try:
249                 self.addr = sock.getpeername()
250             except socket.error, err:
251                 if err.args[0] == ENOTCONN:
252                     # To handle the case where we got an unconnected
253                     # socket.
254                     self.connected = False
255                 else:
256                     # The socket is broken in some unknown way, alert
257                     # the user and remove it from the map (to prevent
258                     # polling of broken sockets).
259                     self.del_channel(map)
260                     raise
261         else:
262             self.socket = None
263
264     def __repr__(self):
265         status = [self.__class__.__module__+"."+self.__class__.__name__]
266         if self.accepting and self.addr:
267             status.append('listening')
268         elif self.connected:
269             status.append('connected')
270         if self.addr is not None:
271             try:
272                 status.append('%s:%d' % self.addr)
273             except TypeError:
274                 status.append(repr(self.addr))
275         return '<%s at %#x>' % (' '.join(status), id(self))
276
277     __str__ = __repr__
278
279     def add_channel(self, map=None):
280         #self.log_info('adding channel %s' % self)
281         if map is None:
282             map = self._map
283         map[self._fileno] = self
284
285     def del_channel(self, map=None):
286         fd = self._fileno
287         if map is None:
288             map = self._map
289         if fd in map:
290             #self.log_info('closing channel %d:%s' % (fd, self))
291             del map[fd]
292         self._fileno = None
293
294     def create_socket(self, family, type):
295         self.family_and_type = family, type
296         sock = socket.socket(family, type)
297         sock.setblocking(0)
298         self.set_socket(sock)
299
300     def set_socket(self, sock, map=None):
301         self.socket = sock
302 ##        self.__dict__['socket'] = sock
303         self._fileno = sock.fileno()
304         self.add_channel(map)
305
306     def set_reuse_addr(self):
307         # try to re-use a server port if possible
308         try:
309             self.socket.setsockopt(
310                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
311                 self.socket.getsockopt(socket.SOL_SOCKET,
312                                        socket.SO_REUSEADDR) | 1
313                 )
314         except socket.error:
315             pass
316
317     # ==================================================
318     # predicates for select()
319     # these are used as filters for the lists of sockets
320     # to pass to select().
321     # ==================================================
322
323     def readable(self):
324         return True
325
326     def writable(self):
327         return True
328
329     # ==================================================
330     # socket object methods.
331     # ==================================================
332
333     def listen(self, num):
334         self.accepting = True
335         if os.name == 'nt' and num > 5:
336             num = 5
337         return self.socket.listen(num)
338
339     def bind(self, addr):
340         self.addr = addr
341         return self.socket.bind(addr)
342
343     def connect(self, address):
344         self.connected = False
345         err = self.socket.connect_ex(address)
346         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
347         or err == EINVAL and os.name in ('nt', 'ce'):
348             return
349         if err in (0, EISCONN):
350             self.addr = address
351             self.handle_connect_event()
352         else:
353             raise socket.error(err, errorcode[err])
354
355     def accept(self):
356         # XXX can return either an address pair or None
357         try:
358             conn, addr = self.socket.accept()
359         except TypeError:
360             return None
361         except socket.error as why:
362             if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
363                 return None
364             else:
365                 raise
366         else:
367             return conn, addr
368
369     def send(self, data):
370         try:
371             result = self.socket.send(data)
372             return result
373         except socket.error, why:
374             if why.args[0] == EWOULDBLOCK:
375                 return 0
376             elif why.args[0] in _DISCONNECTED:
377                 self.handle_close()
378                 return 0
379             else:
380                 raise
381
382     def recv(self, buffer_size):
383         try:
384             data = self.socket.recv(buffer_size)
385             if not data:
386                 # a closed connection is indicated by signaling
387                 # a read condition, and having recv() return 0.
388                 self.handle_close()
389                 return ''
390             else:
391                 return data
392         except socket.error, why:
393             # winsock sometimes throws ENOTCONN
394             if why.args[0] in _DISCONNECTED:
395                 self.handle_close()
396                 return ''
397             else:
398                 raise
399
400     def close(self):
401         self.connected = False
402         self.accepting = False
403         self.del_channel()
404         try:
405             self.socket.close()
406         except socket.error, why:
407             if why.args[0] not in (ENOTCONN, EBADF):
408                 raise
409
410     # cheap inheritance, used to pass all other attribute
411     # references to the underlying socket object.
412     def __getattr__(self, attr):
413         try:
414             retattr = getattr(self.socket, attr)
415         except AttributeError:
416             raise AttributeError("%s instance has no attribute '%s'"
417                                  %(self.__class__.__name__, attr))
418         else:
419             msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
420                   "instead." % {'me': self.__class__.__name__, 'attr':attr}
421             warnings.warn(msg, DeprecationWarning, stacklevel=2)
422             return retattr
423
424     # log and log_info may be overridden to provide more sophisticated
425     # logging and warning methods. In general, log is for 'hit' logging
426     # and 'log_info' is for informational, warning and error logging.
427
428     def log(self, message):
429         sys.stderr.write('log: %s\n' % str(message))
430
431     def log_info(self, message, type='info'):
432         if type not in self.ignore_log_types:
433             print '%s: %s' % (type, message)
434
435     def handle_read_event(self):
436         if self.accepting:
437             # accepting sockets are never connected, they "spawn" new
438             # sockets that are connected
439             self.handle_accept()
440         elif not self.connected:
441             self.handle_connect_event()
442             self.handle_read()
443         else:
444             self.handle_read()
445
446     def handle_connect_event(self):
447         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
448         if err != 0:
449             raise socket.error(err, _strerror(err))
450         self.handle_connect()
451         self.connected = True
452
453     def handle_write_event(self):
454         if self.accepting:
455             # Accepting sockets shouldn't get a write event.
456             # We will pretend it didn't happen.
457             return
458
459         if not self.connected:
460             #check for errors
461             err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
462             if err != 0:
463                 raise socket.error(err, _strerror(err))
464
465             self.handle_connect_event()
466         self.handle_write()
467
468     def handle_expt_event(self):
469         # handle_expt_event() is called if there might be an error on the
470         # socket, or if there is OOB data
471         # check for the error condition first
472         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
473         if err != 0:
474             # we can get here when select.select() says that there is an
475             # exceptional condition on the socket
476             # since there is an error, we'll go ahead and close the socket
477             # like we would in a subclassed handle_read() that received no
478             # data
479             self.handle_close()
480         else:
481             self.handle_expt()
482
483     def handle_error(self):
484         nil, t, v, tbinfo = compact_traceback()
485
486         # sometimes a user repr method will crash.
487         try:
488             self_repr = repr(self)
489         except:
490             self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
491
492         self.log_info(
493             'uncaptured python exception, closing channel %s (%s:%s %s)' % (
494                 self_repr,
495                 t,
496                 v,
497                 tbinfo
498                 ),
499             'error'
500             )
501         self.handle_close()
502
503     def handle_expt(self):
504         self.log_info('unhandled incoming priority event', 'warning')
505
506     def handle_read(self):
507         self.log_info('unhandled read event', 'warning')
508
509     def handle_write(self):
510         self.log_info('unhandled write event', 'warning')
511
512     def handle_connect(self):
513         self.log_info('unhandled connect event', 'warning')
514
515     def handle_accept(self):
516         self.log_info('unhandled accept event', 'warning')
517
518     def handle_close(self):
519         self.log_info('unhandled close event', 'warning')
520         self.close()
521
522 # ---------------------------------------------------------------------------
523 # adds simple buffered output capability, useful for simple clients.
524 # [for more sophisticated usage use asynchat.async_chat]
525 # ---------------------------------------------------------------------------
526
527 class dispatcher_with_send(dispatcher):
528
529     def __init__(self, sock=None, map=None):
530         dispatcher.__init__(self, sock, map)
531         self.out_buffer = ''
532
533     def initiate_send(self):
534         num_sent = 0
535         num_sent = dispatcher.send(self, self.out_buffer[:512])
536         self.out_buffer = self.out_buffer[num_sent:]
537
538     def handle_write(self):
539         self.initiate_send()
540
541     def writable(self):
542         return (not self.connected) or len(self.out_buffer)
543
544     def send(self, data):
545         if self.debug:
546             self.log_info('sending %s' % repr(data))
547         self.out_buffer = self.out_buffer + data
548         self.initiate_send()
549
550 # ---------------------------------------------------------------------------
551 # used for debugging.
552 # ---------------------------------------------------------------------------
553
554 def compact_traceback():
555     t, v, tb = sys.exc_info()
556     tbinfo = []
557     if not tb: # Must have a traceback
558         raise AssertionError("traceback does not exist")
559     while tb:
560         tbinfo.append((
561             tb.tb_frame.f_code.co_filename,
562             tb.tb_frame.f_code.co_name,
563             str(tb.tb_lineno)
564             ))
565         tb = tb.tb_next
566
567     # just to be safe
568     del tb
569
570     file, function, line = tbinfo[-1]
571     info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
572     return (file, function, line), t, v, info
573
574 def close_all(map=None, ignore_all=False):
575     if map is None:
576         map = socket_map
577     for x in map.values():
578         try:
579             x.close()
580         except OSError, x:
581             if x.args[0] == EBADF:
582                 pass
583             elif not ignore_all:
584                 raise
585         except _reraised_exceptions:
586             raise
587         except:
588             if not ignore_all:
589                 raise
590     map.clear()
591
592 # Asynchronous File I/O:
593 #
594 # After a little research (reading man pages on various unixen, and
595 # digging through the linux kernel), I've determined that select()
596 # isn't meant for doing asynchronous file i/o.
597 # Heartening, though - reading linux/mm/filemap.c shows that linux
598 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
599 # will be sitting in memory for us already when we go to read it.
600 #
601 # What other OS's (besides NT) support async file i/o?  [VMS?]
602 #
603 # Regardless, this is useful for pipes, and stdin/stdout...
604
605 if os.name == 'posix':
606     import fcntl
607
608     class file_wrapper:
609         # Here we override just enough to make a file
610         # look like a socket for the purposes of asyncore.
611         # The passed fd is automatically os.dup()'d
612
613         def __init__(self, fd):
614             self.fd = os.dup(fd)
615
616         def recv(self, *args):
617             return os.read(self.fd, *args)
618
619         def send(self, *args):
620             return os.write(self.fd, *args)
621
622         def getsockopt(self, level, optname, buflen=None):
623             if (level == socket.SOL_SOCKET and
624                 optname == socket.SO_ERROR and
625                 not buflen):
626                 return 0
627             raise NotImplementedError("Only asyncore specific behaviour "
628                                       "implemented.")
629
630         read = recv
631         write = send
632
633         def close(self):
634             os.close(self.fd)
635
636         def fileno(self):
637             return self.fd
638
639     class file_dispatcher(dispatcher):
640
641         def __init__(self, fd, map=None):
642             dispatcher.__init__(self, None, map)
643             self.connected = True
644             try:
645                 fd = fd.fileno()
646             except AttributeError:
647                 pass
648             self.set_file(fd)
649             # set it to non-blocking mode
650             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
651             flags = flags | os.O_NONBLOCK
652             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
653
654         def set_file(self, fd):
655             self.socket = file_wrapper(fd)
656             self._fileno = self.socket.fileno()
657             self.add_channel()