import urllib2
import socket
import zlib
-import cgi
-import re
+
from urllib2 import HTTPError
from urlparse import urlparse, urlunparse, urljoin
from .structures import CaseInsensitiveDict
from .packages.poster.encode import multipart_encode
from .packages.poster.streaminghttp import register_openers, get_handlers
-from .utils import dict_from_cookiejar
-from .exceptions import RequestException, AuthenticationError, Timeout, URLRequired, InvalidMethod, TooManyRedirects
+from .utils import dict_from_cookiejar, get_unicode_from_response
from .status_codes import codes
+from .exceptions import RequestException, AuthenticationError, Timeout, URLRequired, InvalidMethod, TooManyRedirects
REDIRECT_STATI = (codes.moved, codes.found, codes.other, codes.temporary_moved)
def __nonzero__(self):
"""Returns true if :attr:`status_code` is 'OK'."""
+
return not self.error
def __getattr__(self, name):
- """Read and returns the full stream when accessing to :attr: `content`"""
+ """Read and returns the full stream when accessing to
+ :attr: `content`
+ """
+
if name == 'content':
if self._content is not None:
return self._content
+
+ # Read the contents.
self._content = self.read()
+
+ # Decode GZip'd content.
if self.headers.get('content-encoding', '') == 'gzip':
try:
self._content = zlib.decompress(self._content, 16+zlib.MAX_WBITS)
except zlib.error:
pass
- return self.unicode_content(self._content)
-
+ # Decode unicode content.
+ try:
+ self._content = get_unicode_from_response(self)
+ # Don't trust this stuff.
+ except UserWarning, e:
+ print e
+
+
+ return self._content
+
else:
raise AttributeError
-
-
- def get_content_type(self):
- content_type = self.headers.get("content-type")
- content_type, params = cgi.parse_header(content_type)
- return content_type, params
-
- def get_encoding_from_content_type(self):
- content_type, params = self.get_content_type()
- if "charset" in params:
- return params["charset"].strip("'\"")
-
- def get_encodings_from_content(self, content):
- if self._charset_re is None:
- self._charset_re = re.compile(
- r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I
- )
- return self._charset_re.findall(content)
-
- def unicode_content(self, content):
- """
- Returns the requested content back in unicode.
- Tried:
- 1. charset from content-type
- 2. every encodings from <meta ... charset=XXX>
- 3. fall back and replace all unicode characters
- """
- tried_encodings = []
- # Try charset from content-type
- encoding = self.get_encoding_from_content_type()
- if encoding:
- try:
- return unicode(content, encoding)
- except UnicodeError:
- tried_encodings.append(encoding)
-
- # Try every encodings from <meta ... charset=XXX>
- encodings = self.get_encodings_from_content(content)
- for encoding in encodings:
- if encoding in tried_encodings:
- continue
- try:
- return unicode(content, encoding)
- except UnicodeError:
- tried_encodings.append(encoding)
- # Fall back:
- return unicode(content, encoding, errors="replace")
+
def raise_for_status(self):
"""Raises stored :class:`HTTPError` or :class:`URLError`, if one occured."""
self._resp.fp._sock.recv = None
self._close()
+
+
class AuthManager(object):
"""Requests Authentication Manager."""