2 # -*- coding: utf-8 -*-
4 """Tests for Requests."""
6 from __future__ import division
14 from requests.adapters import HTTPAdapter
15 from requests.auth import HTTPDigestAuth
16 from requests.compat import (
17 Morsel, cookielib, getproxies, str, urljoin, urlparse)
18 from requests.cookies import cookiejar_from_dict, morsel_to_cookie
19 from requests.exceptions import InvalidURL, MissingSchema
20 from requests.structures import CaseInsensitiveDict
27 HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/')
28 # Issue #1483: Make sure the URL always has a trailing slash
29 HTTPBIN = HTTPBIN.rstrip('/') + '/'
33 """Returns url for HTTPBIN resource."""
34 return urljoin(HTTPBIN, '/'.join(suffix))
37 class RequestsTestCase(unittest.TestCase):
39 _multiprocess_can_split_ = True
42 """Create simple data set with headers."""
49 def test_entry_points(self):
52 requests.session().get
53 requests.session().head
60 def test_invalid_url(self):
61 with pytest.raises(MissingSchema):
62 requests.get('hiwpefhipowhefopw')
63 with pytest.raises(InvalidURL):
64 requests.get('http://')
66 def test_basic_building(self):
67 req = requests.Request()
68 req.url = 'http://kennethreitz.org/'
69 req.data = {'life': '42'}
72 assert pr.url == req.url
73 assert pr.body == 'life=42'
75 def test_no_content_length(self):
76 get_req = requests.Request('GET', httpbin('get')).prepare()
77 assert 'Content-Length' not in get_req.headers
78 head_req = requests.Request('HEAD', httpbin('head')).prepare()
79 assert 'Content-Length' not in head_req.headers
81 def test_path_is_not_double_encoded(self):
82 request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
84 assert request.path_url == '/get/test%20case'
86 def test_params_are_added_before_fragment(self):
87 request = requests.Request('GET',
88 "http://example.com/path#fragment", params={"a": "b"}).prepare()
89 assert request.url == "http://example.com/path?a=b#fragment"
90 request = requests.Request('GET',
91 "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
92 assert request.url == "http://example.com/path?key=value&a=b#fragment"
94 def test_mixed_case_scheme_acceptable(self):
95 s = requests.Session()
96 s.proxies = getproxies()
97 parts = urlparse(httpbin('get'))
98 schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://',
99 'https://', 'HTTPS://', 'hTTps://', 'HttPs://']
100 for scheme in schemes:
101 url = scheme + parts.netloc + parts.path
102 r = requests.Request('GET', url)
103 r = s.send(r.prepare())
104 assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
106 def test_HTTP_200_OK_GET_ALTERNATIVE(self):
107 r = requests.Request('GET', httpbin('get'))
108 s = requests.Session()
109 s.proxies = getproxies()
111 r = s.send(r.prepare())
113 assert r.status_code == 200
115 def test_HTTP_302_ALLOW_REDIRECT_GET(self):
116 r = requests.get(httpbin('redirect', '1'))
117 assert r.status_code == 200
119 # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
120 # r = requests.post(httpbin('status', '302'), data={'some': 'data'})
121 # self.assertEqual(r.status_code, 200)
123 def test_HTTP_200_OK_GET_WITH_PARAMS(self):
124 heads = {'User-agent': 'Mozilla/5.0'}
126 r = requests.get(httpbin('user-agent'), headers=heads)
128 assert heads['User-agent'] in r.text
129 assert r.status_code == 200
131 def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
132 heads = {'User-agent': 'Mozilla/5.0'}
134 r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
135 assert r.status_code == 200
137 def test_set_cookie_on_301(self):
138 s = requests.session()
139 url = httpbin('cookies/set?foo=bar')
141 assert s.cookies['foo'] == 'bar'
143 def test_cookie_sent_on_redirect(self):
144 s = requests.session()
145 s.get(httpbin('cookies/set?foo=bar'))
146 r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
147 assert 'Cookie' in r.json()['headers']
149 def test_cookie_removed_on_expire(self):
150 s = requests.session()
151 s.get(httpbin('cookies/set?foo=bar'))
152 assert s.cookies['foo'] == 'bar'
154 httpbin('response-headers'),
157 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
160 assert 'foo' not in s.cookies
162 def test_cookie_quote_wrapped(self):
163 s = requests.session()
164 s.get(httpbin('cookies/set?foo="bar:baz"'))
165 assert s.cookies['foo'] == '"bar:baz"'
167 def test_cookie_persists_via_api(self):
168 s = requests.session()
169 r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
170 assert 'foo' in r.request.headers['Cookie']
171 assert 'foo' in r.history[0].request.headers['Cookie']
173 def test_request_cookie_overrides_session_cookie(self):
174 s = requests.session()
175 s.cookies['foo'] = 'bar'
176 r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
177 assert r.json()['cookies']['foo'] == 'baz'
178 # Session cookie should not be modified
179 assert s.cookies['foo'] == 'bar'
181 def test_request_cookies_not_persisted(self):
182 s = requests.session()
183 s.get(httpbin('cookies'), cookies={'foo': 'baz'})
184 # Sending a request with cookies should not add cookies to the session
187 def test_generic_cookiejar_works(self):
188 cj = cookielib.CookieJar()
189 cookiejar_from_dict({'foo': 'bar'}, cj)
190 s = requests.session()
192 r = s.get(httpbin('cookies'))
193 # Make sure the cookie was sent
194 assert r.json()['cookies']['foo'] == 'bar'
195 # Make sure the session cj is still the custom one
196 assert s.cookies is cj
198 def test_param_cookiejar_works(self):
199 cj = cookielib.CookieJar()
200 cookiejar_from_dict({'foo' : 'bar'}, cj)
201 s = requests.session()
202 r = s.get(httpbin('cookies'), cookies=cj)
203 # Make sure the cookie was sent
204 assert r.json()['cookies']['foo'] == 'bar'
206 def test_requests_in_history_are_not_overridden(self):
207 resp = requests.get(httpbin('redirect/3'))
208 urls = [r.url for r in resp.history]
209 req_urls = [r.request.url for r in resp.history]
210 assert urls == req_urls
212 def test_user_agent_transfers(self):
215 'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
218 r = requests.get(httpbin('user-agent'), headers=heads)
219 assert heads['User-agent'] in r.text
222 'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
225 r = requests.get(httpbin('user-agent'), headers=heads)
226 assert heads['user-agent'] in r.text
228 def test_HTTP_200_OK_HEAD(self):
229 r = requests.head(httpbin('get'))
230 assert r.status_code == 200
232 def test_HTTP_200_OK_PUT(self):
233 r = requests.put(httpbin('put'))
234 assert r.status_code == 200
236 def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
237 auth = ('user', 'pass')
238 url = httpbin('basic-auth', 'user', 'pass')
240 r = requests.get(url, auth=auth)
241 assert r.status_code == 200
243 r = requests.get(url)
244 assert r.status_code == 401
246 s = requests.session()
249 assert r.status_code == 200
251 def test_basicauth_with_netrc(self):
252 auth = ('user', 'pass')
253 wrong_auth = ('wronguser', 'wrongpass')
254 url = httpbin('basic-auth', 'user', 'pass')
256 def get_netrc_auth_mock(url):
258 requests.sessions.get_netrc_auth = get_netrc_auth_mock
260 # Should use netrc and work.
261 r = requests.get(url)
262 assert r.status_code == 200
264 # Given auth should override and fail.
265 r = requests.get(url, auth=wrong_auth)
266 assert r.status_code == 401
268 s = requests.session()
270 # Should use netrc and work.
272 assert r.status_code == 200
274 # Given auth should override and fail.
277 assert r.status_code == 401
279 def test_DIGEST_HTTP_200_OK_GET(self):
281 auth = HTTPDigestAuth('user', 'pass')
282 url = httpbin('digest-auth', 'auth', 'user', 'pass')
284 r = requests.get(url, auth=auth)
285 assert r.status_code == 200
287 r = requests.get(url)
288 assert r.status_code == 401
290 s = requests.session()
291 s.auth = HTTPDigestAuth('user', 'pass')
293 assert r.status_code == 200
295 def test_DIGEST_AUTH_RETURNS_COOKIE(self):
296 url = httpbin('digest-auth', 'auth', 'user', 'pass')
297 auth = HTTPDigestAuth('user', 'pass')
298 r = requests.get(url)
299 assert r.cookies['fake'] == 'fake_value'
301 r = requests.get(url, auth=auth)
302 assert r.status_code == 200
304 def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self):
305 url = httpbin('digest-auth', 'auth', 'user', 'pass')
306 auth = HTTPDigestAuth('user', 'pass')
307 s = requests.Session()
308 s.get(url, auth=auth)
309 assert s.cookies['fake'] == 'fake_value'
311 def test_DIGEST_STREAM(self):
313 auth = HTTPDigestAuth('user', 'pass')
314 url = httpbin('digest-auth', 'auth', 'user', 'pass')
316 r = requests.get(url, auth=auth, stream=True)
317 assert r.raw.read() != b''
319 r = requests.get(url, auth=auth, stream=False)
320 assert r.raw.read() == b''
322 def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
324 auth = HTTPDigestAuth('user', 'wrongpass')
325 url = httpbin('digest-auth', 'auth', 'user', 'pass')
327 r = requests.get(url, auth=auth)
328 assert r.status_code == 401
330 r = requests.get(url)
331 assert r.status_code == 401
333 s = requests.session()
336 assert r.status_code == 401
338 def test_DIGESTAUTH_QUOTES_QOP_VALUE(self):
340 auth = HTTPDigestAuth('user', 'pass')
341 url = httpbin('digest-auth', 'auth', 'user', 'pass')
343 r = requests.get(url, auth=auth)
344 assert '"auth"' in r.request.headers['Authorization']
346 def test_POSTBIN_GET_POST_FILES(self):
348 url = httpbin('post')
349 post1 = requests.post(url).raise_for_status()
351 post1 = requests.post(url, data={'some': 'data'})
352 assert post1.status_code == 200
354 with open('requirements.txt') as f:
355 post2 = requests.post(url, files={'some': f})
356 assert post2.status_code == 200
358 post4 = requests.post(url, data='[{"some": "json"}]')
359 assert post4.status_code == 200
361 with pytest.raises(ValueError):
362 requests.post(url, files = ['bad file data'])
364 def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
366 url = httpbin('post')
367 post1 = requests.post(url).raise_for_status()
369 post1 = requests.post(url, data={'some': 'data'})
370 assert post1.status_code == 200
372 with open('requirements.txt') as f:
373 post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
374 assert post2.status_code == 200
376 post4 = requests.post(url, data='[{"some": "json"}]')
377 assert post4.status_code == 200
379 with pytest.raises(ValueError):
380 requests.post(url, files = ['bad file data'])
382 def test_conflicting_post_params(self):
383 url = httpbin('post')
384 with open('requirements.txt') as f:
385 pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})")
386 pytest.raises(ValueError, "requests.post(url, data=u'[{\"some\": \"data\"}]', files={'some': f})")
388 def test_request_ok_set(self):
389 r = requests.get(httpbin('status', '404'))
392 def test_status_raising(self):
393 r = requests.get(httpbin('status', '404'))
394 with pytest.raises(requests.exceptions.HTTPError):
397 r = requests.get(httpbin('status', '500'))
400 def test_decompress_gzip(self):
401 r = requests.get(httpbin('gzip'))
402 r.content.decode('ascii')
404 def test_unicode_get(self):
405 url = httpbin('/get')
406 requests.get(url, params={'foo': 'føø'})
407 requests.get(url, params={'føø': 'føø'})
408 requests.get(url, params={'føø': 'føø'})
409 requests.get(url, params={'foo': 'foo'})
410 requests.get(httpbin('ø'), params={'foo': 'foo'})
412 def test_unicode_header_name(self):
413 requests.put(httpbin('put'), headers={str('Content-Type'): 'application/octet-stream'}, data='\xff') # compat.str is unicode.
415 def test_urlencoded_get_query_multivalued_param(self):
417 r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
418 assert r.status_code == 200
419 assert r.url == httpbin('get?test=foo&test=baz')
421 def test_different_encodings_dont_break_post(self):
422 r = requests.post(httpbin('post'),
423 data={'stuff': json.dumps({'a': 123})},
424 params={'blah': 'asdf1234'},
425 files={'file': ('test_requests.py', open(__file__, 'rb'))})
426 assert r.status_code == 200
428 def test_unicode_multipart_post(self):
429 r = requests.post(httpbin('post'),
430 data={'stuff': u'ëlïxr'},
431 files={'file': ('test_requests.py', open(__file__, 'rb'))})
432 assert r.status_code == 200
434 r = requests.post(httpbin('post'),
435 data={'stuff': u'ëlïxr'.encode('utf-8')},
436 files={'file': ('test_requests.py', open(__file__, 'rb'))})
437 assert r.status_code == 200
439 r = requests.post(httpbin('post'),
440 data={'stuff': 'elixr'},
441 files={'file': ('test_requests.py', open(__file__, 'rb'))})
442 assert r.status_code == 200
444 r = requests.post(httpbin('post'),
445 data={'stuff': 'elixr'.encode('utf-8')},
446 files={'file': ('test_requests.py', open(__file__, 'rb'))})
447 assert r.status_code == 200
449 def test_unicode_multipart_post_fieldnames(self):
450 filename = os.path.splitext(__file__)[0] + '.py'
451 r = requests.Request(method='POST',
453 data={'stuff'.encode('utf-8'): 'elixr'},
454 files={'file': ('test_requests.py',
455 open(filename, 'rb'))})
457 assert b'name="stuff"' in prep.body
458 assert b'name="b\'stuff\'"' not in prep.body
460 def test_unicode_method_name(self):
461 files = {'file': open('test_requests.py', 'rb')}
462 r = requests.request(method=u'POST', url=httpbin('post'), files=files)
463 assert r.status_code == 200
465 def test_custom_content_type(self):
466 r = requests.post(httpbin('post'),
467 data={'stuff': json.dumps({'a': 123})},
468 files={'file1': ('test_requests.py', open(__file__, 'rb')),
469 'file2': ('test_requests', open(__file__, 'rb'),
470 'text/py-content-type')})
471 assert r.status_code == 200
472 assert b"text/py-content-type" in r.request.body
474 def test_hook_receives_request_arguments(self):
475 def hook(resp, **kwargs):
476 assert resp is not None
479 requests.Request('GET', HTTPBIN, hooks={'response': hook})
481 def test_session_hooks_are_used_with_no_request_hooks(self):
482 hook = lambda x, *args, **kwargs: x
483 s = requests.Session()
484 s.hooks['response'].append(hook)
485 r = requests.Request('GET', HTTPBIN)
486 prep = s.prepare_request(r)
487 assert prep.hooks['response'] != []
488 assert prep.hooks['response'] == [hook]
490 def test_session_hooks_are_overriden_by_request_hooks(self):
491 hook1 = lambda x, *args, **kwargs: x
492 hook2 = lambda x, *args, **kwargs: x
493 assert hook1 is not hook2
494 s = requests.Session()
495 s.hooks['response'].append(hook2)
496 r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]})
497 prep = s.prepare_request(r)
498 assert prep.hooks['response'] == [hook1]
500 def test_prepared_request_hook(self):
501 def hook(resp, **kwargs):
502 resp.hook_working = True
505 req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
508 s = requests.Session()
509 s.proxies = getproxies()
512 assert hasattr(resp, 'hook_working')
514 def test_prepared_from_session(self):
515 class DummyAuth(requests.auth.AuthBase):
516 def __call__(self, r):
517 r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
520 req = requests.Request('GET', httpbin('headers'))
523 s = requests.Session()
526 prep = s.prepare_request(req)
529 assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
531 def test_links(self):
532 r = requests.Response()
534 'cache-control': 'public, max-age=60, s-maxage=60',
535 'connection': 'keep-alive',
536 'content-encoding': 'gzip',
537 'content-type': 'application/json; charset=utf-8',
538 'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
539 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
540 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
541 'link': ('<https://api.github.com/users/kennethreitz/repos?'
542 'page=2&per_page=10>; rel="next", <https://api.github.'
543 'com/users/kennethreitz/repos?page=7&per_page=10>; '
545 'server': 'GitHub.com',
548 'x-content-type-options': 'nosniff',
549 'x-github-media-type': 'github.beta',
550 'x-ratelimit-limit': '60',
551 'x-ratelimit-remaining': '57'
553 assert r.links['next']['rel'] == 'next'
555 def test_cookie_parameters(self):
560 rest = {'HttpOnly': True}
562 jar = requests.cookies.RequestsCookieJar()
563 jar.set(key, value, secure=secure, domain=domain, rest=rest)
566 assert 'some_cookie' in jar
568 cookie = list(jar)[0]
569 assert cookie.secure == secure
570 assert cookie.domain == domain
571 assert cookie._rest['HttpOnly'] == rest['HttpOnly']
573 def test_time_elapsed_blank(self):
574 r = requests.get(httpbin('get'))
576 total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
578 assert total_seconds > 0.0
580 def test_response_is_iterable(self):
581 r = requests.Response()
582 io = StringIO.StringIO('abc')
585 def read_mock(amt, decode_content=None):
587 setattr(io, 'read', read_mock)
592 def test_request_and_response_are_pickleable(self):
593 r = requests.get(httpbin('get'))
595 # verify we can pickle the original request
596 assert pickle.loads(pickle.dumps(r.request))
598 # verify we can pickle the response and that we have access to
599 # the original request.
600 pr = pickle.loads(pickle.dumps(r))
601 assert r.request.url == pr.request.url
602 assert r.request.headers == pr.request.headers
604 def test_get_auth_from_url(self):
605 url = 'http://user:pass@complex.url.com/path?query=yes'
606 assert ('user', 'pass') == requests.utils.get_auth_from_url(url)
608 def test_get_auth_from_url_encoded_spaces(self):
609 url = 'http://user:pass%20pass@complex.url.com/path?query=yes'
610 assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
612 def test_get_auth_from_url_not_encoded_spaces(self):
613 url = 'http://user:pass pass@complex.url.com/path?query=yes'
614 assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
616 def test_get_auth_from_url_percent_chars(self):
617 url = 'http://user%user:pass@complex.url.com/path?query=yes'
618 assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url)
620 def test_cannot_send_unprepared_requests(self):
621 r = requests.Request(url=HTTPBIN)
622 with pytest.raises(ValueError):
623 requests.Session().send(r)
625 def test_http_error(self):
626 error = requests.exceptions.HTTPError()
627 assert not error.response
628 response = requests.Response()
629 error = requests.exceptions.HTTPError(response=response)
630 assert error.response == response
631 error = requests.exceptions.HTTPError('message', response=response)
632 assert str(error) == 'message'
633 assert error.response == response
635 def test_session_pickling(self):
636 r = requests.Request('GET', httpbin('get'))
637 s = requests.Session()
639 s = pickle.loads(pickle.dumps(s))
640 s.proxies = getproxies()
642 r = s.send(r.prepare())
643 assert r.status_code == 200
645 def test_fixes_1329(self):
647 Ensure that header updates are done case-insensitively.
649 s = requests.Session()
650 s.headers.update({'ACCEPT': 'BOGUS'})
651 s.headers.update({'accept': 'application/json'})
652 r = s.get(httpbin('get'))
653 headers = r.request.headers
654 assert headers['accept'] == 'application/json'
655 assert headers['Accept'] == 'application/json'
656 assert headers['ACCEPT'] == 'application/json'
658 def test_uppercase_scheme_redirect(self):
659 parts = urlparse(httpbin('html'))
660 url = "HTTP://" + parts.netloc + parts.path
661 r = requests.get(httpbin('redirect-to'), params={'url': url})
662 assert r.status_code == 200
663 assert r.url.lower() == url.lower()
665 def test_transport_adapter_ordering(self):
666 s = requests.Session()
667 order = ['https://', 'http://']
668 assert order == list(s.adapters)
669 s.mount('http://git', HTTPAdapter())
670 s.mount('http://github', HTTPAdapter())
671 s.mount('http://github.com', HTTPAdapter())
672 s.mount('http://github.com/about/', HTTPAdapter())
674 'http://github.com/about/',
681 assert order == list(s.adapters)
682 s.mount('http://gittip', HTTPAdapter())
683 s.mount('http://gittip.com', HTTPAdapter())
684 s.mount('http://gittip.com/about/', HTTPAdapter())
686 'http://github.com/about/',
687 'http://gittip.com/about/',
696 assert order == list(s.adapters)
697 s2 = requests.Session()
698 s2.adapters = {'http://': HTTPAdapter()}
699 s2.mount('https://', HTTPAdapter())
700 assert 'http://' in s2.adapters
701 assert 'https://' in s2.adapters
703 def test_header_remove_is_case_insensitive(self):
705 s = requests.Session()
706 s.headers['foo'] = 'bar'
707 r = s.get(httpbin('get'), headers={'FOO': None})
708 assert 'foo' not in r.request.headers
710 def test_params_are_merged_case_sensitive(self):
711 s = requests.Session()
712 s.params['foo'] = 'bar'
713 r = s.get(httpbin('get'), params={'FOO': 'bar'})
714 assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
717 def test_long_authinfo_in_url(self):
718 url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
719 'E8A3BE87-9E3F-4620-8858-95478E385B5B',
720 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
721 'exactly-------------sixty-----------three------------characters',
723 r = requests.Request('GET', url).prepare()
726 def test_header_keys_are_native(self):
727 headers = {u'unicode': 'blah', 'byte'.encode('ascii'): 'blah'}
728 r = requests.Request('GET', httpbin('get'), headers=headers)
731 # This is testing that they are builtin strings. A bit weird, but there
733 assert 'unicode' in p.headers.keys()
734 assert 'byte' in p.headers.keys()
736 def test_can_send_nonstring_objects_with_files(self):
739 r = requests.Request('POST', httpbin('post'), data=data, files=files)
742 assert 'multipart/form-data' in p.headers['Content-Type']
744 def test_autoset_header_values_are_native(self):
745 data = 'this is a string'
747 req = requests.Request('POST', httpbin('post'), data=data)
750 assert p.headers['Content-Length'] == length
752 def test_oddball_schemes_dont_check_URLs(self):
754 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
755 'file:///etc/passwd',
756 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
758 for test_url in test_urls:
759 req = requests.Request('GET', test_url)
761 assert test_url == preq.url
763 def test_morsel_to_cookie_expires_is_converted(self):
766 # Test case where we convert from string time
767 morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
768 cookie = morsel_to_cookie(morsel)
769 self.assertEquals(cookie.expires, 18001)
771 # Test case where no conversion is required
772 morsel['expires'] = 100
773 cookie = morsel_to_cookie(morsel)
774 self.assertEquals(cookie.expires, 100)
776 # Test case where an invalid string is input
777 morsel['expires'] = 'woops'
778 with self.assertRaises(ValueError):
779 cookie = morsel_to_cookie(morsel)
782 class TestContentEncodingDetection(unittest.TestCase):
785 encodings = requests.utils.get_encodings_from_content('')
786 assert not len(encodings)
788 def test_html_charset(self):
789 """HTML5 meta charset attribute"""
790 content = '<meta charset="UTF-8">'
791 encodings = requests.utils.get_encodings_from_content(content)
792 assert len(encodings) == 1
793 assert encodings[0] == 'UTF-8'
795 def test_html4_pragma(self):
796 """HTML4 pragma directive"""
797 content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">'
798 encodings = requests.utils.get_encodings_from_content(content)
799 assert len(encodings) == 1
800 assert encodings[0] == 'UTF-8'
802 def test_xhtml_pragma(self):
803 """XHTML 1.x served with text/html MIME type"""
804 content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />'
805 encodings = requests.utils.get_encodings_from_content(content)
806 assert len(encodings) == 1
807 assert encodings[0] == 'UTF-8'
810 """XHTML 1.x served as XML"""
811 content = '<?xml version="1.0" encoding="UTF-8"?>'
812 encodings = requests.utils.get_encodings_from_content(content)
813 assert len(encodings) == 1
814 assert encodings[0] == 'UTF-8'
816 def test_precedence(self):
818 <?xml version="1.0" encoding="XML"?>
819 <meta charset="HTML5">
820 <meta http-equiv="Content-type" content="text/html;charset=HTML4" />
822 encodings = requests.utils.get_encodings_from_content(content)
823 assert encodings == ['HTML5', 'HTML4', 'XML']
826 class TestCaseInsensitiveDict(unittest.TestCase):
828 def test_mapping_init(self):
829 cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
834 def test_iterable_init(self):
835 cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')])
840 def test_kwargs_init(self):
841 cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
846 def test_docstring_example(self):
847 cid = CaseInsensitiveDict()
848 cid['Accept'] = 'application/json'
849 assert cid['aCCEPT'] == 'application/json'
850 assert list(cid) == ['Accept']
853 cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
857 def test_getitem(self):
858 cid = CaseInsensitiveDict({'Spam': 'blueval'})
859 assert cid['spam'] == 'blueval'
860 assert cid['SPAM'] == 'blueval'
862 def test_fixes_649(self):
863 """__setitem__ should behave case-insensitively."""
864 cid = CaseInsensitiveDict()
865 cid['spam'] = 'oneval'
866 cid['Spam'] = 'twoval'
867 cid['sPAM'] = 'redval'
868 cid['SPAM'] = 'blueval'
869 assert cid['spam'] == 'blueval'
870 assert cid['SPAM'] == 'blueval'
871 assert list(cid.keys()) == ['SPAM']
873 def test_delitem(self):
874 cid = CaseInsensitiveDict()
875 cid['Spam'] = 'someval'
877 assert 'spam' not in cid
880 def test_contains(self):
881 cid = CaseInsensitiveDict()
882 cid['Spam'] = 'someval'
887 assert 'notspam' not in cid
890 cid = CaseInsensitiveDict()
891 cid['spam'] = 'oneval'
892 cid['SPAM'] = 'blueval'
893 assert cid.get('spam') == 'blueval'
894 assert cid.get('SPAM') == 'blueval'
895 assert cid.get('sPam') == 'blueval'
896 assert cid.get('notspam', 'default') == 'default'
898 def test_update(self):
899 cid = CaseInsensitiveDict()
900 cid['spam'] = 'blueval'
901 cid.update({'sPam': 'notblueval'})
902 assert cid['spam'] == 'notblueval'
903 cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
904 cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
906 assert cid['foo'] == 'anotherfoo'
907 assert cid['bar'] == 'anotherbar'
909 def test_update_retains_unchanged(self):
910 cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
911 cid.update({'foo': 'newfoo'})
912 assert cid['bar'] == 'bar'
915 cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
916 keys = frozenset(['Spam', 'Eggs'])
917 assert frozenset(iter(cid)) == keys
919 def test_equality(self):
920 cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
921 othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
922 assert cid == othercid
924 assert cid != othercid
925 assert cid == {'spam': 'blueval', 'eggs': 'redval'}
927 def test_setdefault(self):
928 cid = CaseInsensitiveDict({'Spam': 'blueval'})
929 assert cid.setdefault('spam', 'notblueval') == 'blueval'
930 assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
932 def test_lower_items(self):
933 cid = CaseInsensitiveDict({
934 'Accept': 'application/json',
935 'user-Agent': 'requests',
937 keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
938 lowerkeyset = frozenset(['accept', 'user-agent'])
939 assert keyset == lowerkeyset
941 def test_preserve_key_case(self):
942 cid = CaseInsensitiveDict({
943 'Accept': 'application/json',
944 'user-Agent': 'requests',
946 keyset = frozenset(['Accept', 'user-Agent'])
947 assert frozenset(i[0] for i in cid.items()) == keyset
948 assert frozenset(cid.keys()) == keyset
949 assert frozenset(cid) == keyset
951 def test_preserve_last_key_case(self):
952 cid = CaseInsensitiveDict({
953 'Accept': 'application/json',
954 'user-Agent': 'requests',
956 cid.update({'ACCEPT': 'application/json'})
957 cid['USER-AGENT'] = 'requests'
958 keyset = frozenset(['ACCEPT', 'USER-AGENT'])
959 assert frozenset(i[0] for i in cid.items()) == keyset
960 assert frozenset(cid.keys()) == keyset
961 assert frozenset(cid) == keyset
964 class UtilsTestCase(unittest.TestCase):
966 def test_super_len_io_streams(self):
967 """ Ensures that we properly deal with different kinds of IO streams. """
968 # uses StringIO or io.StringIO (see import above)
969 from io import BytesIO
970 from requests.utils import super_len
972 assert super_len(StringIO.StringIO()) == 0
973 assert super_len(StringIO.StringIO('with so much drama in the LBC')) == 29
975 assert super_len(BytesIO()) == 0
976 assert super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40
983 assert super_len(cStringIO.StringIO('but some how, some way...')) == 25
985 def test_get_environ_proxies_ip_ranges(self):
986 """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
987 from requests.utils import get_environ_proxies
988 os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
989 assert get_environ_proxies('http://192.168.0.1:5000/') == {}
990 assert get_environ_proxies('http://192.168.0.1/') == {}
991 assert get_environ_proxies('http://172.16.1.1/') == {}
992 assert get_environ_proxies('http://172.16.1.1:5000/') == {}
993 assert get_environ_proxies('http://192.168.1.1:5000/') != {}
994 assert get_environ_proxies('http://192.168.1.1/') != {}
996 def test_get_environ_proxies(self):
997 """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
998 from requests.utils import get_environ_proxies
999 os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
1000 assert get_environ_proxies('http://localhost.localdomain:5000/v1.0/') == {}
1001 assert get_environ_proxies('http://www.requests.com/') != {}
1003 def test_is_ipv4_address(self):
1004 from requests.utils import is_ipv4_address
1005 assert is_ipv4_address('8.8.8.8')
1006 assert not is_ipv4_address('8.8.8.8.8')
1007 assert not is_ipv4_address('localhost.localdomain')
1009 def test_is_valid_cidr(self):
1010 from requests.utils import is_valid_cidr
1011 assert not is_valid_cidr('8.8.8.8')
1012 assert is_valid_cidr('192.168.1.0/24')
1014 def test_dotted_netmask(self):
1015 from requests.utils import dotted_netmask
1016 assert dotted_netmask(8) == '255.0.0.0'
1017 assert dotted_netmask(24) == '255.255.255.0'
1018 assert dotted_netmask(25) == '255.255.255.128'
1020 def test_address_in_network(self):
1021 from requests.utils import address_in_network
1022 assert address_in_network('192.168.1.1', '192.168.1.0/24')
1023 assert not address_in_network('172.16.0.1', '192.168.1.0/24')
1026 if __name__ == '__main__':