urllib3 - Thread-safe connection pooling and re-using.
"""
-from connectionpool import (
- connection_from_url,
- get_host,
+__author__ = "Andrey Petrov (andrey.petrov@shazow.net)"
+__license__ = "MIT"
+__version__ = "$Rev$"
+
+
+from .connectionpool import (
HTTPConnectionPool,
HTTPSConnectionPool,
+ connection_from_url,
+ get_host,
make_headers)
-# Possible exceptions
-from connectionpool import (
+
+
+from .exceptions import (
HTTPError,
MaxRetryError,
SSLError,
TimeoutError)
-from filepost import encode_multipart_formdata
-
-__author__ = "Andrey Petrov (andrey.petrov@shazow.net)"
-__license__ = "MIT"
-__version__ = "$Rev$"
+from .poolmanager import PoolManager
+from .response import HTTPResponse
+from .filepost import encode_multipart_formdata
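+
+
+# A minimal usage sketch (hypothetical URL) of the re-exported helpers:
+#
+#   >>> pool = connection_from_url('http://example.com/')
+#   >>> r = pool.get_url('/')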
--- /dev/null
+from collections import MutableMapping, deque
+
+
+__all__ = ['RecentlyUsedContainer']
+
+
+class AccessEntry(object):
+ __slots__ = ('key', 'is_valid')
+
+ def __init__(self, key, is_valid=True):
+ self.key = key
+ self.is_valid = is_valid
+
+
+class RecentlyUsedContainer(MutableMapping):
+ """
+ Provides a dict-like container that maintains up to ``maxsize`` keys
+ while throwing away the least-recently-used keys beyond ``maxsize``.
+ """
+
+ # TODO: Make this threadsafe. _prune_invalidated_entries should be the
+ # only real pain-point for this.
+
+ # If len(self.access_log) exceeds self._maxsize * CLEANUP_FACTOR, then we
+ # will attempt to clean up invalidated entries in the access_log
+ # datastructure during the next 'get' operation.
+ CLEANUP_FACTOR = 10
+
+ def __init__(self, maxsize=10):
+ self._maxsize = maxsize
+
+ self._container = {}
+
+ # We use a deque to store our keys, ordered by the last access.
+ self.access_log = deque()
+
+ # We look up the access log entry by key so we can invalidate it and
+ # insert a new authoritative entry at the head without having to dig
+ # out the old entry for removal immediately.
+ self.access_lookup = {}
+
+ # Trigger a cleanup of the access log when it grows past this size
+ self.access_log_limit = maxsize * self.CLEANUP_FACTOR
+
+ def _push_entry(self, key):
+ "Push entry onto our access log, invalidate the old entry if exists."
+ # Invalidate old entry if it exists
+ old_entry = self.access_lookup.get(key)
+ if old_entry:
+ old_entry.is_valid = False
+
+ new_entry = AccessEntry(key)
+
+ self.access_lookup[key] = new_entry
+ self.access_log.appendleft(new_entry)
+
+ def _prune_entries(self, num):
+ "Pop entries from our access log until we popped ``num`` valid ones."
+ while num > 0:
+ p = self.access_log.pop()
+
+ if not p.is_valid:
+ continue # Invalidated entry, skip
+
+ del self._container[p.key]
+ del self.access_lookup[p.key]
+ num -= 1
+
+ def _prune_invalidated_entries(self):
+ "Rebuild our access_log without the invalidated entries."
+ self.access_log = deque(e for e in self.access_log if e.is_valid)
+
+ def _get_ordered_access_keys(self):
+ # Used for testing
+ return [e.key for e in self.access_log if e.is_valid]
+
+ def __getitem__(self, key):
+ # Raises KeyError if the key is missing, as a dict-like should.
+ item = self._container[key]
+
+ # Insert new entry with new high priority, also implicitly invalidates
+ # the old entry.
+ self._push_entry(key)
+
+ if len(self.access_log) > self.access_log_limit:
+ # Access log is getting too big; rebuild it without the
+ # invalidated entries.
+ self._prune_invalidated_entries()
+
+ return item
+
+ def __setitem__(self, key, item):
+ # Add item to our container and access log
+ self._container[key] = item
+ self._push_entry(key)
+
+ # Discard invalid and excess entries
+ self._prune_entries(len(self._container) - self._maxsize)
+
+ def __delitem__(self, key):
+ # Invalidate the access log entry and remove the key from both the
+ # container and the lookup table.
+ entry = self.access_lookup.pop(key) # Raises KeyError if missing
+ entry.is_valid = False
+ del self._container[key]
+
+ def __len__(self):
+ # The access log may contain stale entries; the container is
+ # authoritative.
+ return len(self._container)
+
+ def __iter__(self):
+ return self._container.__iter__()
+
+ def __contains__(self, key):
+ return self._container.__contains__(key)
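+
+
+# A minimal usage sketch (hypothetical keys and values), assuming the class
+# above; keys beyond ``maxsize`` evict the least-recently-used entry:
+#
+#   >>> d = RecentlyUsedContainer(maxsize=2)
+#   >>> d['a'] = 1
+#   >>> d['b'] = 2
+#   >>> d['a']          # Touching 'a' makes it the most recently used
+#   1
+#   >>> d['c'] = 3      # Exceeds maxsize, so 'b' (least recent) is evicted
+#   >>> 'b' in d
+#   False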
-import gzip
-import zlib
import logging
import socket
try:
import ssl
BaseSSLError = ssl.SSLError
-except ImportError, e:
+except ImportError:
ssl = None
BaseSSLError = None
-try:
- from cStringIO import StringIO
-except ImportError, e:
- from StringIO import StringIO
-
-from filepost import encode_multipart_formdata
+from .filepost import encode_multipart_formdata
+from .response import HTTPResponse
+from .exceptions import (
+ SSLError,
+ MaxRetryError,
+ TimeoutError,
+ HostChangedError,
+ EmptyPoolError)
log = logging.getLogger(__name__)
-## Exceptions
-
-class HTTPError(Exception):
- "Base exception used by this module."
- pass
-
-
-class SSLError(Exception):
- "Raised when SSL certificate fails in an HTTPS connection."
- pass
-
-
-class MaxRetryError(HTTPError):
- "Raised when the maximum number of retries is exceeded."
- pass
-
-
-class TimeoutError(HTTPError):
- "Raised when a socket timeout occurs."
- pass
-
-
-class HostChangedError(HTTPError):
- "Raised when an existing pool gets a request for a foreign host."
- pass
-
-
-## Response objects
-
-class HTTPResponse(object):
- """
- HTTP Response container.
-
- Similar to httplib's HTTPResponse but the data is pre-loaded.
- """
-
- def __init__(self, data='', headers=None, status=0, version=0, reason=None,
- strict=0):
- self.data = data
- self.headers = headers or {}
- self.status = status
- self.version = version
- self.reason = reason
- self.strict = strict
-
- @staticmethod
- def from_httplib(r, block=True):
- """
- Given an httplib.HTTPResponse instance, return a corresponding
- urllib3.HTTPResponse object.
-
- NOTE: This method will perform r.read() which will have side effects
- on the original http.HTTPResponse object.
- """
-
- if block:
- tmp_data = r.read()
- try:
- if r.getheader('content-encoding') == 'gzip':
- log.debug("Received response with content-encoding: gzip, "
- "decompressing with gzip.")
-
- gzipper = gzip.GzipFile(fileobj=StringIO(tmp_data))
- data = gzipper.read()
- elif r.getheader('content-encoding') == 'deflate':
- log.debug("Received response with content-encoding: deflate, "
- "decompressing with zlib.")
- try:
- data = zlib.decompress(tmp_data)
- except zlib.error, e:
- data = zlib.decompress(tmp_data, -zlib.MAX_WBITS)
- else:
- data = tmp_data
-
- except IOError:
- raise HTTPError("Received response with content-encoding: %s, "
- "but failed to decompress it." %
- (r.getheader('content-encoding')))
- else:
- data = None
-
- resp = HTTPResponse(data=data,
- headers=dict(r.getheaders()),
- status=r.status,
- version=r.version,
- reason=r.reason,
- strict=r.strict)
-
- resp._raw = r
- return resp
-
- # Backwards-compatibility methods for httplib.HTTPResponse
- def getheaders(self):
- return self.headers
-
- def getheader(self, name, default=None):
- return self.headers.get(name, default)
-
-
-## Connection objects
+## Connection objects (extension of httplib)
class VerifiedHTTPSConnection(HTTPSConnection):
"""
Based on httplib.HTTPSConnection, but wraps the socket with SSL certificate verification.
"""
- def set_cert(self, key_file=None, cert_file=None, cert_reqs='CERT_NONE',
- ca_certs=None):
+ def __init__(self, host, port=None, **kwargs):
+ # Pass connection parameters through to httplib's HTTPSConnection.
+ HTTPSConnection.__init__(self, host, port, **kwargs)
+ self.cert_reqs = None
+ self.ca_certs = None
+
+ def set_cert(self, key_file=None, cert_file=None,
+ cert_reqs='CERT_NONE', ca_certs=None):
ssl_req_scheme = {
'CERT_NONE': ssl.CERT_NONE,
'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
## Pool objects
-class HTTPConnectionPool(object):
+class ConnectionPool(object):
+ pass
+
+
+class HTTPConnectionPool(ConnectionPool):
"""
Thread-safe connection pool for one host.
self.headers = headers or {}
# Fill the queue up so that doing get() on it will block properly
- [self.pool.put(None) for i in xrange(maxsize)]
+ for _ in xrange(maxsize):
+ self.pool.put(None)
+ # These are mostly for testing and debugging purposes.
self.num_connections = 0
self.num_requests = 0
# If this is a persistent connection, check if it got disconnected
if conn and conn.sock and select([conn.sock], [], [], 0.0)[0]:
# Either data is buffered (bad), or the connection is dropped.
- log.warning("Connection pool detected dropped "
- "connection, resetting: %s" % self.host)
+ log.info("Resetting dropped connection: %s" % self.host)
conn.close()
- except Empty, e:
+ except Empty:
+ if self.block:
+ raise EmptyPoolError("Pool reached maximum size and no more "
+ "connections are allowed.")
pass # Oh well, we'll create a new connection then
return conn or self._new_conn()
"""
try:
self.pool.put(conn, block=False)
- except Full, e:
+ except Full:
# This should never happen if self.block == True
log.warning("HttpConnectionPool is full, discarding connection: %s"
% self.host)
+ def _make_request(self, conn, method, url, **httplib_request_kw):
+ """
+ Perform a request on a given httplib connection object taken from our
+ pool.
+ """
+ self.num_requests += 1
+
+ conn.request(method, url, **httplib_request_kw)
+ conn.sock.settimeout(self.timeout)
+ httplib_response = conn.getresponse()
+
+ log.debug("\"%s %s %s\" %s %s" %
+ (method, url,
+ conn._http_vsn_str, # pylint: disable-msg=W0212
+ httplib_response.status, httplib_response.length))
+
+ return httplib_response
+
+
def is_same_host(self, url):
return (url.startswith('/') or
get_host(url) == (self.scheme, self.host, self.port))
def urlopen(self, method, url, body=None, headers=None, retries=3,
- redirect=True, assert_same_host=True, block=True):
+ redirect=True, assert_same_host=True, pool_timeout=None,
+ **response_kw):
"""
Get a connection from the pool and perform an HTTP request.
If True, will make sure that the host of the pool requests is
consistent else will raise HostChangedError. When False, you can
use the pool on an HTTP proxy and request foreign hosts.
+
+ pool_timeout
+ If set and the pool is set to block=True, then this method will
+ block for ``pool_timeout`` seconds and raise EmptyPoolError if no
+ connection is available within the time period.
+
+ Additional parameters are passed to
+ ``HTTPResponse.from_httplib(r, **response_kw)``
"""
- if headers == None:
+ if headers is None:
headers = self.headers
if retries < 0:
try:
# Request a connection from the queue
- conn = self._get_conn()
+ conn = self._get_conn(timeout=pool_timeout)
- # Make the request
- self.num_requests += 1
- conn.request(method, url, body=body, headers=headers)
- conn.sock.settimeout(self.timeout)
- httplib_response = conn.getresponse()
- log.debug("\"%s %s %s\" %s %s" %
- (method, url, conn._http_vsn_str,
- httplib_response.status, httplib_response.length))
+ # Make the request on the httplib connection object
+ httplib_response = self._make_request(conn, method, url,
+ body=body, headers=headers)
- # from_httplib will perform httplib_response.read() which will have
- # the side effect of letting us use this connection for another
- # request.
- response = HTTPResponse.from_httplib(httplib_response, block=block)
+ # Import httplib's response into our own wrapper object
+ response = HTTPResponse.from_httplib(httplib_response,
+ pool=self,
+ connection=conn,
+ **response_kw)
- # Put the connection back to be reused
- self._put_conn(conn)
+ # The connection will be put back into the pool when
+ # response.release_conn() is called (implicitly by response.read())
except (SocketTimeout, Empty), e:
# Timed out either by socket or queue
return response
def get_url(self, url, fields=None, headers=None, retries=3,
- redirect=True):
+ redirect=True, **response_kw):
"""
Wrapper for performing GET with urlopen (see urlopen for more details).
if fields:
url += '?' + urlencode(fields)
return self.urlopen('GET', url, headers=headers, retries=retries,
- redirect=redirect)
+ redirect=redirect, **response_kw)
def post_url(self, url, fields=None, headers=None, retries=3,
- redirect=True, encode_multipart=True):
+ redirect=True, encode_multipart=True, multipart_boundary=None,
+ **response_kw):
"""
Wrapper for performing POST with urlopen (see urlopen
for more details).
which is used to compose the body of the request.
"""
if encode_multipart:
- body, content_type = encode_multipart_formdata(fields or {})
+ body, content_type = encode_multipart_formdata(fields or {},
+ boundary=multipart_boundary)
else:
body, content_type = (
urlencode(fields or {}),
headers.update({'Content-Type': content_type})
return self.urlopen('POST', url, body, headers=headers,
- retries=retries, redirect=redirect)
+ retries=retries, redirect=redirect, **response_kw)
class HTTPSConnectionPool(HTTPConnectionPool):
scheme = 'https'
- def __init__(self, host, port=None, strict=False, timeout=None, maxsize=1,
- block=False, headers=None, key_file=None,
- cert_file=None, cert_reqs='CERT_NONE', ca_certs=None):
- self.host = host
- self.port = port
- self.strict = strict
- self.timeout = timeout
- self.pool = Queue(maxsize)
- self.block = block
- self.headers = headers or {}
+ def __init__(self, host, port=None,
+ strict=False, timeout=None, maxsize=1,
+ block=False, headers=None,
+ key_file=None, cert_file=None,
+ cert_reqs='CERT_NONE', ca_certs=None):
+ super(HTTPSConnectionPool, self).__init__(host, port,
+ strict, timeout, maxsize,
+ block, headers)
self.key_file = key_file
self.cert_file = cert_file
self.cert_reqs = cert_reqs
self.ca_certs = ca_certs
- # Fill the queue up so that doing get() on it will block properly
- [self.pool.put(None) for i in xrange(maxsize)]
-
- self.num_connections = 0
- self.num_requests = 0
-
def _new_conn(self):
"""
Return a fresh HTTPSConnection.
if '//' in url:
scheme, url = url.split('://', 1)
if '/' in url:
- url, path = url.split('/', 1)
+ url, _path = url.split('/', 1)
if ':' in url:
url, port = url.split(':', 1)
port = int(port)
--- /dev/null
+## Exceptions
+
+class HTTPError(Exception):
+ "Base exception used by this module."
+ pass
+
+
+class SSLError(HTTPError):
+ "Raised when an SSL certificate fails in an HTTPS connection."
+ pass
+
+
+class MaxRetryError(HTTPError):
+ "Raised when the maximum number of retries is exceeded."
+ pass
+
+
+class TimeoutError(HTTPError):
+ "Raised when a socket timeout occurs."
+ pass
+
+
+class HostChangedError(HTTPError):
+ "Raised when an existing pool gets a request for a foreign host."
+ pass
+
+
+class EmptyPoolError(HTTPError):
+ "Raised when a pool runs out of connections and no more are allowed."
+ pass
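+
+
+# A minimal handling sketch (hypothetical ``pool`` object): the specific
+# exceptions above all derive from HTTPError, so callers can catch the
+# whole family at once:
+#
+#   >>> try:
+#   ...     pool.urlopen('GET', '/')
+#   ... except HTTPError, e:
+#   ...     print "Request failed: %s" % e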
import codecs
import mimetools
import mimetypes
+
try:
from cStringIO import StringIO
-except:
- from StringIO import StringIO
+except ImportError:
+ from StringIO import StringIO # pylint: disable-msg=W0404
writer = codecs.lookup('utf-8')[3]
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
-def encode_multipart_formdata(fields):
+def encode_multipart_formdata(fields, boundary=None):
body = StringIO()
- BOUNDARY = mimetools.choose_boundary()
+ if boundary is None:
+ boundary = mimetools.choose_boundary()
for fieldname, value in fields.iteritems():
- body.write('--%s\r\n' % (BOUNDARY))
+ body.write('--%s\r\n' % (boundary))
if isinstance(value, tuple):
filename, data = value
body.write('\r\n')
- body.write('--%s--\r\n' % (BOUNDARY))
+ body.write('--%s--\r\n' % (boundary))
- content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
+ content_type = 'multipart/form-data; boundary=%s' % boundary
return body.getvalue(), content_type
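+
+# A minimal usage sketch (hypothetical field names; a fixed boundary keeps
+# the content type predictable):
+#
+#   >>> body, ctype = encode_multipart_formdata(
+#   ...     {'k': 'v', 'upload': ('name.txt', 'file data')},
+#   ...     boundary='xxBOUNDARYxx')
+#   >>> ctype
+#   'multipart/form-data; boundary=xxBOUNDARYxx'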
--- /dev/null
+from ._collections import RecentlyUsedContainer
+from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool, get_host
+
+
+pool_classes_by_scheme = {
+ 'http': HTTPConnectionPool,
+ 'https': HTTPSConnectionPool,
+}
+
+port_by_scheme = {
+ 'http': 80,
+ 'https': 443,
+}
+
+
+class PoolManager(object):
+ """
+ Allows for arbitrary requests while transparently keeping track of
+ necessary connection pools for you.
+
+ num_pools
+ Number of connection pools to cache before discarding the least recently
+ used pool.
+
+ Additional parameters are used to create fresh ConnectionPool instances.
+
+ """
+
+ # TODO: Make sure there are no memory leaks here.
+
+ def __init__(self, num_pools=10, **connection_pool_kw):
+ self.connection_pool_kw = connection_pool_kw
+
+ self.pools = RecentlyUsedContainer(num_pools)
+
+ def connection_from_url(self, url):
+ """
+ Similar to connectionpool.connection_from_url, except any keywords for
+ the ConnectionPool constructor are taken from the PoolManager
+ constructor rather than passed per-call.
+ """
+ scheme, host, port = get_host(url)
+
+ # Pools are keyed on (scheme, host, port); fill in the scheme's
+ # default port when none is given, so equivalent URLs share a pool.
+ pool_key = (scheme, host, port or port_by_scheme.get(scheme, 80))
+
+ pool = self.pools.get(pool_key)
+ if pool:
+ return pool
+
+ # Make a fresh ConnectionPool of the desired type
+ pool_cls = pool_classes_by_scheme[scheme]
+ pool = pool_cls(host, port, **self.connection_pool_kw)
+
+ self.pools[pool_key] = pool
+
+ return pool
+
+ def urlopen(self, method, url, **kw):
+ "Same as HTTP(S)ConnectionPool.urlopen"
+ conn = self.connection_from_url(url)
+ return conn.urlopen(method, url, **kw)
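+
+
+# A minimal usage sketch (hypothetical host), assuming the class above:
+#
+#   >>> manager = PoolManager(num_pools=2)
+#   >>> r = manager.urlopen('GET', 'http://example.com/')
+#   >>> pool = manager.connection_from_url('http://example.com/robots.txt')
+#
+# Both requests share one pool because the (scheme, host, port) key matches.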
--- /dev/null
+import gzip
+import logging
+import zlib
+
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO # pylint: disable-msg=W0404
+
+
+from .exceptions import HTTPError
+
+
+log = logging.getLogger(__name__)
+
+
+def decode_gzip(data):
+ gzipper = gzip.GzipFile(fileobj=StringIO(data))
+ return gzipper.read()
+
+
+def decode_deflate(data):
+ try:
+ return zlib.decompress(data)
+ except zlib.error:
+ return zlib.decompress(data, -zlib.MAX_WBITS)
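+
+
+# Illustrative round-trips (assumption: some servers send raw deflate
+# streams without the zlib header, hence the -zlib.MAX_WBITS fallback):
+#
+#   >>> decode_deflate(zlib.compress('payload'))
+#   'payload'
+#   >>> raw = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+#   >>> decode_deflate(raw.compress('payload') + raw.flush())
+#   'payload'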
+
+
+class HTTPResponse(object):
+ """
+ HTTP Response container.
+
+ Backwards-compatible with httplib's HTTPResponse, but the response
+ ``body`` is loaded and decoded on demand when the ``data`` property is
+ accessed.
+
+ Extra parameters for behaviour not present in httplib.HTTPResponse:
+
+ preload_content
+ If True, the response's body will be preloaded during construction.
+
+ decode_content
+ If True, attempts to decode the body based on the 'content-encoding'
+ header (currently 'gzip' and 'deflate' are supported). If False, the
+ raw body is returned unchanged.
+
+ original_response
+ When this HTTPResponse wrapper is generated from an httplib.HTTPResponse
+ object, it's convenient to include the original for debug purposes. It's
+ otherwise unused.
+ """
+
+ CONTENT_DECODERS = {
+ 'gzip': decode_gzip,
+ 'deflate': decode_deflate,
+ }
+
+ def __init__(self, body='', headers=None, status=0, version=0, reason=None,
+ strict=0, preload_content=False, decode_content=True,
+ original_response=None, pool=None, connection=None):
+ self.headers = headers or {}
+ self.status = status
+ self.version = version
+ self.reason = reason
+ self.strict = strict
+
+ self._decode_content = decode_content
+ self._body = None
+ self._fp = None
+ self._original_response = original_response
+
+ self._pool = pool
+ self._connection = connection
+
+ if hasattr(body, 'read'):
+ self._fp = body
+
+ if preload_content:
+ self._body = self.read(decode_content=decode_content)
+
+ def release_conn(self):
+ if not self._pool or not self._connection:
+ return
+
+ self._pool._put_conn(self._connection)
+
+ @property
+ def data(self):
+ # For backwards-compat with urllib3 0.4 and earlier.
+ if self._body is not None:
+ return self._body
+
+ if self._fp:
+ return self.read(decode_content=self._decode_content,
+ cache_content=True)
+
+ def read(self, amt=None, decode_content=True, cache_content=False):
+ """
+ Similar to ``httplib.HTTPResponse.read(amt=None)``.
+
+ amt
+ How much of the content to read. If specified, decoding and caching
+ are skipped because we can't decode partial content, nor does it
+ make sense to cache partial content as the full response.
+
+ decode_content
+ If True, will attempt to decode the body based on the
+ 'content-encoding' header. (Overridden if ``amt`` is set.)
+
+ cache_content
+ If True, will save the returned data such that the same result is
+ returned regardless of the state of the underlying file object. This
+ is useful if you want the ``.data`` property to continue working
+ after having ``.read()`` the file object. (Overridden if ``amt`` is
+ set.)
+ """
+ content_encoding = self.headers.get('content-encoding')
+ decoder = self.CONTENT_DECODERS.get(content_encoding)
+
+ data = self._fp and self._fp.read(amt)
+
+ try:
+ if amt:
+ return data
+
+ if not decode_content or not decoder:
+ if cache_content:
+ self._body = data
+
+ return data
+
+ try:
+ data = decoder(data)
+ except IOError:
+ raise HTTPError("Received response with content-encoding: %s, but "
+ "failed to decode it." % content_encoding)
+
+ if cache_content:
+ self._body = data
+
+ return data
+
+ finally:
+ if self._original_response and self._original_response.isclosed():
+ self.release_conn()
+
+ @staticmethod
+ def from_httplib(r, **response_kw):
+ """
+ Given an httplib.HTTPResponse instance ``r``, return a corresponding
+ urllib3.HTTPResponse object.
+
+ Remaining parameters are passed to the HTTPResponse constructor, along
+ with ``original_response=r``.
+ """
+
+ return HTTPResponse(body=r,
+ headers=dict(r.getheaders()),
+ status=r.status,
+ version=r.version,
+ reason=r.reason,
+ strict=r.strict,
+ original_response=r,
+ **response_kw)
+
+ # Backwards-compatibility methods for httplib.HTTPResponse
+ def getheaders(self):
+ return self.headers
+
+ def getheader(self, name, default=None):
+ return self.headers.get(name, default)
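+
+
+# A minimal read()/data sketch, assuming the class above. A pure-Python
+# StringIO (whose read() accepts None) stands in for an httplib response;
+# there is no content-encoding header, so decoding is a no-op:
+#
+#   >>> from StringIO import StringIO as PyStringIO
+#   >>> r = HTTPResponse(body=PyStringIO('hello world'), preload_content=False)
+#   >>> r.read(5)                     # Partial read: decoding/caching skipped
+#   'hello'
+#   >>> r.read(cache_content=True)    # Reads the rest and caches it
+#   ' world'
+#   >>> r.data                        # Served from the cache
+#   ' world'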