URLRequired, SSLError)
from .utils import (
get_encoding_from_headers, stream_untransfer, guess_filename, requote_uri,
- dict_from_string, supported_schemes)
-
-from .compat import urlparse, urlunparse, urljoin, urlsplit, urlencode, quote, unquote, str, bytes, SimpleCookie, is_py3, is_py2
+ dict_from_string, supported_schemes, stream_decode_response_unicode)
+from .compat import (
+ urlparse, urlunparse, urljoin, urlsplit, urlencode, str, bytes,
+ SimpleCookie, is_py2)
# Import chardet if it is available.
try:
REDIRECT_STATI = (codes.moved, codes.found, codes.other, codes.temporary_moved)
-
class Request(object):
"""The :class:`Request <Request>` object. It carries out all functionality of
Requests. Recommended interface is with the Requests functions.
self.headers = headers
self._poolmanager = _poolmanager
-
def __repr__(self):
return '<Request [%s]>' % (self.method)
-
def _build_response(self, resp):
"""Build internal :class:`Response <Response>` object
from given response.
config=self.config,
timeout=self.timeout,
_poolmanager=self._poolmanager,
- proxies = self.proxies,
- verify = self.verify,
- session = self.session
+ proxies=self.proxies,
+ verify=self.verify,
+ session=self.session
)
request.send()
self.response.request = self
self.response.cookies.update(self.cookies)
-
@staticmethod
def _encode_params(data):
"""Encode parameters in a piece of data.
if hasattr(data, '__iter__') and not isinstance(data, str):
data = dict(data)
-
if hasattr(data, 'items'):
result = []
for k, vs in list(data.items()):
# Support for unicode domain names and paths.
scheme, netloc, path, params, query, fragment = urlparse(url)
-
if not scheme:
raise ValueError("Invalid URL %r: No schema supplied" % url)
return ''.join(url)
-
def register_hook(self, event, hook):
"""Properly register a hook."""
return self.hooks[event].append(hook)
-
def send(self, anyway=False, prefetch=False):
"""Sends the request. Returns True of successful, false if not.
If there was an HTTPError during transmission,
if self.verify is not True:
cert_loc = self.verify
-
# Look for configuration.
if not cert_loc:
cert_loc = os.environ.get('REQUESTS_CA_BUNDLE')
#: Dictionary of configurations for this request.
self.config = {}
-
def __repr__(self):
return '<Response [%s]>' % (self.status_code)
return False
return True
-
def iter_content(self, chunk_size=10 * 1024, decode_unicode=False):
"""Iterates over the response data. This avoids reading the content
at once into memory for large responses. The chunk size is the number
pending_bytes = resp.chunk_left
while pending_bytes:
chunk = fp.read(min(chunk_size, pending_bytes))
- pending_bytes-=len(chunk)
+ pending_bytes -= len(chunk)
yield chunk
- fp.read(2) # throw away crlf
+ fp.read(2) # throw away crlf
while 1:
#XXX correct line size? (httplib has 64kb, seems insane)
pending_bytes = fp.readline(40).strip()
break
while pending_bytes:
chunk = fp.read(min(chunk_size, pending_bytes))
- pending_bytes-=len(chunk)
+ pending_bytes -= len(chunk)
yield chunk
- fp.read(2) # throw away crlf
+ fp.read(2) # throw away crlf
self._content_consumed = True
fp.close()
-
if getattr(getattr(self.raw, '_original_response', None), 'chunked', False):
gen = generate_chunked()
else:
return gen
-
def iter_lines(self, chunk_size=10 * 1024, decode_unicode=None):
"""Iterates over the response data, one line at a time. This
avoids reading the content at once into memory for large
self._content_consumed = True
return self._content
-
@property
def text(self):
"""Content of the response, in unicode.
return content
-
def raise_for_status(self):
"""Raises stored :class:`HTTPError` or :class:`URLError`, if one occurred."""
elif (self.status_code >= 500) and (self.status_code < 600):
raise HTTPError('%s Server Error' % self.status_code)
-
-