History
-------
+0.13.4 (2012-xx-xx)
++++++++++++++++++++
+
+- Fix leaking connections (from urllib3 update)
+- OAuthlib path hack fix
+- App Engine 2.7 Fixes!
+
+
0.13.3 (2012-07-12)
+++++++++++++++++++
# This module is part of urllib3 and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from collections import deque
+from collections import MutableMapping
+from threading import Lock
-from threading import RLock
+try: # Python 2.7+
+ from collections import OrderedDict
+except ImportError:
+ from .packages.ordered_dict import OrderedDict
__all__ = ['RecentlyUsedContainer']
-class AccessEntry(object):
- __slots__ = ('key', 'is_valid')
- def __init__(self, key, is_valid=True):
- self.key = key
- self.is_valid = is_valid
+_Null = object()
-class RecentlyUsedContainer(dict):
- """
- Provides a dict-like that maintains up to ``maxsize`` keys while throwing
- away the least-recently-used keys beyond ``maxsize``.
+class RecentlyUsedContainer(MutableMapping):
"""
+ Provides a thread-safe dict-like container which maintains up to
+ ``maxsize`` keys while throwing away the least-recently-used keys beyond
+ ``maxsize``.
- # If len(self.access_log) exceeds self._maxsize * CLEANUP_FACTOR, then we
- # will attempt to cleanup the invalidated entries in the access_log
- # datastructure during the next 'get' operation.
- CLEANUP_FACTOR = 10
-
- def __init__(self, maxsize=10):
- self._maxsize = maxsize
-
- self._container = {}
-
- # We use a deque to to store our keys ordered by the last access.
- self.access_log = deque()
- self.access_log_lock = RLock()
-
- # We look up the access log entry by the key to invalidate it so we can
- # insert a new authorative entry at the head without having to dig and
- # find the old entry for removal immediately.
- self.access_lookup = {}
-
- # Trigger a heap cleanup when we get past this size
- self.access_log_limit = maxsize * self.CLEANUP_FACTOR
-
- def _invalidate_entry(self, key):
- "If exists: Invalidate old entry and return it."
- old_entry = self.access_lookup.get(key)
- if old_entry:
- old_entry.is_valid = False
+ :param maxsize:
+ Maximum number of recent elements to retain.
- return old_entry
-
- def _push_entry(self, key):
- "Push entry onto our access log, invalidate the old entry if exists."
- self._invalidate_entry(key)
-
- new_entry = AccessEntry(key)
- self.access_lookup[key] = new_entry
-
- self.access_log_lock.acquire()
- self.access_log.appendleft(new_entry)
- self.access_log_lock.release()
-
- def _prune_entries(self, num):
- "Pop entries from our access log until we popped ``num`` valid ones."
- while num > 0:
- self.access_log_lock.acquire()
- p = self.access_log.pop()
- self.access_log_lock.release()
-
- if not p.is_valid:
- continue # Invalidated entry, skip
-
- dict.pop(self, p.key, None)
- self.access_lookup.pop(p.key, None)
- num -= 1
+ :param dispose_func:
+ Every time an item is evicted from the container,
+        ``dispose_func(value)`` is called.
+ """
- def _prune_invalidated_entries(self):
- "Rebuild our access_log without the invalidated entries."
- self.access_log_lock.acquire()
- self.access_log = deque(e for e in self.access_log if e.is_valid)
- self.access_log_lock.release()
+ ContainerCls = OrderedDict
- def _get_ordered_access_keys(self):
- "Return ordered access keys for inspection. Used for testing."
- self.access_log_lock.acquire()
- r = [e.key for e in self.access_log if e.is_valid]
- self.access_log_lock.release()
+ def __init__(self, maxsize=10, dispose_func=None):
+ self._maxsize = maxsize
+ self.dispose_func = dispose_func
- return r
+ self._container = self.ContainerCls()
+ self._lock = Lock()
def __getitem__(self, key):
- item = dict.get(self, key)
+ # Re-insert the item, moving it to the end of the eviction line.
+ with self._lock:
+ item = self._container.pop(key)
+ self._container[key] = item
+ return item
+
+ def __setitem__(self, key, value):
+ evicted_value = _Null
+ with self._lock:
+ # Possibly evict the existing value of 'key'
+ evicted_value = self._container.get(key, _Null)
+ self._container[key] = value
+
+ # If we didn't evict an existing value, we might have to evict the
+ # least recently used item from the beginning of the container.
+ if len(self._container) > self._maxsize:
+ _key, evicted_value = self._container.popitem(last=False)
+
+ if self.dispose_func and evicted_value is not _Null:
+ self.dispose_func(evicted_value)
- if not item:
- raise KeyError(key)
+ def __delitem__(self, key):
+ with self._lock:
+ value = self._container.pop(key)
- # Insert new entry with new high priority, also implicitly invalidates
- # the old entry.
- self._push_entry(key)
+ if self.dispose_func:
+ self.dispose_func(value)
- if len(self.access_log) > self.access_log_limit:
- # Heap is getting too big, try to clean up any tailing invalidated
- # entries.
- self._prune_invalidated_entries()
+ def __len__(self):
+ with self._lock:
+ return len(self._container)
- return item
+ def __iter__(self):
+ raise NotImplementedError('Iteration over this class is unlikely to be threadsafe.')
- def __setitem__(self, key, item):
- # Add item to our container and access log
- dict.__setitem__(self, key, item)
- self._push_entry(key)
+ def clear(self):
+ with self._lock:
+ # Copy pointers to all values, then wipe the mapping
+ # under Python 2, this copies the list of values twice :-|
+ values = list(self._container.values())
+ self._container.clear()
- # Discard invalid and excess entries
- self._prune_entries(len(self) - self._maxsize)
+ if self.dispose_func:
+ for value in values:
+ self.dispose_func(value)
- def __delitem__(self, key):
- self._invalidate_entry(key)
- self.access_lookup.pop(key, None)
- dict.__delitem__(self, key)
-
- def get(self, key, default=None):
- try:
- return self[key]
- except KeyError:
- return default
+ def keys(self):
+ with self._lock:
+ return self._container.keys()
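
The rewritten container above keys its eviction order off an ``OrderedDict``
and invokes ``dispose_func`` on every evicted value. A minimal usage sketch,
assuming only the behaviour shown in the diff (the callback here just records
what was evicted): ::

    from urllib3._collections import RecentlyUsedContainer

    evicted = []
    lru = RecentlyUsedContainer(maxsize=2, dispose_func=evicted.append)

    lru['a'] = 1
    lru['b'] = 2
    _ = lru['a']   # Touch 'a' so that 'b' becomes the least recently used.
    lru['c'] = 3   # Evicts 'b'; evicted == [2]
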
import logging
import socket
-from socket import error as SocketError, timeout as SocketTimeout
+from socket import timeout as SocketTimeout
-try: # Python 3
+try: # Python 3
from http.client import HTTPConnection, HTTPException
from http.client import HTTP_PORT, HTTPS_PORT
except ImportError:
from httplib import HTTPConnection, HTTPException
from httplib import HTTP_PORT, HTTPS_PORT
-try: # Python 3
+try: # Python 3
from queue import LifoQueue, Empty, Full
except ImportError:
from Queue import LifoQueue, Empty, Full
-try: # Compiled with SSL?
+try: # Compiled with SSL?
HTTPSConnection = object
BaseSSLError = None
ssl = None
- try: # Python 3
+ try: # Python 3
from http.client import HTTPSConnection
except ImportError:
from httplib import HTTPSConnection
import ssl
BaseSSLError = ssl.SSLError
-except (ImportError, AttributeError):
+except (ImportError, AttributeError): # Platform-specific: No SSL.
pass
from .response import HTTPResponse
from .util import get_host, is_connection_dropped
from .exceptions import (
+ ClosedPoolError,
EmptyPoolError,
HostChangedError,
MaxRetryError,
try:
conn = self.pool.get(block=self.block, timeout=timeout)
- # If this is a persistent connection, check if it got disconnected
- if conn and is_connection_dropped(conn):
- log.info("Resetting dropped connection: %s" % self.host)
- conn.close()
+ except AttributeError: # self.pool is None
+ raise ClosedPoolError(self, "Pool is closed.")
except Empty:
if self.block:
"connections are allowed.")
pass # Oh well, we'll create a new connection then
+ # If this is a persistent connection, check if it got disconnected
+ if conn and is_connection_dropped(conn):
+ log.info("Resetting dropped connection: %s" % self.host)
+ conn.close()
+
return conn or self._new_conn()
def _put_conn(self, conn):
Connection object for the current host and port as returned by
:meth:`._new_conn` or :meth:`._get_conn`.
- If the pool is already full, the connection is discarded because we
- exceeded maxsize. If connections are discarded frequently, then maxsize
- should be increased.
+ If the pool is already full, the connection is closed and discarded
+ because we exceeded maxsize. If connections are discarded frequently,
+ then maxsize should be increased.
+
+ If the pool is closed, then the connection will be closed and discarded.
"""
try:
self.pool.put(conn, block=False)
+ return # Everything is dandy, done.
+ except AttributeError:
+ # self.pool is None.
+ pass
except Full:
# This should never happen if self.block == True
log.warning("HttpConnectionPool is full, discarding connection: %s"
% self.host)
+ # Connection never got put back into the pool, close it.
+ conn.close()
+
def _make_request(self, conn, method, url, timeout=_Default,
**httplib_request_kw):
"""
log.debug("\"%s %s %s\" %s %s" % (method, url, http_version,
httplib_response.status,
httplib_response.length))
-
return httplib_response
+ def close(self):
+ """
+ Close all pooled connections and disable the pool.
+ """
+ # Disable access to the pool
+ old_pool, self.pool = self.pool, None
+
+ try:
+ while True:
+ conn = old_pool.get(block=False)
+ if conn:
+ conn.close()
+
+ except Empty:
+ pass # Done.
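
Draining the queue in ``close`` and swapping ``self.pool`` to ``None`` means
any later ``_get_conn`` hits the ``AttributeError`` branch above and surfaces
as ``ClosedPoolError``. A sketch of the resulting contract (the host name is
illustrative): ::

    from urllib3.connectionpool import HTTPConnectionPool
    from urllib3.exceptions import ClosedPoolError

    pool = HTTPConnectionPool('example.com')
    pool.close()                   # Close all idle connections.

    try:
        pool.urlopen('GET', '/')   # self.pool is now None...
    except ClosedPoolError:
        pass                       # ...so the request is refused.
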
def is_same_host(self, url):
"""
Check if the given ``url`` is a member of the same host as this
connection pool.
"""
+ if url.startswith('/'):
+ return True
+
# TODO: Add optional support for socket.gethostbyname checking.
scheme, host, port = get_host(url)
# Use explicit default port for comparison when none is given.
port = port_by_scheme.get(scheme)
- return (url.startswith('/') or
- (scheme, host, port) == (self.scheme, self.host, self.port))
+ return (scheme, host, port) == (self.scheme, self.host, self.port)
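
Because the comparison substitutes the scheme's default port, equivalent URLs
normalize to the same host tuple. A doctest-style sketch (the host is
illustrative; behaviour inferred from the code above): ::

    from urllib3.connectionpool import HTTPConnectionPool

    pool = HTTPConnectionPool('example.com', port=80)
    pool.is_same_host('http://example.com/')      # True: default port matches
    pool.is_same_host('http://example.com:80/')   # True: explicit port matches
    pool.is_same_host('/index.html')              # True: relative URL
    pool.is_same_host('https://example.com/')     # False: scheme differs
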
def urlopen(self, method, url, body=None, headers=None, retries=3,
redirect=True, assert_same_host=True, timeout=_Default,
try:
# Request a connection from the queue
- # (Could raise SocketError: Bad file descriptor)
conn = self._get_conn(timeout=pool_timeout)
# Make the request on the httplib connection object
# Name mismatch
raise SSLError(e)
- except (HTTPException, SocketError) as e:
+ except HTTPException as e:
# Connection broken, discard. It will be replaced next _get_conn().
conn = None
# This is necessary so we can access e below
err = e
finally:
- if conn and release_conn:
- # Put the connection back to be reused
+ if release_conn:
+ # Put the connection back to be reused. If the connection is
+ # expired then it will be None, which will get replaced with a
+ # fresh connection during _get_conn.
self._put_conn(conn)
if not conn:
+ # Try again
log.warn("Retrying (%d attempts remain) after connection "
"broken by '%r': %s" % (retries, err, url))
return self.urlopen(method, url, body, headers, retries - 1,
- redirect, assert_same_host) # Try again
+ redirect, assert_same_host,
+ timeout=timeout, pool_timeout=pool_timeout,
+ release_conn=release_conn, **response_kw)
# Handle redirect?
redirect_location = redirect and response.get_redirect_location()
if redirect_location:
+ if response.status == 303:
+ method = 'GET'
log.info("Redirecting %s -> %s" % (url, redirect_location))
return self.urlopen(method, redirect_location, body, headers,
- retries - 1, redirect, assert_same_host)
+ retries - 1, redirect, assert_same_host,
+ timeout=timeout, pool_timeout=pool_timeout,
+ release_conn=release_conn, **response_kw)
return response
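
Threading ``timeout``, ``pool_timeout``, ``release_conn`` and ``response_kw``
through both recursive calls fixes a subtle bug: per-request settings used to
be dropped on the first retry or redirect. A sketch of a call that now keeps
its settings across hops (host and path are illustrative): ::

    from urllib3.connectionpool import HTTPConnectionPool

    pool = HTTPConnectionPool('example.com', port=80)
    # timeout=1.0 now also applies to retried and redirected attempts,
    # instead of silently reverting to the pool default.
    r = pool.urlopen('GET', '/', retries=2, timeout=1.0)
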
pass
+class DecodeError(HTTPError):
+ "Raised when automatic decoding based on Content-Type fails."
+ pass
+
+
## Leaf Exceptions
class MaxRetryError(PoolError):
pass
+class ClosedPoolError(PoolError):
+ "Raised when a request enters a pool after the pool has been closed."
+ pass
+
+
class LocationParseError(ValueError, HTTPError):
"Raised when get_host or similar fails to parse the URL input."
from ._collections import RecentlyUsedContainer
from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool
-from .connectionpool import get_host, connection_from_url, port_by_scheme
-from .exceptions import HostChangedError
+from .connectionpool import connection_from_url, port_by_scheme
from .request import RequestMethods
+from .util import parse_url
__all__ = ['PoolManager', 'ProxyManager', 'proxy_from_url']
"""
- # TODO: Make sure there are no memory leaks here.
-
def __init__(self, num_pools=10, **connection_pool_kw):
self.connection_pool_kw = connection_pool_kw
- self.pools = RecentlyUsedContainer(num_pools)
+ self.pools = RecentlyUsedContainer(num_pools,
+ dispose_func=lambda p: p.close())
+
+ def clear(self):
+ """
+ Empty our store of pools and direct them all to close.
+
+ This will not affect in-flight connections, but they will not be
+ re-used after completion.
+ """
+ self.pools.clear()
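
Passing ``dispose_func=lambda p: p.close()`` to the container is what plugs
the leak mentioned in the changelog: a pool evicted from the LRU store, or
wiped by ``clear()``, now closes its idle sockets instead of leaking them.
A usage sketch (host names are illustrative): ::

    from urllib3 import PoolManager

    manager = PoolManager(num_pools=2)
    manager.connection_from_host('a.example', port=80)
    manager.connection_from_host('b.example', port=80)
    # A third distinct host evicts the least recently used pool,
    # and dispose_func closes it.
    manager.connection_from_host('c.example', port=80)

    manager.clear()   # Close and discard all remaining pools.
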
- def connection_from_host(self, host, port=80, scheme='http'):
+ def connection_from_host(self, host, port=None, scheme='http'):
"""
Get a :class:`ConnectionPool` based on the host, port, and scheme.
- Note that an appropriate ``port`` value is required here to normalize
- connection pools in our container most effectively.
+ If ``port`` isn't given, it will be derived from the ``scheme`` using
+ ``urllib3.connectionpool.port_by_scheme``.
"""
+ port = port or port_by_scheme.get(scheme, 80)
+
pool_key = (scheme, host, port)
# If the scheme, host, or port doesn't match existing open connections,
Additional parameters are taken from the :class:`.PoolManager`
constructor.
"""
- scheme, host, port = get_host(url)
-
- port = port or port_by_scheme.get(scheme, 80)
-
- return self.connection_from_host(host, port=port, scheme=scheme)
+ u = parse_url(url)
+ return self.connection_from_host(u.host, port=u.port, scheme=u.scheme)
- def urlopen(self, method, url, **kw):
+ def urlopen(self, method, url, redirect=True, **kw):
"""
- Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`.
+ Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
+ with custom cross-host redirect logic and only sends the request-uri
+ portion of the ``url``.
- ``url`` must be absolute, such that an appropriate
+ The given ``url`` parameter must be absolute, such that an appropriate
:class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.
"""
- conn = self.connection_from_url(url)
- try:
- return conn.urlopen(method, url, **kw)
+ u = parse_url(url)
+ conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)
+
+ kw['assert_same_host'] = False
+ kw['redirect'] = False
+
+ response = conn.urlopen(method, u.request_uri, **kw)
+
+ redirect_location = redirect and response.get_redirect_location()
+ if not redirect_location:
+ return response
+
+ if response.status == 303:
+ method = 'GET'
- except HostChangedError as e:
- kw['retries'] = e.retries # Persist retries countdown
- return self.urlopen(method, e.url, **kw)
+ log.info("Redirecting %s -> %s" % (url, redirect_location))
+ kw['retries'] = kw.get('retries', 3) - 1 # Persist retries countdown
+ return self.urlopen(method, redirect_location, **kw)
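
At the manager level, cross-host redirects are replayed by the manager
itself: pool-level redirect handling is disabled (``kw['redirect'] = False``)
so the follow-up request can be routed to a different pool, and a 303 answer
downgrades the method to GET. A sketch, assuming the URL answers with a
redirect (the URL is illustrative): ::

    from urllib3 import PoolManager

    http = PoolManager()
    # A 303 response to this POST is re-issued by the manager as a GET
    # against the redirect target, decrementing retries on each hop.
    response = http.urlopen('POST', 'http://example.com/submit')
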
class ProxyManager(RequestMethods):
from io import BytesIO
-from .exceptions import HTTPError
+from .exceptions import DecodeError
from .packages.six import string_types as basestring
try:
if decode_content and decoder:
data = decoder(data)
- except IOError:
- raise HTTPError("Received response with content-encoding: %s, but "
- "failed to decode it." % content_encoding)
+ except (IOError, zlib.error):
+ raise DecodeError("Received response with content-encoding: %s, but "
+ "failed to decode it." % content_encoding)
if cache_content:
self._body = data
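
Raising the narrower ``DecodeError`` (and catching ``zlib.error`` alongside
``IOError``) lets callers tell a corrupt body apart from other HTTP failures.
A sketch of the caller side (the URL is illustrative): ::

    from urllib3 import PoolManager
    from urllib3.exceptions import DecodeError

    http = PoolManager()
    try:
        r = http.urlopen('GET', 'http://example.com/')
    except DecodeError:
        # Content-Encoding promised gzip/deflate, but the body
        # could not be decoded.
        pass
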
from base64 import b64encode
+from collections import namedtuple
+from socket import error as SocketError
try:
from select import poll, POLLIN
from .exceptions import LocationParseError
-def make_headers(keep_alive=None, accept_encoding=None, user_agent=None,
- basic_auth=None):
+class Url(namedtuple('Url', ['scheme', 'auth', 'host', 'port', 'path', 'query', 'fragment'])):
"""
- Shortcuts for generating request headers.
-
- :param keep_alive:
- If ``True``, adds 'connection: keep-alive' header.
-
- :param accept_encoding:
- Can be a boolean, list, or string.
- ``True`` translates to 'gzip,deflate'.
- List will get joined by comma.
- String will be used as provided.
-
- :param user_agent:
- String representing the user-agent you want, such as
- "python-urllib3/0.6"
-
- :param basic_auth:
- Colon-separated username:password string for 'authorization: basic ...'
- auth header.
-
- Example: ::
-
- >>> make_headers(keep_alive=True, user_agent="Batman/1.0")
- {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
- >>> make_headers(accept_encoding=True)
- {'accept-encoding': 'gzip,deflate'}
+ Datastructure for representing an HTTP URL. Used as a return value for
+ :func:`parse_url`.
"""
- headers = {}
- if accept_encoding:
- if isinstance(accept_encoding, str):
- pass
- elif isinstance(accept_encoding, list):
- accept_encoding = ','.join(accept_encoding)
- else:
- accept_encoding = 'gzip,deflate'
- headers['accept-encoding'] = accept_encoding
+    __slots__ = ()
- if user_agent:
- headers['user-agent'] = user_agent
+ def __new__(cls, scheme=None, auth=None, host=None, port=None, path=None, query=None, fragment=None):
+ return super(Url, cls).__new__(cls, scheme, auth, host, port, path, query, fragment)
- if keep_alive:
- headers['connection'] = 'keep-alive'
+ @property
+ def hostname(self):
+ """For backwards-compatibility with urlparse. We're nice like that."""
+ return self.host
- if basic_auth:
- headers['authorization'] = 'Basic ' + \
- b64encode(six.b(basic_auth)).decode('utf-8')
+ @property
+ def request_uri(self):
+ """Absolute path including the query string."""
+ uri = self.path or '/'
- return headers
+ if self.query is not None:
+ uri += '?' + self.query
+
+ return uri
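
``request_uri`` is what :meth:`PoolManager.urlopen` now sends down to the
pool in place of the absolute URL. A doctest-style sketch of the property: ::

    >>> from urllib3.util import parse_url
    >>> parse_url('http://example.com/path?a=1').request_uri
    '/path?a=1'
    >>> parse_url('http://example.com').request_uri
    '/'
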
def split_first(s, delims):
"""
Given a string and an iterable of delimiters, split on the first found
- delimiter. Return two split parts.
+ delimiter. Return two split parts and the matched delimiter.
If not found, then the first part is the full input string.
+ Example: ::
+
+ >>> split_first('foo/bar?baz', '?/=')
+ ('foo', 'bar?baz', '/')
+ >>> split_first('foo/bar?baz', '123')
+ ('foo/bar?baz', '', None)
+
Scales linearly with number of delims. Not ideal for large number of delims.
"""
min_idx = None
+ min_delim = None
for d in delims:
idx = s.find(d)
if idx < 0:
continue
- if not min_idx:
+ if min_idx is None or idx < min_idx:
min_idx = idx
- else:
- min_idx = min(idx, min_idx)
+ min_delim = d
- if min_idx < 0:
- return s, ''
+ if min_idx is None or min_idx < 0:
+ return s, '', None
- return s[:min_idx], s[min_idx+1:]
+ return s[:min_idx], s[min_idx+1:], min_delim
-def get_host(url):
+def parse_url(url):
"""
- Given a url, return its scheme, host and port (None if it's not there).
+ Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is
+ performed to parse incomplete urls. Fields not provided will be None.
- For example: ::
+ Partly backwards-compatible with :mod:`urlparse`.
- >>> get_host('http://google.com/mail/')
- ('http', 'google.com', None)
- >>> get_host('google.com:80')
- ('http', 'google.com', 80)
+ Example: ::
+
+ >>> parse_url('http://google.com/mail/')
+        Url(scheme='http', host='google.com', port=None, path='/mail/', ...)
+        >>> parse_url('google.com:80')
+        Url(scheme=None, host='google.com', port=80, path=None, ...)
+        >>> parse_url('/foo?bar')
+        Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
"""
# While this code has overlap with stdlib's urlparse, it is much
# Additionally, this implementation does silly things to be optimal
# on CPython.
- scheme = 'http'
+ scheme = None
+ auth = None
host = None
port = None
+ path = None
+ fragment = None
+ query = None
# Scheme
if '://' in url:
# Find the earliest Authority Terminator
# (http://tools.ietf.org/html/rfc3986#section-3.2)
- url, _path = split_first(url, ['/', '?', '#'])
+ url, path_, delim = split_first(url, ['/', '?', '#'])
+
+ if delim:
+ # Reassemble the path
+ path = delim + path_
# Auth
if '@' in url:
- _auth, url = url.split('@', 1)
+ auth, url = url.split('@', 1)
# IPv6
if url and url[0] == '[':
port = int(port)
- elif not host:
+ elif not host and url:
host = url
- return scheme, host, port
+ if not path:
+ return Url(scheme, auth, host, port, path, query, fragment)
+
+ # Fragment
+ if '#' in path:
+ path, fragment = path.split('#', 1)
+
+ # Query
+ if '?' in path:
+ path, query = path.split('?', 1)
+
+ return Url(scheme, auth, host, port, path, query, fragment)
+
+
+def get_host(url):
+ """
+ Deprecated. Use :func:`.parse_url` instead.
+ """
+ p = parse_url(url)
+ return p.scheme or 'http', p.hostname, p.port
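
The shim preserves the old three-tuple contract, including the historical
``'http'`` default scheme, so existing ``get_host`` callers keep working: ::

    >>> from urllib3.util import get_host
    >>> get_host('google.com:80')
    ('http', 'google.com', 80)
    >>> get_host('http://google.com/mail/')
    ('http', 'google.com', None)
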
+
+
+def make_headers(keep_alive=None, accept_encoding=None, user_agent=None,
+ basic_auth=None):
+ """
+ Shortcuts for generating request headers.
+
+ :param keep_alive:
+ If ``True``, adds 'connection: keep-alive' header.
+
+ :param accept_encoding:
+ Can be a boolean, list, or string.
+ ``True`` translates to 'gzip,deflate'.
+ List will get joined by comma.
+ String will be used as provided.
+
+ :param user_agent:
+ String representing the user-agent you want, such as
+ "python-urllib3/0.6"
+
+ :param basic_auth:
+ Colon-separated username:password string for 'authorization: basic ...'
+ auth header.
+
+ Example: ::
+
+ >>> make_headers(keep_alive=True, user_agent="Batman/1.0")
+ {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
+ >>> make_headers(accept_encoding=True)
+ {'accept-encoding': 'gzip,deflate'}
+ """
+ headers = {}
+ if accept_encoding:
+ if isinstance(accept_encoding, str):
+ pass
+ elif isinstance(accept_encoding, list):
+ accept_encoding = ','.join(accept_encoding)
+ else:
+ accept_encoding = 'gzip,deflate'
+ headers['accept-encoding'] = accept_encoding
+
+ if user_agent:
+ headers['user-agent'] = user_agent
+
+ if keep_alive:
+ headers['connection'] = 'keep-alive'
+
+ if basic_auth:
+ headers['authorization'] = 'Basic ' + \
+ b64encode(six.b(basic_auth)).decode('utf-8')
+
+ return headers
def is_connection_dropped(conn):
Returns True if the connection is dropped and should be closed.
:param conn:
- ``HTTPConnection`` object.
+ :class:`httplib.HTTPConnection` object.
Note: For platforms like AppEngine, this will always return ``False`` to
let the platform handle connection recycling transparently for us.
if not select: # Platform-specific: AppEngine
return False
- return select([sock], [], [], 0.0)[0]
+ try:
+ return select([sock], [], [], 0.0)[0]
+ except SocketError:
+ return True
# This version is better on platforms that support it.
p = poll()