from . import sessions
from .safe_mode import catch_exceptions_if_in_safe_mode
+
@catch_exceptions_if_in_safe_mode
def request(method, url, **kwargs):
"""Constructs and sends a :class:`Request <Request>`.
if adhoc_session:
session.close()
+
def get(url, **kwargs):
"""Sends a GET request. Returns :class:`Response` object.
CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded'
+
def _basic_auth_str(username, password):
"""Returns a Basic Auth string."""
r.register_hook('response', self.handle_401)
return r
+
def _negotiate_value(r):
"""Extracts the gssapi authentication token from the appropriate header"""
return None
+
class HTTPKerberosAuth(AuthBase):
"""Attaches HTTP GSSAPI/Kerberos Authentication to the given Request object."""
def __init__(self, require_mutual_auth=True):
bytes = bytes
basestring = (str,bytes)
numeric_types = (int, float)
-
except ImportError:
import dummy_threading as threading
+
class MockRequest(object):
"""Wraps a `requests.Request` to mimic a `urllib2.Request`.
def get_new_headers(self):
return self._new_headers
+
class MockResponse(object):
"""Wraps a `httplib.HTTPMessage` to mimic a `urllib.addinfourl`.
def getheaders(self, name):
self._headers.getheaders(name)
+
def extract_cookies_to_jar(jar, request, response):
"""Extract the cookies from the response into a CookieJar.
res = MockResponse(response._original_response.msg)
jar.extract_cookies(res, req)
+
def get_cookie_header(jar, request):
"""Produce an appropriate Cookie header string to be sent with `request`, or None."""
r = MockRequest(request)
jar.add_cookie_header(r)
return r.get_new_headers().get('Cookie')
+
def remove_cookie_by_name(cookiejar, name, domain=None, path=None):
"""Unsets a cookie by name, by default over all domains and paths.
for domain, path, name in clearables:
cookiejar.clear(domain, path, name)
+
class CookieConflictError(RuntimeError):
- """There are two cookies that meet the criteria specified in the cookie jar.
+ """There are two cookies that meet the criteria specified in the cookie jar.
Use .get and .set and include domain and path args in order to be more specific."""
+
class RequestsCookieJar(cookielib.CookieJar, collections.MutableMapping):
"""Compatibility class; is a cookielib.CookieJar, but exposes a dict interface.
for cookie in iter(self):
values.append(cookie.value)
return values
-
+
def items(self):
"""Dict-like items() that returns a list of name-value tuples from the jar.
See keys() and values(). Allows client-code to call "dict(RequestsCookieJar)
if cookie.domain is not None and cookie.domain in domains:
return True
domains.append(cookie.domain)
- return False # there is only one domain in jar
+ return False # there is only one domain in jar
def get_dict(self, domain=None, path=None):
"""Takes as an argument an optional domain and path and returns a plain old
Python dict of name-value pairs of cookies that meet the requirements."""
dictionary = {}
for cookie in iter(self):
- if (domain == None or cookie.domain == domain) and (path == None
+ if (domain == None or cookie.domain == domain) and (path == None
or cookie.path == path):
dictionary[cookie.name] = cookie.value
return dictionary
remove_cookie_by_name(self, name)
def _find(self, name, domain=None, path=None):
- """Requests uses this method internally to get cookie values. Takes as args name
+ """Requests uses this method internally to get cookie values. Takes as args name
and optional domain and path. Returns a cookie.value. If there are conflicting cookies,
_find arbitrarily chooses one. See _find_no_duplicates if you want an exception thrown
if there are conflicting cookies."""
raise KeyError('name=%r, domain=%r, path=%r' % (name, domain, path))
def _find_no_duplicates(self, name, domain=None, path=None):
- """__get_item__ and get call _find_no_duplicates -- never used in Requests internally.
- Takes as args name and optional domain and path. Returns a cookie.value.
- Throws KeyError if cookie is not found and CookieConflictError if there are
+ """__get_item__ and get call _find_no_duplicates -- never used in Requests internally.
+ Takes as args name and optional domain and path. Returns a cookie.value.
+ Throws KeyError if cookie is not found and CookieConflictError if there are
multiple cookies that match name and optionally domain and path."""
toReturn = None
for cookie in iter(self):
if cookie.name == name:
if domain is None or cookie.domain == domain:
if path is None or cookie.path == path:
- if toReturn != None: # if there are multiple cookies that meet passed in criteria
+ if toReturn != None: # if there are multiple cookies that meet passed in criteria
raise CookieConflictError('There are multiple cookies with name, %r' % (name))
- toReturn = cookie.value # we will eventually return this as long as no cookie conflict
+ toReturn = cookie.value # we will eventually return this as long as no cookie conflict
if toReturn:
return toReturn
"""This is not implemented. Calling this will throw an exception."""
raise NotImplementedError
+
def create_cookie(name, value, **kwargs):
"""Make a cookie from underspecified parameters.
return cookielib.Cookie(**result)
+
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
c = create_cookie(
)
return c
+
def cookiejar_from_dict(cookie_dict, cookiejar=None):
"""Returns a CookieJar from a key/value dictionary.
"""
+
class RequestException(RuntimeError):
"""There was an ambiguous exception that occurred while handling your
request."""
+
class HTTPError(RequestException):
"""An HTTP error occurred."""
response = None
+
class ConnectionError(RequestException):
"""A Connection error occurred."""
+
class SSLError(ConnectionError):
"""An SSL error occurred."""
+
class Timeout(RequestException):
"""The request timed out."""
+
class URLRequired(RequestException):
"""A valid URL is required to make a request."""
+
class TooManyRedirects(RequestException):
"""Too many redirects."""
+
class MissingSchema(RequestException, ValueError):
"""The URL schema (e.g. http or https) is missing."""
+
class InvalidSchema(RequestException, ValueError):
"""See defaults.py for valid schemas."""
+
class InvalidURL(RequestException, ValueError):
""" The URL provided was somehow invalid. """
HOOKS = ('args', 'pre_request', 'pre_send', 'post_request', 'response')
+
def dispatch_hook(key, hooks, hook_data):
"""Dispatches a hook dictionary on a given piece of data."""
REDIRECT_STATI = (codes.moved, codes.found, codes.other, codes.temporary_moved)
CONTENT_CHUNK_SIZE = 10 * 1024
+
class Request(object):
"""The :class:`Request <Request>` object. It carries out all functionality of
Requests. Recommended interface is with the Requests functions.
self.__dict__.update(r.__dict__)
_p = urlparse(url)
- no_proxy = filter(lambda x:x.strip(), self.proxies.get('no', '').split(','))
+ no_proxy = filter(lambda x: x.strip(), self.proxies.get('no', '').split(','))
proxy = self.proxies.get(_p.scheme)
if proxy and not any(map(_p.netloc.endswith, no_proxy)):
from .exceptions import RequestException, ConnectionError, HTTPError
import socket
+
def catch_exceptions_if_in_safe_mode(function):
"""New implementation of safe_mode. We catch all exceptions at the API level
and then return a blank Response object with the error field filled. This decorator
wraps request() in api.py.
"""
-
+
def wrapped(method, url, **kwargs):
# if save_mode, we catch exceptions and fill error field
- if (kwargs.get('config') and kwargs.get('config').get('safe_mode')) or (kwargs.get('session')
+ if (kwargs.get('config') and kwargs.get('config').get('safe_mode')) or (kwargs.get('session')
and kwargs.get('session').config.get('safe_mode')):
try:
return function(method, url, **kwargs)
socket.timeout, socket.gaierror) as e:
r = Response()
r.error = e
- r.raw = HTTPResponse() # otherwise, tests fail
- r.status_code = 0 # with this status_code, content returns None
+ r.raw = HTTPResponse() # otherwise, tests fail
+ r.status_code = 0 # with this status_code, content returns None
return r
return function(method, url, **kwargs)
return wrapped
from .utils import header_expand
from .packages.urllib3.poolmanager import PoolManager
+
def merge_kwargs(local_kwarg, default_kwarg):
"""Merges kwarg dictionaries.
'headers', 'cookies', 'auth', 'timeout', 'proxies', 'hooks',
'params', 'config', 'verify', 'cert', 'prefetch']
-
def __init__(self,
headers=None,
cookies=None,
# Return the response.
return r.response
-
def get(self, url, **kwargs):
"""Sends a GET request. Returns :class:`Response` object.
kwargs.setdefault('allow_redirects', True)
return self.request('get', url, **kwargs)
-
def options(self, url, **kwargs):
"""Sends a OPTIONS request. Returns :class:`Response` object.
kwargs.setdefault('allow_redirects', True)
return self.request('options', url, **kwargs)
-
def head(self, url, **kwargs):
"""Sends a HEAD request. Returns :class:`Response` object.
kwargs.setdefault('allow_redirects', False)
return self.request('head', url, **kwargs)
-
def post(self, url, data=None, **kwargs):
"""Sends a POST request. Returns :class:`Response` object.
return self.request('post', url, data=data, **kwargs)
-
def put(self, url, data=None, **kwargs):
"""Sends a PUT request. Returns :class:`Response` object.
return self.request('put', url, data=data, **kwargs)
-
def patch(self, url, data=None, **kwargs):
"""Sends a PATCH request. Returns :class:`Response` object.
return self.request('patch', url, data=data, **kwargs)
-
def delete(self, url, **kwargs):
"""Sends a DELETE request. Returns :class:`Response` object.
for title in titles:
setattr(codes, title, code)
if not title.startswith('\\'):
- setattr(codes, title.upper(), code)
\ No newline at end of file
+ setattr(codes, title.upper(), code)
else:
return default
+
class LookupDict(dict):
"""Dictionary lookup object."""
'/etc/ssl/ca-bundle.pem',
]
+
def get_os_ca_bundle_path():
"""Try to pick an available CA certificate bundle provided by the OS."""
for path in POSSIBLE_CA_BUNDLE_PATHS:
# otherwise, try and use the OS bundle
DEFAULT_CA_BUNDLE_PATH = CERTIFI_BUNDLE_PATH or get_os_ca_bundle_path()
+
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
# or '%')
return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
+
def get_environ_proxies():
"""Return a dict of environment proxies."""
import gc, os, subprocess, requests, sys
+
def main():
gc.disable()
sys.path.append('.')
from test_requests import httpbin, TestBaseMixin
+
class CookieTests(TestBaseMixin, unittest.TestCase):
def test_cookies_from_response(self):
def test_disabled_cookie_persistence(self):
"""Test that cookies are not persisted when configured accordingly."""
- config = {'store_cookies' : False}
+ config = {'store_cookies': False}
# Check the case when no cookie is passed as part of the request and the one in response is ignored
- cookies = requests.get(httpbin('cookies', 'set', 'key', 'value'), config = config).cookies
+ cookies = requests.get(httpbin('cookies', 'set', 'key', 'value'), config=config).cookies
self.assertTrue(cookies.get("key") is None)
# Test that the cookies passed while making the request still gets used and is available in response object.
# only the ones received from server is not saved
- cookies_2 = requests.get(httpbin('cookies', 'set', 'key', 'value'), config = config,\
- cookies = {"key_2" : "value_2"}).cookies
+ cookies_2 = requests.get(httpbin('cookies', 'set', 'key', 'value'), config=config,\
+ cookies={"key_2": "value_2"}).cookies
self.assertEqual(len(cookies_2), 1)
self.assertEqual(cookies_2.get("key_2"), "value_2")
# Use the session and make sure that the received cookie is not used in subsequent calls
s = requests.session()
- s.get(httpbin('cookies', 'set', 'key', 'value'), config = config)
+ s.get(httpbin('cookies', 'set', 'key', 'value'), config=config)
r = s.get(httpbin('cookies'))
self.assertEqual(json.loads(r.text)['cookies'], {})
self.assertEqual(len(c), len(r.cookies.keys()))
self.assertEqual(len(c), len(r.cookies.values()))
self.assertEqual(len(c), len(r.cookies.items()))
-
+
# domain and path utility functions
domain = r.cookies.list_domains()[0]
path = r.cookies.list_paths()[0]
# test keys, values, and items
self.assertEqual(r.cookies.keys(), ['myname'])
self.assertEqual(r.cookies.values(), ['myvalue'])
- self.assertEqual(r.cookies.items(), [('myname','myvalue')])
-
+ self.assertEqual(r.cookies.items(), [('myname', 'myvalue')])
+
# test if we can convert jar to dict
dictOfCookies = dict(r.cookies)
- self.assertEqual(dictOfCookies, {'myname':'myvalue'})
+ self.assertEqual(dictOfCookies, {'myname': 'myvalue'})
self.assertEqual(dictOfCookies, r.cookies.get_dict())
+
class LWPCookieJarTest(TestBaseMixin, unittest.TestCase):
"""Check store/load of cookies to FileCookieJar's, specifically LWPCookieJar's."""
self.assertEqual(len(cookiejar_2), 1)
self.assertCookieHas(list(cookiejar_2)[0], name='Persistent', value='CookiesAreScary')
+
class MozCookieJarTest(LWPCookieJarTest):
"""Same test, but substitute MozillaCookieJar."""
# time.sleep(1)
_httpbin = True
+
class TestBaseMixin(object):
def assertCookieHas(self, cookie, **kwargs):
message = 'Failed comparison for %s: %s != %s' % (attr, cookie_attr, expected_value)
self.assertEqual(cookie_attr, expected_value, message)
+
class RequestsTestSuite(TestSetup, TestBaseMixin, unittest.TestCase):
"""Requests test cases."""
post1 = post(url, files={'fname.txt': 'fdata'})
self.assertEqual(post1.status_code, 200)
- post2 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':'more fdata'})
+ post2 = post(url, files={'fname.txt': 'fdata', 'fname2.txt': 'more fdata'})
self.assertEqual(post2.status_code, 200)
- post3 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':open(__file__,'rb')})
+ post3 = post(url, files={'fname.txt': 'fdata', 'fname2.txt': open(__file__, 'rb')})
self.assertEqual(post3.status_code, 200)
post4 = post(url, files={'fname.txt': 'fdata'})
self.assertTrue(rbody['files'].get('fname.txt'), None)
self.assertEqual(rbody['files']['fname.txt'], 'fdata to verify')
-
def test_nonzero_evaluation(self):
for service in SERVICES:
r = get('http://localhost:1/nope', allow_redirects=False, config=config)
assert r.content == None
-
# def test_invalid_content(self):
# # WARNING: if you're using a terrible DNS provider (comcast),
# # this will fail.
s.config['danger_mode'] = True
s.get(httpbin('redirect', '4'))
-
def test_empty_response(self):
r = requests.get(httpbin('status', '404'))
r.text
def test_addition(self):
assert (1 + 1) == 2
-
def test_ssl_hostname_ok(self):
requests.get('https://github.com', verify=True)
-
def test_ssl_hostname_not_ok(self):
requests.get('https://kennethreitz.com', verify=False)
self.assertRaises(requests.exceptions.SSLError, requests.get, 'https://kennethreitz.com')
-
def test_ssl_hostname_session_not_ok(self):
s = requests.session()
s.get('https://kennethreitz.com', verify=False)
-
def test_binary_post(self):
'''We need to be careful how we build the utf-8 string since
unicode literals are a syntax error in python3
raise EnvironmentError('Flesh out this test for your environment.')
requests.post('http://www.google.com/', data=utf8_string)
-
-
def test_unicode_error(self):
url = 'http://blip.fm/~1abvfu'
requests.get(url)
-
def test_chunked_head_redirect(self):
url = "http://t.co/NFrx0zLG"
r = requests.head(url, allow_redirects=True)
if __name__ == '__main__':
unittest.main()
-
sys.path.insert(0, os.path.abspath('..'))
import requests
+
class HTTPSTest(unittest.TestCase):
"""Smoke test for https functionality."""