test_requests: Add tests for morsel_to_cookie
[platform/upstream/python-requests.git] / test_requests.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """Tests for Requests."""
5
6 from __future__ import division
7 import json
8 import os
9 import pickle
10 import unittest
11
12 import requests
13 import pytest
14 from requests.adapters import HTTPAdapter
15 from requests.auth import HTTPDigestAuth
16 from requests.compat import (
17     Morsel, cookielib, getproxies, str, urljoin, urlparse)
18 from requests.cookies import cookiejar_from_dict, morsel_to_cookie
19 from requests.exceptions import InvalidURL, MissingSchema
20 from requests.structures import CaseInsensitiveDict
21
22 try:
23     import StringIO
24 except ImportError:
25     import io as StringIO
26
# Base URL of the httpbin service used by the tests; the HTTPBIN_URL
# environment variable overrides the default public instance.
HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/')
# Issue #1483: Make sure the URL always has a trailing slash
HTTPBIN = HTTPBIN.rstrip('/') + '/'
30
31
def httpbin(*suffix):
    """Return the URL of an HTTPBIN resource.

    The positional ``suffix`` segments are joined with ``/`` and resolved
    against the ``HTTPBIN`` base URL.
    """
    resource_path = '/'.join(suffix)
    return urljoin(HTTPBIN, resource_path)
35
36
class RequestsTestCase(unittest.TestCase):
    """Tests for the public Requests API.

    Many of these tests issue real HTTP requests against the service whose
    base URL is stored in ``HTTPBIN``.
    """

    # Flag read by test runners that can split a class across processes
    # (presumably nose's multiprocess plugin -- confirm).
    _multiprocess_can_split_ = True

    def setUp(self):
        """No per-test fixtures are required."""
        pass

    def tearDown(self):
        """Nothing to clean up."""
        pass

    def test_entry_points(self):
        """The top-level API entry points exist (attribute access only)."""

        requests.session
        requests.session().get
        requests.session().head
        requests.get
        requests.head
        requests.put
        requests.patch
        requests.post

    def test_invalid_url(self):
        """Malformed URLs raise the dedicated exception types."""
        with pytest.raises(MissingSchema):
            requests.get('hiwpefhipowhefopw')
        with pytest.raises(InvalidURL):
            requests.get('http://')

    def test_basic_building(self):
        """A Request built attribute-by-attribute prepares correctly."""
        req = requests.Request()
        req.url = 'http://kennethreitz.org/'
        req.data = {'life': '42'}

        pr = req.prepare()
        assert pr.url == req.url
        assert pr.body == 'life=42'

    def test_no_content_length(self):
        """Body-less GET/HEAD requests carry no Content-Length header."""
        get_req = requests.Request('GET', httpbin('get')).prepare()
        assert 'Content-Length' not in get_req.headers
        head_req = requests.Request('HEAD', httpbin('head')).prepare()
        assert 'Content-Length' not in head_req.headers

    def test_path_is_not_double_encoded(self):
        """A space in the path is percent-encoded exactly once."""
        request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()

        assert request.path_url == '/get/test%20case'

    def test_params_are_added_before_fragment(self):
        """Query parameters are inserted ahead of the URL fragment."""
        request = requests.Request('GET',
            "http://example.com/path#fragment", params={"a": "b"}).prepare()
        assert request.url == "http://example.com/path?a=b#fragment"
        request = requests.Request('GET',
            "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
        assert request.url == "http://example.com/path?key=value&a=b#fragment"

    def test_mixed_case_scheme_acceptable(self):
        """URL schemes are accepted regardless of letter case."""
        s = requests.Session()
        s.proxies = getproxies()
        parts = urlparse(httpbin('get'))
        schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://',
                   'https://', 'HTTPS://', 'hTTps://', 'HttPs://']
        for scheme in schemes:
            url = scheme + parts.netloc + parts.path
            r = requests.Request('GET', url)
            r = s.send(r.prepare())
            assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)

    def test_HTTP_200_OK_GET_ALTERNATIVE(self):
        """A prepared GET sent through a Session succeeds."""
        r = requests.Request('GET', httpbin('get'))
        s = requests.Session()
        s.proxies = getproxies()

        r = s.send(r.prepare())

        assert r.status_code == 200

    def test_HTTP_302_ALLOW_REDIRECT_GET(self):
        """GET follows a redirect and ends on a 200 response."""
        r = requests.get(httpbin('redirect', '1'))
        assert r.status_code == 200

    # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
    #     r = requests.post(httpbin('status', '302'), data={'some': 'data'})
    #     self.assertEqual(r.status_code, 200)

    def test_HTTP_200_OK_GET_WITH_PARAMS(self):
        """A custom User-agent header is echoed back by the server."""
        heads = {'User-agent': 'Mozilla/5.0'}

        r = requests.get(httpbin('user-agent'), headers=heads)

        assert heads['User-agent'] in r.text
        assert r.status_code == 200

    def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
        """Query string in the URL and ``params`` can be combined."""
        heads = {'User-agent': 'Mozilla/5.0'}

        r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
        assert r.status_code == 200

    def test_set_cookie_on_301(self):
        """A cookie set on a redirecting response is stored in the session."""
        s = requests.session()
        url = httpbin('cookies/set?foo=bar')
        s.get(url)
        assert s.cookies['foo'] == 'bar'

    def test_cookie_sent_on_redirect(self):
        """Session cookies are sent with the redirected request."""
        s = requests.session()
        s.get(httpbin('cookies/set?foo=bar'))
        r = s.get(httpbin('redirect/1'))  # redirects to httpbin('get')
        assert 'Cookie' in r.json()['headers']

    def test_cookie_removed_on_expire(self):
        """An already-expired Set-Cookie removes the cookie from the jar."""
        s = requests.session()
        s.get(httpbin('cookies/set?foo=bar'))
        assert s.cookies['foo'] == 'bar'
        s.get(
            httpbin('response-headers'),
            params={
                'Set-Cookie':
                    'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
            }
        )
        assert 'foo' not in s.cookies

    def test_cookie_quote_wrapped(self):
        """Quotes wrapping a cookie value are preserved."""
        s = requests.session()
        s.get(httpbin('cookies/set?foo="bar:baz"'))
        assert s.cookies['foo'] == '"bar:baz"'

    def test_cookie_persists_via_api(self):
        """Per-request cookies are sent on every hop of a redirect chain."""
        s = requests.session()
        r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
        assert 'foo' in r.request.headers['Cookie']
        assert 'foo' in r.history[0].request.headers['Cookie']

    def test_request_cookie_overrides_session_cookie(self):
        """A per-request cookie wins over a session cookie of the same name."""
        s = requests.session()
        s.cookies['foo'] = 'bar'
        r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
        assert r.json()['cookies']['foo'] == 'baz'
        # Session cookie should not be modified
        assert s.cookies['foo'] == 'bar'

    def test_request_cookies_not_persisted(self):
        """Per-request cookies are not stored on the session."""
        s = requests.session()
        s.get(httpbin('cookies'), cookies={'foo': 'baz'})
        # Sending a request with cookies should not add cookies to the session
        assert not s.cookies

    def test_generic_cookiejar_works(self):
        """A plain ``cookielib.CookieJar`` can be used as the session jar."""
        cj = cookielib.CookieJar()
        cookiejar_from_dict({'foo': 'bar'}, cj)
        s = requests.session()
        s.cookies = cj
        r = s.get(httpbin('cookies'))
        # Make sure the cookie was sent
        assert r.json()['cookies']['foo'] == 'bar'
        # Make sure the session cj is still the custom one
        assert s.cookies is cj

    def test_param_cookiejar_works(self):
        """A ``CookieJar`` can be passed as the ``cookies`` argument."""
        cj = cookielib.CookieJar()
        cookiejar_from_dict({'foo': 'bar'}, cj)
        s = requests.session()
        r = s.get(httpbin('cookies'), cookies=cj)
        # Make sure the cookie was sent
        assert r.json()['cookies']['foo'] == 'bar'

    def test_requests_in_history_are_not_overridden(self):
        """Each historical response keeps the request that produced it."""
        resp = requests.get(httpbin('redirect/3'))
        urls = [r.url for r in resp.history]
        req_urls = [r.request.url for r in resp.history]
        assert urls == req_urls

    def test_user_agent_transfers(self):
        """The User-agent header is sent whatever its key's letter case."""

        heads = {
            'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
        }

        r = requests.get(httpbin('user-agent'), headers=heads)
        assert heads['User-agent'] in r.text

        heads = {
            'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
        }

        r = requests.get(httpbin('user-agent'), headers=heads)
        assert heads['user-agent'] in r.text

    def test_HTTP_200_OK_HEAD(self):
        """HEAD returns 200."""
        r = requests.head(httpbin('get'))
        assert r.status_code == 200

    def test_HTTP_200_OK_PUT(self):
        """PUT returns 200."""
        r = requests.put(httpbin('put'))
        assert r.status_code == 200

    def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
        """Basic auth given as a tuple works per request and per session."""
        auth = ('user', 'pass')
        url = httpbin('basic-auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        assert r.status_code == 200

        r = requests.get(url)
        assert r.status_code == 401

        s = requests.session()
        s.auth = auth
        r = s.get(url)
        assert r.status_code == 200

    def test_basicauth_with_netrc(self):
        """netrc credentials are used unless explicit auth is supplied."""
        auth = ('user', 'pass')
        wrong_auth = ('wronguser', 'wrongpass')
        url = httpbin('basic-auth', 'user', 'pass')

        def get_netrc_auth_mock(url):
            return auth

        # Patch the netrc lookup and always restore it afterwards so the mock
        # cannot leak into tests that run later in the same process.
        original_get_netrc_auth = requests.sessions.get_netrc_auth
        requests.sessions.get_netrc_auth = get_netrc_auth_mock
        try:
            # Should use netrc and work.
            r = requests.get(url)
            assert r.status_code == 200

            # Given auth should override and fail.
            r = requests.get(url, auth=wrong_auth)
            assert r.status_code == 401

            s = requests.session()

            # Should use netrc and work.
            r = s.get(url)
            assert r.status_code == 200

            # Given auth should override and fail.
            s.auth = wrong_auth
            r = s.get(url)
            assert r.status_code == 401
        finally:
            requests.sessions.get_netrc_auth = original_get_netrc_auth

    def test_DIGEST_HTTP_200_OK_GET(self):
        """Digest auth works per request and per session."""

        auth = HTTPDigestAuth('user', 'pass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        assert r.status_code == 200

        r = requests.get(url)
        assert r.status_code == 401

        s = requests.session()
        s.auth = HTTPDigestAuth('user', 'pass')
        r = s.get(url)
        assert r.status_code == 200

    def test_DIGEST_AUTH_RETURNS_COOKIE(self):
        """The digest challenge response exposes the cookie it sets."""
        url = httpbin('digest-auth', 'auth', 'user', 'pass')
        auth = HTTPDigestAuth('user', 'pass')
        r = requests.get(url)
        assert r.cookies['fake'] == 'fake_value'

        r = requests.get(url, auth=auth)
        assert r.status_code == 200

    def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self):
        """Cookies set during the digest handshake land in the session jar."""
        url = httpbin('digest-auth', 'auth', 'user', 'pass')
        auth = HTTPDigestAuth('user', 'pass')
        s = requests.Session()
        s.get(url, auth=auth)
        assert s.cookies['fake'] == 'fake_value'

    def test_DIGEST_STREAM(self):
        """With ``stream=True`` the raw body is still unread after auth."""

        auth = HTTPDigestAuth('user', 'pass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth, stream=True)
        assert r.raw.read() != b''

        r = requests.get(url, auth=auth, stream=False)
        assert r.raw.read() == b''

    def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
        """Wrong digest credentials yield 401 per request and per session."""

        auth = HTTPDigestAuth('user', 'wrongpass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        assert r.status_code == 401

        r = requests.get(url)
        assert r.status_code == 401

        s = requests.session()
        s.auth = auth
        r = s.get(url)
        assert r.status_code == 401

    def test_DIGESTAUTH_QUOTES_QOP_VALUE(self):
        """The qop value in the Authorization header is quoted."""

        auth = HTTPDigestAuth('user', 'pass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        assert '"auth"' in r.request.headers['Authorization']

    def test_POSTBIN_GET_POST_FILES(self):
        """POST works with no body, form data, a file and a raw string."""

        url = httpbin('post')
        requests.post(url).raise_for_status()

        post1 = requests.post(url, data={'some': 'data'})
        assert post1.status_code == 200

        with open('requirements.txt') as f:
            post2 = requests.post(url, files={'some': f})
        assert post2.status_code == 200

        post4 = requests.post(url, data='[{"some": "json"}]')
        assert post4.status_code == 200

        with pytest.raises(ValueError):
            requests.post(url, files=['bad file data'])

    def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
        """POST works when form data and a file are sent together."""

        url = httpbin('post')
        requests.post(url).raise_for_status()

        post1 = requests.post(url, data={'some': 'data'})
        assert post1.status_code == 200

        with open('requirements.txt') as f:
            post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
        assert post2.status_code == 200

        post4 = requests.post(url, data='[{"some": "json"}]')
        assert post4.status_code == 200

        with pytest.raises(ValueError):
            requests.post(url, files=['bad file data'])

    def test_conflicting_post_params(self):
        """String ``data`` combined with ``files`` raises ValueError."""
        url = httpbin('post')
        with open('requirements.txt') as f:
            # Context-manager form: the string-exec form of pytest.raises is
            # not supported by current pytest releases.
            with pytest.raises(ValueError):
                requests.post(url, data='[{"some": "data"}]', files={'some': f})
            with pytest.raises(ValueError):
                requests.post(url, data=u'[{"some": "data"}]', files={'some': f})

    def test_request_ok_set(self):
        """``Response.ok`` is false for a 404."""
        r = requests.get(httpbin('status', '404'))
        assert not r.ok

    def test_status_raising(self):
        """``raise_for_status`` raises HTTPError on an error status."""
        r = requests.get(httpbin('status', '404'))
        with pytest.raises(requests.exceptions.HTTPError):
            r.raise_for_status()

        r = requests.get(httpbin('status', '500'))
        assert not r.ok

    def test_decompress_gzip(self):
        """A gzip-encoded body is decompressed into decodable content."""
        r = requests.get(httpbin('gzip'))
        r.content.decode('ascii')

    def test_unicode_get(self):
        """Non-ASCII parameter names, values and paths do not raise."""
        url = httpbin('/get')
        requests.get(url, params={'foo': 'føø'})
        requests.get(url, params={'føø': 'føø'})
        requests.get(url, params={'føø': 'føø'})
        requests.get(url, params={'foo': 'foo'})
        requests.get(httpbin('ø'), params={'foo': 'foo'})

    def test_unicode_header_name(self):
        """A unicode header name can be combined with a non-ASCII body."""
        requests.put(httpbin('put'), headers={str('Content-Type'): 'application/octet-stream'}, data='\xff') # compat.str is unicode.

    def test_urlencoded_get_query_multivalued_param(self):
        """A list-valued parameter is encoded as repeated keys."""

        r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
        assert r.status_code == 200
        assert r.url == httpbin('get?test=foo&test=baz')

    def test_different_encodings_dont_break_post(self):
        """Form data, query params and a file can be posted together."""
        # The file is opened in a ``with`` block so the handle is closed.
        with open(__file__, 'rb') as f:
            r = requests.post(httpbin('post'),
                              data={'stuff': json.dumps({'a': 123})},
                              params={'blah': 'asdf1234'},
                              files={'file': ('test_requests.py', f)})
        assert r.status_code == 200

    def test_unicode_multipart_post(self):
        """Multipart POST accepts text and byte-string field values."""
        payloads = (
            u'ëlïxr',
            u'ëlïxr'.encode('utf-8'),
            'elixr',
            'elixr'.encode('utf-8'),
        )
        for stuff in payloads:
            # A fresh handle per request; closed as soon as the POST is done.
            with open(__file__, 'rb') as f:
                r = requests.post(httpbin('post'),
                                  data={'stuff': stuff},
                                  files={'file': ('test_requests.py', f)})
            assert r.status_code == 200

    def test_unicode_multipart_post_fieldnames(self):
        """Byte-string field names are not rendered as ``b'...'`` reprs."""
        filename = os.path.splitext(__file__)[0] + '.py'
        with open(filename, 'rb') as f:
            r = requests.Request(method='POST',
                                 url=httpbin('post'),
                                 data={'stuff'.encode('utf-8'): 'elixr'},
                                 files={'file': ('test_requests.py', f)})
            # Prepare while the file is still open: the body is built here.
            prep = r.prepare()
        assert b'name="stuff"' in prep.body
        assert b'name="b\'stuff\'"' not in prep.body

    def test_unicode_method_name(self):
        """A unicode HTTP method name is accepted."""
        with open('test_requests.py', 'rb') as f:
            files = {'file': f}
            r = requests.request(method=u'POST', url=httpbin('post'), files=files)
        assert r.status_code == 200

    def test_custom_content_type(self):
        """A per-file content type ends up in the multipart body."""
        # Nested ``with`` blocks (rather than one combined statement) keep
        # this compatible with Python 2.6.
        with open(__file__, 'rb') as f1:
            with open(__file__, 'rb') as f2:
                r = requests.post(httpbin('post'),
                                  data={'stuff': json.dumps({'a': 123})},
                                  files={'file1': ('test_requests.py', f1),
                                         'file2': ('test_requests', f2,
                                                   'text/py-content-type')})
        assert r.status_code == 200
        assert b"text/py-content-type" in r.request.body

    def test_hook_receives_request_arguments(self):
        """A response hook is invoked with the response and keyword args."""
        calls = []

        def hook(resp, **kwargs):
            calls.append(kwargs)
            assert resp is not None
            assert kwargs != {}

        req = requests.Request('GET', HTTPBIN, hooks={'response': hook})

        # The request has to be sent, otherwise the hook (and the assertions
        # inside it) would never run.
        s = requests.Session()
        s.proxies = getproxies()
        s.send(req.prepare())

        assert calls, 'response hook was never invoked'

    def test_session_hooks_are_used_with_no_request_hooks(self):
        """Session hooks apply when the request defines none."""
        hook = lambda x, *args, **kwargs: x
        s = requests.Session()
        s.hooks['response'].append(hook)
        r = requests.Request('GET', HTTPBIN)
        prep = s.prepare_request(r)
        assert prep.hooks['response'] != []
        assert prep.hooks['response'] == [hook]

    def test_session_hooks_are_overriden_by_request_hooks(self):
        """Request hooks replace session hooks for the same event."""
        hook1 = lambda x, *args, **kwargs: x
        hook2 = lambda x, *args, **kwargs: x
        assert hook1 is not hook2
        s = requests.Session()
        s.hooks['response'].append(hook2)
        r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]})
        prep = s.prepare_request(r)
        assert prep.hooks['response'] == [hook1]

    def test_prepared_request_hook(self):
        """Hooks attached to a Request survive ``prepare()`` and are run."""
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        s.proxies = getproxies()
        resp = s.send(prep)

        assert hasattr(resp, 'hook_working')

    def test_prepared_from_session(self):
        """``Session.prepare_request`` applies the session's auth handler."""
        class DummyAuth(requests.auth.AuthBase):
            def __call__(self, r):
                r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
                return r

        req = requests.Request('GET', httpbin('headers'))
        assert not req.auth

        s = requests.Session()
        s.auth = DummyAuth()

        prep = s.prepare_request(req)
        resp = s.send(prep)

        assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok'

    def test_links(self):
        """``Response.links`` parses the Link header."""
        r = requests.Response()
        r.headers = {
            'cache-control': 'public, max-age=60, s-maxage=60',
            'connection': 'keep-alive',
            'content-encoding': 'gzip',
            'content-type': 'application/json; charset=utf-8',
            'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
            'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
            'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
            'link': ('<https://api.github.com/users/kennethreitz/repos?'
                     'page=2&per_page=10>; rel="next", <https://api.github.'
                     'com/users/kennethreitz/repos?page=7&per_page=10>; '
                     ' rel="last"'),
            'server': 'GitHub.com',
            'status': '200 OK',
            'vary': 'Accept',
            'x-content-type-options': 'nosniff',
            'x-github-media-type': 'github.beta',
            'x-ratelimit-limit': '60',
            'x-ratelimit-remaining': '57'
        }
        assert r.links['next']['rel'] == 'next'

    def test_cookie_parameters(self):
        """Extra cookie attributes passed to ``jar.set`` are stored."""
        key = 'some_cookie'
        value = 'some_value'
        secure = True
        domain = 'test.com'
        rest = {'HttpOnly': True}

        jar = requests.cookies.RequestsCookieJar()
        jar.set(key, value, secure=secure, domain=domain, rest=rest)

        assert len(jar) == 1
        assert 'some_cookie' in jar

        cookie = list(jar)[0]
        assert cookie.secure == secure
        assert cookie.domain == domain
        assert cookie._rest['HttpOnly'] == rest['HttpOnly']

    def test_time_elapsed_blank(self):
        """``Response.elapsed`` is a positive duration."""
        r = requests.get(httpbin('get'))
        td = r.elapsed
        # Computed by hand from the timedelta fields instead of calling
        # ``td.total_seconds()``.
        total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
                         * 10**6) / 10**6)
        assert total_seconds > 0.0

    def test_response_is_iterable(self):
        """Iterating a Response yields content read from ``raw``."""
        r = requests.Response()
        io = StringIO.StringIO('abc')
        read_ = io.read

        # Stand-in for ``read`` that accepts the ``decode_content`` keyword
        # and ignores it.
        def read_mock(amt, decode_content=None):
            return read_(amt)
        setattr(io, 'read', read_mock)
        r.raw = io
        assert next(iter(r))
        io.close()

    def test_request_and_response_are_pickleable(self):
        """Requests and responses round-trip through pickle."""
        r = requests.get(httpbin('get'))

        # verify we can pickle the original request
        assert pickle.loads(pickle.dumps(r.request))

        # verify we can pickle the response and that we have access to
        # the original request.
        pr = pickle.loads(pickle.dumps(r))
        assert r.request.url == pr.request.url
        assert r.request.headers == pr.request.headers

    def test_get_auth_from_url(self):
        """Credentials are extracted from the URL's userinfo part."""
        url = 'http://user:pass@complex.url.com/path?query=yes'
        assert ('user', 'pass') == requests.utils.get_auth_from_url(url)

    def test_get_auth_from_url_encoded_spaces(self):
        """Percent-encoded spaces in the password are decoded."""
        url = 'http://user:pass%20pass@complex.url.com/path?query=yes'
        assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)

    def test_get_auth_from_url_not_encoded_spaces(self):
        """Literal spaces in the password are preserved."""
        url = 'http://user:pass pass@complex.url.com/path?query=yes'
        assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)

    def test_get_auth_from_url_percent_chars(self):
        """A bare ``%`` in the username is left untouched."""
        url = 'http://user%user:pass@complex.url.com/path?query=yes'
        assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url)

    def test_cannot_send_unprepared_requests(self):
        """``Session.send`` rejects a Request that was not prepared."""
        r = requests.Request(url=HTTPBIN)
        with pytest.raises(ValueError):
            requests.Session().send(r)

    def test_http_error(self):
        """``HTTPError`` carries the optional ``response`` keyword."""
        error = requests.exceptions.HTTPError()
        assert not error.response
        response = requests.Response()
        error = requests.exceptions.HTTPError(response=response)
        assert error.response == response
        error = requests.exceptions.HTTPError('message', response=response)
        assert str(error) == 'message'
        assert error.response == response

    def test_session_pickling(self):
        """A Session is still usable after a pickle round-trip."""
        r = requests.Request('GET', httpbin('get'))
        s = requests.Session()

        s = pickle.loads(pickle.dumps(s))
        s.proxies = getproxies()

        r = s.send(r.prepare())
        assert r.status_code == 200

    def test_fixes_1329(self):
        """
        Ensure that header updates are done case-insensitively.
        """
        s = requests.Session()
        s.headers.update({'ACCEPT': 'BOGUS'})
        s.headers.update({'accept': 'application/json'})
        r = s.get(httpbin('get'))
        headers = r.request.headers
        assert headers['accept'] == 'application/json'
        assert headers['Accept'] == 'application/json'
        assert headers['ACCEPT'] == 'application/json'

    def test_uppercase_scheme_redirect(self):
        """A redirect target with an upper-case scheme is followed."""
        parts = urlparse(httpbin('html'))
        url = "HTTP://" + parts.netloc + parts.path
        r = requests.get(httpbin('redirect-to'), params={'url': url})
        assert r.status_code == 200
        assert r.url.lower() == url.lower()

    def test_transport_adapter_ordering(self):
        """Mounted adapters are ordered as the expected lists below spell out."""
        s = requests.Session()
        order = ['https://', 'http://']
        assert order == list(s.adapters)
        s.mount('http://git', HTTPAdapter())
        s.mount('http://github', HTTPAdapter())
        s.mount('http://github.com', HTTPAdapter())
        s.mount('http://github.com/about/', HTTPAdapter())
        order = [
            'http://github.com/about/',
            'http://github.com',
            'http://github',
            'http://git',
            'https://',
            'http://',
        ]
        assert order == list(s.adapters)
        s.mount('http://gittip', HTTPAdapter())
        s.mount('http://gittip.com', HTTPAdapter())
        s.mount('http://gittip.com/about/', HTTPAdapter())
        order = [
            'http://github.com/about/',
            'http://gittip.com/about/',
            'http://github.com',
            'http://gittip.com',
            'http://github',
            'http://gittip',
            'http://git',
            'https://',
            'http://',
        ]
        assert order == list(s.adapters)
        # Mounting must also work when ``adapters`` was replaced by a plain dict.
        s2 = requests.Session()
        s2.adapters = {'http://': HTTPAdapter()}
        s2.mount('https://', HTTPAdapter())
        assert 'http://' in s2.adapters
        assert 'https://' in s2.adapters

    def test_header_remove_is_case_insensitive(self):
        """Setting a header to None removes it regardless of key case."""
        # From issue #1321
        s = requests.Session()
        s.headers['foo'] = 'bar'
        r = s.get(httpbin('get'), headers={'FOO': None})
        assert 'foo' not in r.request.headers

    def test_params_are_merged_case_sensitive(self):
        """Session and request params differing only in case are both kept."""
        s = requests.Session()
        s.params['foo'] = 'bar'
        r = s.get(httpbin('get'), params={'FOO': 'bar'})
        assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}

    def test_long_authinfo_in_url(self):
        """Long userinfo and a long host label survive preparation unchanged."""
        url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
            'E8A3BE87-9E3F-4620-8858-95478E385B5B',
            'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
            'exactly-------------sixty-----------three------------characters',
        )
        r = requests.Request('GET', url).prepare()
        assert r.url == url

    def test_header_keys_are_native(self):
        """Unicode and byte-string header keys become native strings."""
        headers = {u'unicode': 'blah', 'byte'.encode('ascii'): 'blah'}
        r = requests.Request('GET', httpbin('get'), headers=headers)
        p = r.prepare()

        # This is testing that they are builtin strings. A bit weird, but there
        # we go.
        assert 'unicode' in p.headers.keys()
        assert 'byte' in p.headers.keys()

    def test_can_send_nonstring_objects_with_files(self):
        """Non-string form values are accepted alongside files."""
        data = {'a': 0.0}
        files = {'b': 'foo'}
        r = requests.Request('POST', httpbin('post'), data=data, files=files)
        p = r.prepare()

        assert 'multipart/form-data' in p.headers['Content-Type']

    def test_autoset_header_values_are_native(self):
        """The automatically set Content-Length equals the native string."""
        data = 'this is a string'
        length = '16'
        req = requests.Request('POST', httpbin('post'), data=data)
        p = req.prepare()

        assert p.headers['Content-Length'] == length

    def test_oddball_schemes_dont_check_URLs(self):
        """URLs with non-HTTP schemes are passed through unmodified."""
        test_urls = (
            'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
            'file:///etc/passwd',
            'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
        )
        for test_url in test_urls:
            req = requests.Request('GET', test_url)
            preq = req.prepare()
            assert test_url == preq.url

    def test_morsel_to_cookie_expires_is_converted(self):
        """``morsel_to_cookie`` converts the morsel's ``expires`` field."""
        morsel = Morsel()

        # Test case where we convert from string time
        # NOTE(review): 18001 == 5 * 3600 + 1, so this expectation presumably
        # only holds when the local timezone is UTC-5 -- confirm against
        # morsel_to_cookie.
        morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
        cookie = morsel_to_cookie(morsel)
        assert cookie.expires == 18001

        # Test case where no conversion is required
        morsel['expires'] = 100
        cookie = morsel_to_cookie(morsel)
        assert cookie.expires == 100

        # Test case where an invalid string is input
        morsel['expires'] = 'woops'
        with pytest.raises(ValueError):
            morsel_to_cookie(morsel)
780
781
class TestContentEncodingDetection(unittest.TestCase):
    """Tests for ``requests.utils.get_encodings_from_content``."""

    def _check_single_encoding(self, markup, expected):
        """Assert that ``markup`` declares exactly one encoding, ``expected``."""
        found = requests.utils.get_encodings_from_content(markup)
        assert len(found) == 1
        assert found[0] == expected

    def test_none(self):
        """Empty content declares no encoding."""
        found = requests.utils.get_encodings_from_content('')
        assert not len(found)

    def test_html_charset(self):
        """HTML5 meta charset attribute"""
        self._check_single_encoding('<meta charset="UTF-8">', 'UTF-8')

    def test_html4_pragma(self):
        """HTML4 pragma directive"""
        self._check_single_encoding(
            '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">',
            'UTF-8')

    def test_xhtml_pragma(self):
        """XHTML 1.x served with text/html MIME type"""
        self._check_single_encoding(
            '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />',
            'UTF-8')

    def test_xml(self):
        """XHTML 1.x served as XML"""
        self._check_single_encoding(
            '<?xml version="1.0" encoding="UTF-8"?>', 'UTF-8')

    def test_precedence(self):
        """All declarations are reported, in the order asserted below."""
        markup = '''
        <?xml version="1.0" encoding="XML"?>
        <meta charset="HTML5">
        <meta http-equiv="Content-type" content="text/html;charset=HTML4" />
        '''.strip()
        found = requests.utils.get_encodings_from_content(markup)
        assert found == ['HTML5', 'HTML4', 'XML']
824
825
class TestCaseInsensitiveDict(unittest.TestCase):
    """Checks for the ``CaseInsensitiveDict`` mapping."""

    def test_mapping_init(self):
        """Build from a plain dict."""
        source = {'Foo': 'foo', 'BAr': 'bar'}
        cid = CaseInsensitiveDict(source)
        assert len(cid) == 2
        for lowered in ('foo', 'bar'):
            assert lowered in cid

    def test_iterable_init(self):
        """Build from an iterable of key/value pairs."""
        pairs = [('Foo', 'foo'), ('BAr', 'bar')]
        cid = CaseInsensitiveDict(pairs)
        assert len(cid) == 2
        for lowered in ('foo', 'bar'):
            assert lowered in cid

    def test_kwargs_init(self):
        """Build from keyword arguments."""
        cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
        assert len(cid) == 2
        for lowered in ('foo', 'bar'):
            assert lowered in cid

    def test_docstring_example(self):
        """Lookup ignores case while iteration yields the key as stored."""
        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        fetched = cid['aCCEPT']
        assert fetched == 'application/json'
        assert list(cid) == ['Accept']

    def test_len(self):
        """Re-setting a key in another case does not add an entry."""
        cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        cid['A'] = 'a'
        assert len(cid) == 2

    def test_getitem(self):
        """Item access works for differently-cased spellings of the key."""
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        for spelling in ('spam', 'SPAM'):
            assert cid[spelling] == 'blueval'

    def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        cid = CaseInsensitiveDict()
        writes = (
            ('spam', 'oneval'),
            ('Spam', 'twoval'),
            ('sPAM', 'redval'),
            ('SPAM', 'blueval'),
        )
        for key, value in writes:
            cid[key] = value
        for spelling in ('spam', 'SPAM'):
            assert cid[spelling] == 'blueval'
        assert list(cid.keys()) == ['SPAM']

    def test_delitem(self):
        """Deleting through a differently-cased key removes the entry."""
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        del cid['sPam']
        assert 'spam' not in cid
        assert len(cid) == 0

    def test_contains(self):
        """Membership ignores case; an unrelated key is absent."""
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        for spelling in ('Spam', 'spam', 'SPAM', 'sPam'):
            assert spelling in cid
        assert 'notspam' not in cid

    def test_get(self):
        """``get`` ignores case and honours the default for a missing key."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['SPAM'] = 'blueval'
        for spelling in ('spam', 'SPAM', 'sPam'):
            assert cid.get(spelling) == 'blueval'
        assert cid.get('notspam', 'default') == 'default'

    def test_update(self):
        """``update`` overwrites entries whose keys differ only by case."""
        first = CaseInsensitiveDict()
        first['spam'] = 'blueval'
        first.update({'sPam': 'notblueval'})
        assert first['spam'] == 'notblueval'

        second = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        second.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
        assert len(second) == 2
        assert second['foo'] == 'anotherfoo'
        assert second['bar'] == 'anotherbar'

    def test_update_retains_unchanged(self):
        """Keys not named in ``update`` keep their values."""
        cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
        cid.update({'foo': 'newfoo'})
        assert cid['bar'] == 'bar'

    def test_iter(self):
        """Iteration yields the keys in their original case."""
        cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
        expected = frozenset(['Spam', 'Eggs'])
        assert frozenset(iter(cid)) == expected

    def test_equality(self):
        """Equality ignores key case, against both a CID and a plain dict."""
        left = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
        right = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
        assert left == right
        del right['spam']
        assert left != right
        assert left == {'spam': 'blueval', 'eggs': 'redval'}

    def test_setdefault(self):
        """``setdefault`` returns the existing value or stores the default."""
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        existing = cid.setdefault('spam', 'notblueval')
        assert existing == 'blueval'
        created = cid.setdefault('notspam', 'notblueval')
        assert created == 'notblueval'

    def test_lower_items(self):
        """``lower_items`` yields lower-cased keys."""
        headers = {
            'Accept': 'application/json',
            'user-Agent': 'requests',
        }
        cid = CaseInsensitiveDict(headers)
        seen = frozenset(name for name, _value in cid.lower_items())
        assert seen == frozenset(['accept', 'user-agent'])

    def test_preserve_key_case(self):
        """items(), keys() and iteration all report the original key case."""
        headers = {
            'Accept': 'application/json',
            'user-Agent': 'requests',
        }
        cid = CaseInsensitiveDict(headers)
        expected = frozenset(['Accept', 'user-Agent'])
        assert frozenset(pair[0] for pair in cid.items()) == expected
        assert frozenset(cid.keys()) == expected
        assert frozenset(cid) == expected

    def test_preserve_last_key_case(self):
        """After re-setting keys, the most recent spelling is reported."""
        headers = {
            'Accept': 'application/json',
            'user-Agent': 'requests',
        }
        cid = CaseInsensitiveDict(headers)
        cid.update({'ACCEPT': 'application/json'})
        cid['USER-AGENT'] = 'requests'
        expected = frozenset(['ACCEPT', 'USER-AGENT'])
        assert frozenset(pair[0] for pair in cid.items()) == expected
        assert frozenset(cid.keys()) == expected
        assert frozenset(cid) == expected
962
963
class UtilsTestCase(unittest.TestCase):
    """Tests for helper functions imported from ``requests.utils``."""

    def _set_no_proxy(self, value):
        """Set the ``no_proxy`` environment variable for this test only.

        The previous state of the variable (including "not set") is put back
        by a cleanup registered via ``addCleanup``, so the value assigned
        here does not leak into tests that run later in the same process.
        """
        previous = os.environ.get('no_proxy')

        def restore():
            if previous is None:
                os.environ.pop('no_proxy', None)
            else:
                os.environ['no_proxy'] = previous

        # Register the cleanup before mutating so it runs even if the
        # test body fails right after the assignment.
        self.addCleanup(restore)
        os.environ['no_proxy'] = value

    def test_super_len_io_streams(self):
        """Ensure that we properly deal with different kinds of IO streams."""
        # uses StringIO or io.StringIO (see import above)
        from io import BytesIO
        from requests.utils import super_len

        assert super_len(StringIO.StringIO()) == 0
        assert super_len(StringIO.StringIO('with so much drama in the LBC')) == 29

        assert super_len(BytesIO()) == 0
        assert super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40

        # cStringIO is optional: only exercise it when it can be imported.
        try:
            import cStringIO
        except ImportError:
            pass
        else:
            assert super_len(cStringIO.StringIO('but some how, some way...')) == 25

    def test_get_environ_proxies_ip_ranges(self):
        """Ensure IP addresses are correctly matched against the ranges and
        addresses listed in the no_proxy variable."""
        from requests.utils import get_environ_proxies
        self._set_no_proxy("192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1")
        assert get_environ_proxies('http://192.168.0.1:5000/') == {}
        assert get_environ_proxies('http://192.168.0.1/') == {}
        assert get_environ_proxies('http://172.16.1.1/') == {}
        assert get_environ_proxies('http://172.16.1.1:5000/') == {}
        assert get_environ_proxies('http://192.168.1.1:5000/') != {}
        assert get_environ_proxies('http://192.168.1.1/') != {}

    def test_get_environ_proxies(self):
        """Ensure a hostname listed in no_proxy yields an empty proxy dict
        while a hostname that is not listed yields a non-empty one."""
        from requests.utils import get_environ_proxies
        self._set_no_proxy("127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1")
        assert get_environ_proxies('http://localhost.localdomain:5000/v1.0/') == {}
        assert get_environ_proxies('http://www.requests.com/') != {}

    def test_is_ipv4_address(self):
        """A dotted quad is accepted; five octets and a hostname are not."""
        from requests.utils import is_ipv4_address
        assert is_ipv4_address('8.8.8.8')
        assert not is_ipv4_address('8.8.8.8.8')
        assert not is_ipv4_address('localhost.localdomain')

    def test_is_valid_cidr(self):
        """A bare address is rejected; address/prefix notation is accepted."""
        from requests.utils import is_valid_cidr
        assert not is_valid_cidr('8.8.8.8')
        assert is_valid_cidr('192.168.1.0/24')

    def test_dotted_netmask(self):
        """Prefix lengths convert to the expected dotted-quad netmasks."""
        from requests.utils import dotted_netmask
        assert dotted_netmask(8) == '255.0.0.0'
        assert dotted_netmask(24) == '255.255.255.0'
        assert dotted_netmask(25) == '255.255.255.128'

    def test_address_in_network(self):
        """An address inside the /24 matches; one outside does not."""
        from requests.utils import address_in_network
        assert address_in_network('192.168.1.1', '192.168.1.0/24')
        assert not address_in_network('172.16.0.1', '192.168.1.0/24')
1024
1025
# Run the whole suite with unittest's runner when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    unittest.main()