2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
60 _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
70 return os.strerror(err)
71 except (ValueError, OverflowError, NameError):
74 return "Unknown error %s" %err
76 class ExitNow(Exception):
79 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
83 obj.handle_read_event()
84 except _reraised_exceptions:
91 obj.handle_write_event()
92 except _reraised_exceptions:
99 obj.handle_expt_event()
100 except _reraised_exceptions:
105 def readwrite(obj, flags):
107 if flags & select.POLLIN:
108 obj.handle_read_event()
109 if flags & select.POLLOUT:
110 obj.handle_write_event()
111 if flags & select.POLLPRI:
112 obj.handle_expt_event()
113 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
115 except socket.error, e:
116 if e.args[0] not in _DISCONNECTED:
120 except _reraised_exceptions:
125 def poll(timeout=0.0, map=None):
129 r = []; w = []; e = []
130 for fd, obj in map.items():
131 is_r = obj.readable()
132 is_w = obj.writable()
135 # accepting sockets should not be writable
136 if is_w and not obj.accepting:
140 if [] == r == w == e:
145 r, w, e = select.select(r, w, e, timeout)
146 except select.error, err:
147 if err.args[0] != EINTR:
170 def poll2(timeout=0.0, map=None):
171 # Use the poll() support added to the select module in Python 2.0
174 if timeout is not None:
175 # timeout is in milliseconds
176 timeout = int(timeout*1000)
177 pollster = select.poll()
179 for fd, obj in map.items():
182 flags |= select.POLLIN | select.POLLPRI
183 # accepting sockets should not be writable
184 if obj.writable() and not obj.accepting:
185 flags |= select.POLLOUT
187 # Only check for exceptions if object was either readable
189 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
190 pollster.register(fd, flags)
192 r = pollster.poll(timeout)
193 except select.error, err:
194 if err.args[0] != EINTR:
201 readwrite(obj, flags)
203 poll3 = poll2 # Alias for backward compatibility
205 def loop(timeout=30.0, use_poll=False, map=None, count=None):
209 if use_poll and hasattr(select, 'poll'):
216 poll_fun(timeout, map)
219 while map and count > 0:
220 poll_fun(timeout, map)
230 ignore_log_types = frozenset(['warning'])
232 def __init__(self, sock=None, map=None):
234 self._map = socket_map
241 # Set to nonblocking just to make sure for cases where we
242 # get a socket from a blocking source.
244 self.set_socket(sock, map)
245 self.connected = True
246 # The constructor no longer requires that the socket
247 # passed be connected.
249 self.addr = sock.getpeername()
250 except socket.error, err:
251 if err.args[0] == ENOTCONN:
252 # To handle the case where we got an unconnected
254 self.connected = False
256 # The socket is broken in some unknown way, alert
257 # the user and remove it from the map (to prevent
258 # polling of broken sockets).
259 self.del_channel(map)
265 status = [self.__class__.__module__+"."+self.__class__.__name__]
266 if self.accepting and self.addr:
267 status.append('listening')
269 status.append('connected')
270 if self.addr is not None:
272 status.append('%s:%d' % self.addr)
274 status.append(repr(self.addr))
275 return '<%s at %#x>' % (' '.join(status), id(self))
279 def add_channel(self, map=None):
280 #self.log_info('adding channel %s' % self)
283 map[self._fileno] = self
285 def del_channel(self, map=None):
290 #self.log_info('closing channel %d:%s' % (fd, self))
294 def create_socket(self, family, type):
295 self.family_and_type = family, type
296 sock = socket.socket(family, type)
298 self.set_socket(sock)
300 def set_socket(self, sock, map=None):
302 ## self.__dict__['socket'] = sock
303 self._fileno = sock.fileno()
304 self.add_channel(map)
306 def set_reuse_addr(self):
307 # try to re-use a server port if possible
309 self.socket.setsockopt(
310 socket.SOL_SOCKET, socket.SO_REUSEADDR,
311 self.socket.getsockopt(socket.SOL_SOCKET,
312 socket.SO_REUSEADDR) | 1
317 # ==================================================
318 # predicates for select()
319 # these are used as filters for the lists of sockets
320 # to pass to select().
321 # ==================================================
329 # ==================================================
330 # socket object methods.
331 # ==================================================
333 def listen(self, num):
334 self.accepting = True
335 if os.name == 'nt' and num > 5:
337 return self.socket.listen(num)
339 def bind(self, addr):
341 return self.socket.bind(addr)
343 def connect(self, address):
344 self.connected = False
345 err = self.socket.connect_ex(address)
346 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
347 or err == EINVAL and os.name in ('nt', 'ce'):
349 if err in (0, EISCONN):
351 self.handle_connect_event()
353 raise socket.error(err, errorcode[err])
356 # XXX can return either an address pair or None
358 conn, addr = self.socket.accept()
361 except socket.error as why:
362 if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
369 def send(self, data):
371 result = self.socket.send(data)
373 except socket.error, why:
374 if why.args[0] == EWOULDBLOCK:
376 elif why.args[0] in _DISCONNECTED:
382 def recv(self, buffer_size):
384 data = self.socket.recv(buffer_size)
386 # a closed connection is indicated by signaling
387 # a read condition, and having recv() return 0.
392 except socket.error, why:
393 # winsock sometimes throws ENOTCONN
394 if why.args[0] in _DISCONNECTED:
401 self.connected = False
402 self.accepting = False
406 except socket.error, why:
407 if why.args[0] not in (ENOTCONN, EBADF):
410 # cheap inheritance, used to pass all other attribute
411 # references to the underlying socket object.
412 def __getattr__(self, attr):
414 retattr = getattr(self.socket, attr)
415 except AttributeError:
416 raise AttributeError("%s instance has no attribute '%s'"
417 %(self.__class__.__name__, attr))
419 msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
420 "instead." % {'me': self.__class__.__name__, 'attr':attr}
421 warnings.warn(msg, DeprecationWarning, stacklevel=2)
424 # log and log_info may be overridden to provide more sophisticated
425 # logging and warning methods. In general, log is for 'hit' logging
426 # and 'log_info' is for informational, warning and error logging.
def log(self, message):
    """Write a single log record for *message* to standard error.

    The message is converted with str() and written as ``log: <message>``
    followed by a newline.
    """
    record = 'log: %s\n' % str(message)
    sys.stderr.write(record)
def log_info(self, message, type='info'):
    """Print an informational, warning or error message to standard output.

    message -- text to report.
    type    -- category label printed in front of the message; categories
               listed in self.ignore_log_types are dropped silently.  (The
               name shadows the builtin, but it is part of the public
               signature and is therefore kept.)
    """
    if type in self.ignore_log_types:
        return
    # A single parenthesised argument produces identical output whether
    # print is the Python 2 statement or the Python 3 function.
    print('%s: %s' % (type, message))
435 def handle_read_event(self):
437 # accepting sockets are never connected, they "spawn" new
438 # sockets that are connected
440 elif not self.connected:
441 self.handle_connect_event()
446 def handle_connect_event(self):
447 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
449 raise socket.error(err, _strerror(err))
450 self.handle_connect()
451 self.connected = True
453 def handle_write_event(self):
455 # Accepting sockets shouldn't get a write event.
456 # We will pretend it didn't happen.
459 if not self.connected:
461 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
463 raise socket.error(err, _strerror(err))
465 self.handle_connect_event()
468 def handle_expt_event(self):
469 # handle_expt_event() is called if there might be an error on the
470 # socket, or if there is OOB data
471 # check for the error condition first
472 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
474 # we can get here when select.select() says that there is an
475 # exceptional condition on the socket
476 # since there is an error, we'll go ahead and close the socket
477 # like we would in a subclassed handle_read() that received no
483 def handle_error(self):
484 nil, t, v, tbinfo = compact_traceback()
486 # sometimes a user repr method will crash.
488 self_repr = repr(self)
490 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
493 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
def handle_expt(self):
    """Report an incoming priority event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled incoming priority event'
    self.log_info(message, 'warning')
def handle_read(self):
    """Report a read event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled read event'
    self.log_info(message, 'warning')
def handle_write(self):
    """Report a write event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled write event'
    self.log_info(message, 'warning')
def handle_connect(self):
    """Report a connect event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled connect event'
    self.log_info(message, 'warning')
def handle_accept(self):
    """Report an accept event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled accept event'
    self.log_info(message, 'warning')
def handle_close(self):
    """Report a close event as unhandled.

    Only forwards a fixed message to self.log_info at 'warning' level.
    """
    message = 'unhandled close event'
    self.log_info(message, 'warning')
522 # ---------------------------------------------------------------------------
523 # adds simple buffered output capability, useful for simple clients.
524 # [for more sophisticated usage use asynchat.async_chat]
525 # ---------------------------------------------------------------------------
527 class dispatcher_with_send(dispatcher):
529 def __init__(self, sock=None, map=None):
530 dispatcher.__init__(self, sock, map)
533 def initiate_send(self):
535 num_sent = dispatcher.send(self, self.out_buffer[:512])
536 self.out_buffer = self.out_buffer[num_sent:]
538 def handle_write(self):
542 return (not self.connected) or len(self.out_buffer)
544 def send(self, data):
546 self.log_info('sending %s' % repr(data))
547 self.out_buffer = self.out_buffer + data
550 # ---------------------------------------------------------------------------
551 # used for debugging.
552 # ---------------------------------------------------------------------------
554 def compact_traceback():
555 t, v, tb = sys.exc_info()
557 if not tb: # Must have a traceback
558 raise AssertionError("traceback does not exist")
561 tb.tb_frame.f_code.co_filename,
562 tb.tb_frame.f_code.co_name,
570 file, function, line = tbinfo[-1]
571 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
572 return (file, function, line), t, v, info
574 def close_all(map=None, ignore_all=False):
577 for x in map.values():
581 if x.args[0] == EBADF:
585 except _reraised_exceptions:
592 # Asynchronous File I/O:
594 # After a little research (reading man pages on various unixen, and
595 # digging through the linux kernel), I've determined that select()
596 # isn't meant for doing asynchronous file i/o.
597 # Heartening, though - reading linux/mm/filemap.c shows that linux
598 # supports asynchronous read-ahead. So _MOST_ of the time, the data
599 # will be sitting in memory for us already when we go to read it.
601 # What other OS's (besides NT) support async file i/o? [VMS?]
603 # Regardless, this is useful for pipes, and stdin/stdout...
605 if os.name == 'posix':
609 # Here we override just enough to make a file
610 # look like a socket for the purposes of asyncore.
611 # The passed fd is automatically os.dup()'d
613 def __init__(self, fd):
def recv(self, *args):
    """Socket-style read that delegates to os.read on self.fd.

    All positional arguments are forwarded unchanged to os.read after the
    descriptor, and its return value is passed straight back.
    """
    descriptor = self.fd
    return os.read(descriptor, *args)
def send(self, *args):
    """Socket-style write that delegates to os.write on self.fd.

    All positional arguments are forwarded unchanged to os.write after the
    descriptor, and its return value is passed straight back.
    """
    descriptor = self.fd
    return os.write(descriptor, *args)
622 def getsockopt(self, level, optname, buflen=None):
623 if (level == socket.SOL_SOCKET and
624 optname == socket.SO_ERROR and
627 raise NotImplementedError("Only asyncore specific behaviour "
639 class file_dispatcher(dispatcher):
641 def __init__(self, fd, map=None):
642 dispatcher.__init__(self, None, map)
643 self.connected = True
646 except AttributeError:
649 # set it to non-blocking mode
650 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
651 flags = flags | os.O_NONBLOCK
652 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
654 def set_file(self, fd):
655 self.socket = file_wrapper(fd)
656 self._fileno = self.socket.fileno()