1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """Classes and functions for generic network communication over HTTP."""
8 import cStringIO as StringIO
24 from third_party import requests
25 from third_party.requests import adapters
26 from third_party.requests import structures
28 from utils import oauth
29 from utils import tools
32 # TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
def monkey_patch_httplib():
  """Makes sure httplib.HTTPConnection has a '_tunnel_host' attribute.

  Version 2 and newer of the 'requests' library read
  HTTPConnection._tunnel_host, an attribute python only added in 2.6.3. When a
  probe connection object lacks it (e.g. on python 2.6.2), a class-level
  default of None is installed instead.
  """
  probe = httplib.HTTPConnection('example.com')
  if hasattr(probe, '_tunnel_host'):
    return
  httplib.HTTPConnection._tunnel_host = None


monkey_patch_httplib()
46 # Default maximum number of attempts to try opening a url before aborting.
47 URL_OPEN_MAX_ATTEMPTS = 30
49 # Default overall time limit (in seconds) for a request, including all retries.
50 URL_OPEN_TIMEOUT = 6*60.
52 # Default timeout when reading from open HTTP connection.
# NOTE(review): the URL_READ_TIMEOUT assignment described by the comment above
# is not visible in this view; HttpService.request uses URL_READ_TIMEOUT as the
# default for its |read_timeout| argument.
55 # Content type for url encoded POST body.
56 URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
57 # Content type for JSON body.
58 JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
59 # Default content type for POST body.
60 DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE
62 # Content type -> function that encodes a request body.
# Presumably the CONTENT_ENCODERS mapping that HttpService.encode_request_body
# queries with CONTENT_ENCODERS.get(content_type); only part of it is visible in
# this view. JSON bodies are serialized with sorted keys and no extra
# whitespace, so the encoding of a given object is deterministic.
64 URL_ENCODED_FORM_CONTENT_TYPE:
67 lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
71 # Google Storage URL regular expression.
# create_authenticator applies it with re.match to a server URL
# (scheme://host); Google Storage requests use signed URLs and need no special
# authentication.
# NOTE(review): the pattern has no trailing '$' anchor, so re.match also accepts
# hosts such as 'https://x.storage.googleapis.com.example.org' — consider
# anchoring it.
72 GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')
74 # All possible authentication methods with corresponding types of method config
75 # object (or None if method is not configurable). See configure_auth.
76 # Order is important: it's visible in commands --help output.
# Only the 'oauth' entry is visible in this view; configure_auth also documents
# 'bot' and 'none'.
78 ('oauth', oauth.OAuthConfig),
83 # Global (for now) map: server URL (http://example.com) -> HttpService instance.
84 # Used by get_http_service to cache HttpService instances.
# Held by get_http_service while it looks up and inserts cached services.
86 _http_services_lock = threading.Lock()
88 # This lock ensures that user won't be confused with multiple concurrent
# (The rest of the comment above is not visible in this view.) HttpService.login
# says a global lock keeps two authentication flows from running in parallel;
# presumably it is this one.
90 _auth_lock = threading.Lock()
92 # Set in 'configure_auth'. If configure_auth is not called before first request,
93 # will be set to defaults generated by 'get_default_auth_config'.
# Holds the config object for the auth method stored in _auth_method; it is
# passed to the authenticator built by create_authenticator.
95 _auth_method_config = None
# Base class of the network errors defined in this module. Deriving from
# IOError means callers that catch IOError also catch these errors.
98 class NetError(IOError):
99 """Generic network related error."""
# |inner_exc| is the wrapped lower-level exception (may be None). Without it,
# the class docstring becomes the message, so subclass docstrings double as
# default error messages.
101 def __init__(self, inner_exc=None):
102 super(NetError, self).__init__(str(inner_exc or self.__doc__))
103 self.inner_exc = inner_exc
# Returns a human readable, possibly multi-line description. The first line is
# str(self.inner_exc) ('None' when nothing was wrapped). For a wrapped
# requests.HTTPError it can append the response headers and body between
# '----------' separators.
# NOTE(review): several lines of this method are not visible in this view;
# presumably the extra detail is only added when |verbose| is True — confirm.
105 def format(self, verbose=False):
106 """Human readable description with detailed information about the error."""
107 out = [str(self.inner_exc)]
111 if isinstance(self.inner_exc, requests.HTTPError):
112 headers = self.inner_exc.response.headers.items()
113 body = self.inner_exc.response.content
115 out.append('----------')
# NOTE(review): startswith('x-') is case-sensitive, so headers whose names
# arrive as 'X-...' are not skipped — confirm whether that is intended.
117 for header, value in headers:
118 if not header.startswith('x-'):
119 out.append('%s: %s' % (header.capitalize(), value))
121 out.append(body or '<empty body>')
122 out.append('----------')
123 return '\n'.join(out)
# Raised when a request times out. HttpResponse.read and
# RequestsLibEngine.perform_request convert socket/requests timeouts to it.
# Its docstring is also its default message (see NetError.__init__).
# NOTE: under Python 3 this name would shadow the builtin TimeoutError.
126 class TimeoutError(NetError):
127 """Timeout while reading HTTP response."""
# Raised by RequestsLibEngine.perform_request when connecting fails
# (requests.ConnectionError, socket.timeout or ssl.SSLError).
# NOTE: under Python 3 this name would shadow the builtin ConnectionError.
130 class ConnectionError(NetError):
131 """Failed to connect to the server."""
# Raised for HTTP responses with an error status; |code| is that status code,
# which HttpService.request reads back as e.code.
# NOTE(review): the line storing |code| on the instance is not visible in this
# view.
134 class HttpError(NetError):
135 """Server returned HTTP error code."""
137 def __init__(self, code, inner_exc=None):
138 super(HttpError, self).__init__(inner_exc)
def url_open(url, **kwargs): # pylint: disable=W0621
  """Opens |url|, retrying failed attempts as needed.

  |data| may be one of:
    - None, to send a GET request;
    - a str holding already-encoded data;
    - a list or dict that gets encoded before sending.

  Every keyword argument is forwarded to HttpService.request; see it for the
  complete list.

  Returns an HttpResponse object to read the response from, or None if the
  server could not be reached.
  """
  host, path = split_server_request_url(url)
  return get_http_service(host).request(path, **kwargs)
161 def url_read(url, **kwargs):
162 """Attempts to open the given url multiple times and read all data from it.
164 Accepts same arguments as url_open function.
166 Returns all data read or None if it was unable to connect or read the data.
# Always request a buffered (non-streaming) response so the whole body is in
# memory before read() is called; this overrides any 'stream' the caller passed.
168 kwargs['stream'] = False
169 response = url_open(url, **kwargs)
# NOTE(review): the handling of a None response and of read errors is not
# visible in this view; the docstring promises None in both cases.
173 return response.read()
178 def url_read_json(url, **kwargs):
179 """Attempts to open the given url multiple times and parses the JSON response.
181 Keyword arguments are forwarded to HttpService.json_request.
183 Returns the deserialized JSON response, or None on error or timeout.
# Route the request through the (cached) HttpService for the url's host.
185 urlhost, urlpath = split_server_request_url(url)
186 service = get_http_service(urlhost)
188 return service.json_request(urlpath, **kwargs)
193 def url_retrieve(filepath, url, **kwargs):
194 """Downloads a URL to a file. Returns True on success."""
195 response = url_open(url, **kwargs)
# NOTE(review): several lines of this function are not visible in this view.
# The body is copied to |filepath| in reads of at most 64 KiB.
199 with open(filepath, 'wb') as f:
201 buf = response.read(65536)
# NOTE(review): TimeoutError derives from NetError, which derives from IOError,
# so listing it next to IOError is redundant (harmless).
205 except (IOError, OSError, TimeoutError):
def split_server_request_url(url):
  """Splits |url| into its server part and its request part.

  Returns a (urlhost, urlpath) pair: urlhost is 'scheme://netloc' and urlpath
  is the remaining path, params, query and fragment re-assembled into a string.
  """
  scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
  server = '%s://%s' % (scheme, netloc)
  request_path = urlparse.urlunparse(('', '', path, params, query, fragment))
  return server, request_path
221 def get_http_service(urlhost, allow_cached=True):
222 """Returns existing or creates new instance of HttpService that can send
223 requests to given base urlhost.
228 engine=RequestsLibEngine(),
229 authenticator=create_authenticator(urlhost))
231 # Ensure consistency in url naming: lowercase and drop trailing slashes so
# equivalent spellings of a host share one cached HttpService.
232 urlhost = str(urlhost).lower().rstrip('/')
# The cache lookup and the insert of a new service both happen while holding
# _http_services_lock.
# NOTE(review): the factory header above and the allow_cached branch are not
# fully visible in this view.
236 with _http_services_lock:
237 service = _http_services.get(urlhost)
239 service = new_service()
240 _http_services[urlhost] = service
244 def get_default_auth_config():
245 """Returns auth configuration used by default if configure_auth is not called.
247 If running in a headless mode on bots, will use 'bot' auth, otherwise
248 'oauth' with default oauth config.
250 Returns pair (auth method name, auth method config).
# NOTE(review): the return for the headless branch is not visible in this view;
# per the docstring it selects 'bot' auth.
252 if tools.is_headless():
255 return 'oauth', oauth.make_oauth_config()
258 def configure_auth(method, config=None):
259 """Defines what authentication methods to use.
261 Possible authentication methods are:
262 'bot' - use HMAC authentication based on a secret key.
263 'oauth' - use oauth-based authentication.
264 'none' - do not use authentication.
267 method: what method to use.
268 config: object that holds configuration for authentication method.
269 Concrete type depends on a method used (see AUTH_METHODS for expected
270 type). Passed to corresponding authenticator instance.
273 global _auth_method_config
# Unknown method names fail this assert (note: asserts are stripped when python
# runs with -O).
274 assert method in dict(AUTH_METHODS), method
275 config_type = dict(AUTH_METHODS)[method]
# Configurable methods need a config of the declared type; methods registered
# with a None type must not be given a config (TypeError below).
276 if config_type and not isinstance(config, config_type):
278 'Expecting \'%s\' auth config to be of type %s. Got %s instead.' %
279 (method, config_type, type(config)))
280 elif not config_type and config is not None:
281 raise TypeError('Auth method \'%s\' is not configurable.' % method)
# NOTE(review): rebinding the module-level _auth_method needs a
# 'global _auth_method' declaration, but only 'global _auth_method_config' is
# visible in this view. Confirm the declaration exists; otherwise this line
# only binds a local.
283 _auth_method = method
284 _auth_method_config = config
# NOTE(review): only the start of this function is visible in this view.
287 def get_auth_method():
288 """Returns authentication method used by default.
290 Set with 'configure_auth'. See 'configure_auth' doc string for existing
296 def create_authenticator(urlhost):
297 """Makes Authenticator instance used by HttpService to access |urlhost|."""
298 # We use signed URL for Google Storage, no need for special authentication.
299 if GS_STORAGE_HOST_URL_RE.match(urlhost):
# NOTE(review): several lines of this function are not visible in this view:
# the returns of the GS, 'bot' and 'none' branches and the lazy-init guard.
# HttpService treats a falsy authenticator as "no authentication".
302 # Lazy initialize auth config with defaults.
304 default_method, default_config = get_default_auth_config()
305 configure_auth(default_method, default_config)
307 # Use configuration set with 'configure_auth'.
309 if _auth_method == 'bot':
310 # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
311 # any authenticator instance) for now.
313 elif _auth_method == 'oauth':
314 return OAuthAuthenticator(urlhost, _auth_method_config)
315 elif _auth_method == 'none':
317 raise AssertionError('Invalid auth method: %s' % _auth_method)
def get_case_insensitive_dict(original):
  """Given a dict with string keys returns new CaseInsensitiveDict.

  Args:
    original: dict with string keys; None is treated as an empty dict.

  Returns:
    New structures.CaseInsensitiveDict holding the same items.

  Raises:
    ValueError if two keys of |original| differ only by case.
  """
  # Normalize None up front: the length comparison below needs a real dict
  # (len(None) would raise TypeError).
  original = original or {}
  normalized = structures.CaseInsensitiveDict(original)
  # Keys that collide case-insensitively collapse into a single entry, which
  # shows up as a shorter dict.
  if len(normalized) != len(original):
    raise ValueError('Duplicate keys in: %s' % repr(original))
  return normalized
331 class HttpService(object):
332 """Base class for a class that provides an API to HTTP based service:
333 - Provides 'request' method.
334 - Supports automatic request retries.
338 def __init__(self, urlhost, engine, authenticator=None):
# |urlhost| is the server root (scheme://host) that request paths are joined
# onto. |engine| performs the HTTP requests (get_http_service passes a
# RequestsLibEngine). |authenticator| is an optional Authenticator; when it is
# None, requests are sent without authentication.
# NOTE(review): the line storing |engine| (used by request() as self.engine)
# is not visible in this view.
339 self.urlhost = urlhost
341 self.authenticator = authenticator
# NOTE(review): takes no 'self' but is called as self.is_transient_http_error(),
# so it is presumably a @staticmethod (decorator not visible in this view).
344 def is_transient_http_error(code, retry_404, retry_50x):
345 """Returns True if given HTTP response code is a transient error."""
346 # Google Storage can return this and it should be retried.
349 # Retry 404 only if allowed by the caller.
352 # All other 4xx errors are fatal.
355 # Retry >= 500 error only if allowed by the caller.
# NOTE(review): takes no 'self' but is called as self.encode_request_body(), so
# it is presumably a @staticmethod (decorator not visible in this view).
359 def encode_request_body(body, content_type):
360 """Returns request body encoded according to its content type."""
361 # No body or it is already encoded.
362 if body is None or isinstance(body, str):
364 # Any body should have content type set.
# Both checks below use assert, which python -O strips.
365 assert content_type, 'Request has body, but no content type'
366 encoder = CONTENT_ENCODERS.get(content_type)
367 assert encoder, ('Unknown content type %s' % content_type)
370 def login(self, allow_user_interaction):
371 """Runs authentication flow to refresh short lived access token.
373 Authentication flow may need to interact with the user (read username from
374 stdin, open local browser for OAuth2, etc.). If interaction is required and
375 |allow_user_interaction| is False, the login will silently be considered
376 failed (i.e. this function returns False).
378 'request' method always uses non-interactive login, so long-lived
379 authentication tokens (OAuth2 refresh token, etc) have to be set up
380 manually by developer (by calling 'auth.py login' perhaps) prior to running
381 any swarming or isolate scripts.
383 # Use global lock to ensure two authentication flows never run in parallel.
# Delegate to the authenticator when there is one.
# NOTE(review): the lock acquisition and the result when there is no
# authenticator are not visible in this view (presumably False).
385 if self.authenticator:
386 return self.authenticator.login(allow_user_interaction)
# NOTE(review): the 'def' line of this method (presumably logout(self)) is not
# visible in this view. It forwards to the authenticator, if there is one.
390 """Purges access credentials from local cache."""
391 if self.authenticator:
392 self.authenticator.logout()
# NOTE(review): the 'def request(self, urlpath, ...)' header and some of its
# parameters are not visible in this view; only these keyword defaults are.
399 max_attempts=URL_OPEN_MAX_ATTEMPTS,
402 timeout=URL_OPEN_TIMEOUT,
403 read_timeout=URL_READ_TIMEOUT,
407 """Attempts to open the given url multiple times.
409 |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
411 |data| can be either:
412 - None for a GET request
413 - str for pre-encoded data
414 - list for data to be form-encoded
415 - dict for data to be form-encoded
417 - Optionally retries HTTP 404 and 50x.
418 - Retries up to |max_attempts| times. If None or 0, there's no limit in the
420 - Retries up to |timeout| duration in seconds. If None or 0, there's no
421 limit in the time taken to do retries.
422 - If both |max_attempts| and |timeout| are None or 0, this function retries
425 If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
426 when performing the request. By default it's GET if |data| is None and POST
427 if |data| is not None.
429 If |headers| is given, it should be a dict with HTTP headers to append
430 to request. Caller is responsible for providing headers that make sense.
432 If |read_timeout| is not None will configure underlying socket to
433 raise TimeoutError exception whenever there's no response from the server
434 for more than |read_timeout| seconds. It can happen during any read
435 operation so once you pass non-None |read_timeout| be prepared to handle
436 these exceptions in subsequent reads from the stream.
438 Returns a file-like object, where the response may be read from, or None
439 if it was unable to connect. If |stream| is False will read whole response
440 into memory buffer before returning file-like object that reads from this
# |urlpath| must be rooted at the server root, i.e. start with '/'.
443 assert urlpath and urlpath[0] == '/', urlpath
# With a body: default to POST and encode |data| per |content_type|. Without
# one: default to GET, and content_type must not be set.
446 assert method in (None, 'POST', 'PUT')
447 method = method or 'POST'
448 content_type = content_type or DEFAULT_CONTENT_TYPE
449 body = self.encode_request_body(data, content_type)
451 assert method in (None, 'GET')
452 method = method or 'GET'
454 assert not content_type, 'Can\'t use content_type on GET'
456 # Prepare request info.
# The path is forced to a single leading '/' and joined onto self.urlhost; the
# query string becomes a list of (key, value) pairs.
457 parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
458 resource_url = urlparse.urljoin(self.urlhost, parsed.path)
459 query_params = urlparse.parse_qsl(parsed.query)
462 headers = get_case_insensitive_dict(headers or {})
464 headers['Content-Length'] = len(body)
466 headers['Content-Type'] = content_type
# Authentication is attempted at most once per call (see the 401/403 handling).
469 auth_attempted = False
# NOTE(review): the retry log message formats max_attempts with %d; when
# max_attempts is None (allowed per the docstring) logging cannot format the
# message and reports a formatting error instead.
471 for attempt in retry_loop(max_attempts, timeout):
472 # Log non-first attempt.
475 'Retrying request %s, attempt %d/%d...',
476 resource_url, attempt.attempt, max_attempts)
479 # Prepare and send a new request.
480 request = HttpRequest(
481 method, resource_url, query_params, body,
482 headers, read_timeout, stream)
483 if self.authenticator:
484 self.authenticator.authorize(request)
485 response = self.engine.perform_request(request)
486 logging.debug('Request %s succeeded', request.get_full_url())
489 except (ConnectionError, TimeoutError) as e:
492 'Unable to open url %s on attempt %d.\n%s',
493 request.get_full_url(), attempt.attempt, e.format())
496 except HttpError as e:
499 # Access denied -> authenticate.
500 if e.code in (401, 403):
502 'Authentication is required for %s on attempt %d.\n%s',
503 request.get_full_url(), attempt.attempt, e.format())
504 # Try to authenticate only once. If it doesn't help, then server does
505 # not support authentication or user doesn't have required access.
506 if not auth_attempted:
507 auth_attempted = True
508 if self.login(allow_user_interaction=False):
509 # Success! Run request again immediately.
510 attempt.skip_sleep = True
512 # Authentication attempt was unsuccessful.
514 'Unable to authenticate to %s (%s). Use auth.py to login: '
515 'python auth.py login --service=%s',
516 self.urlhost, e.format(), self.urlhost)
519 # Hit an error that can not be retried -> stop retry loop.
520 if not self.is_transient_http_error(e.code, retry_404, retry_50x):
521 # This HttpError means we reached the server and there was a problem
522 # with the request, so don't retry.
524 'Able to connect to %s but an exception was thrown.\n%s',
525 request.get_full_url(), e.format(verbose=True))
528 # Retry all other errors.
530 'Server responded with error on %s on attempt %d.\n%s',
531 request.get_full_url(), attempt.attempt, e.format())
# NOTE(review): the final log below also formats max_attempts with %d. It reads
# |request| and |last_error| (presumably assigned in the except clauses), which
# would be unbound if retry_loop yielded no attempt at all (e.g. a negative
# |timeout|).
535 'Unable to open given url, %s, after %d attempts.\n%s',
536 request.get_full_url(), max_attempts, last_error.format(verbose=True))
# NOTE(review): the 'def json_request(...)' header and some of its parameters
# are not visible in this view; only these keyword defaults are.
544 max_attempts=URL_OPEN_MAX_ATTEMPTS,
545 timeout=URL_OPEN_TIMEOUT,
547 """Sends JSON request to the server and parses the JSON response it gets back.
550 method: HTTP method to use ('GET', 'POST', ...).
551 urlpath: relative request path (e.g. '/auth/v1/...').
552 data: object to serialize to JSON and send in the request.
553 max_attempts: how many times to attempt the request before giving up.
554 timeout: how long to wait for a response (including all retries).
555 headers: dict with additional request headers.
557 If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
558 when performing the request. By default it's GET if |data| is None and POST
559 if |data| is not None.
562 Deserialized JSON response on success, None on error or timeout.
# A JSON Content-Type is only sent with a body: request() rejects content_type
# on GET requests.
564 response = self.request(
566 content_type=JSON_CONTENT_TYPE if data is not None else None,
569 max_attempts=max_attempts,
# NOTE(review): the lines handling a None response, read timeouts and JSON
# decode errors are not visible in this view.
578 text = response.read()
584 return json.loads(text)
586 logging.error('Not a JSON response when calling %s: %s', urlpath, text)
590 class HttpRequest(object):
591 """Request to HttpService."""
593 def __init__(self, method, url, params, body, headers, timeout, stream):
595 |method| - HTTP method to use
596 |url| - absolute URL to the resource, without query parameters
597 |params| - list of (key, value) pairs to put into GET parameters
598 |body| - encoded body of the request (None or str)
599 |headers| - dict with request headers
600 |timeout| - socket read timeout (None to disable)
601 |stream| - True to stream response from socket
# Copy params and headers. HttpService.request reuses the same list and dict on
# every retry, while authenticators add headers (e.g. Authorization) to each
# HttpRequest, so the copies keep one attempt from leaking into the next.
605 self.params = params[:]
607 self.headers = headers.copy()
608 self.timeout = timeout
# NOTE(review): the definition line of this accessor is not visible in this
# view; RequestsLibEngine reads it as request.cookies (presumably a property).
# The CookieJar is created lazily on first access and then reused.
614 """CookieJar object that will be used for cookies in this request."""
615 if self._cookies is None:
616 self._cookies = cookielib.CookieJar()
619 def get_full_url(self):
620 """Resource URL with url-encoded GET parameters."""
624 return '%s?%s' % (self.url, urllib.urlencode(self.params))
626 def make_fake_response(self, content='', headers=None):
627 """Makes new fake HttpResponse to this request, useful in tests."""
628 return HttpResponse.get_fake_response(content, self.get_full_url(), headers)
631 class HttpResponse(object):
632 """Response from HttpService."""
634 def __init__(self, stream, url, headers):
# |stream| is a file-like object the body is read from. Headers are stored in
# a case-insensitive dict; get_case_insensitive_dict raises ValueError for keys
# that differ only by case.
635 self._stream = stream
637 self._headers = get_case_insensitive_dict(headers)
# NOTE(review): presumably a @property (decorator not visible), since read()
# logs self.content_length without calling it.
641 def content_length(self):
642 """Total length of the response or None if not known in advance."""
643 length = self.get_header('Content-Length')
644 return int(length) if length is not None else None
646 def get_header(self, header):
647 """Returns response header (as str) or None if no such header."""
648 return self._headers.get(header)
650 def read(self, size=None):
651 """Reads up to |size| bytes from the stream and returns them.
653 If |size| is None reads all available bytes.
655 Raises TimeoutError on read timeout.
658 # cStringIO has a bug: stream.read(None) is not the same as stream.read().
659 data = self._stream.read() if size is None else self._stream.read(size)
# Running total of bytes read, reported in the timeout log message below.
660 self._read += len(data)
# Socket, SSL and requests-level errors listed here are all surfaced as
# TimeoutError.
# NOTE(review): every ssl.SSLError is caught, so non-timeout SSL failures are
# reported as timeouts as well — confirm intent.
662 except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
663 logging.error('Timeout while reading from %s, read %d of %s: %s',
664 self._url, self._read, self.content_length, e)
665 raise TimeoutError(e)
# NOTE(review): takes |cls|, so presumably a @classmethod (decorator not
# visible); HttpRequest.make_fake_response calls it on the class.
668 def get_fake_response(cls, content, url, headers=None):
669 """Returns HttpResponse with predefined content, useful in tests."""
670 headers = dict(headers or {})
671 headers['Content-Length'] = len(content)
672 return cls(StringIO.StringIO(content), url, headers)
675 class Authenticator(object):
676 """Base class for objects that know how to authenticate into http services."""
# Called by HttpService.request before each attempt; implementations add
# credentials to |request| in place (e.g. to request.headers).
678 def authorize(self, request):
679 """Add authentication information to the request."""
# Should return a truthy value on success; HttpService.request then retries
# the failed request immediately.
681 def login(self, allow_user_interaction):
682 """Run interactive authentication flow."""
683 raise NotImplementedError()
# NOTE(review): the 'def' line of this method (presumably logout(self), which
# HttpService.logout calls) is not visible in this view.
686 """Purges access credentials from local cache."""
689 class RequestsLibEngine(object):
690 """Class that knows how to execute HttpRequests via requests library."""
692 # Preferred number of connections in a connection pool.
693 CONNECTION_POOL_SIZE = 64
694 # If True will not open more than CONNECTION_POOL_SIZE connections.
695 CONNECTION_POOL_BLOCK = False
696 # Maximum number of internal connection retries in a connection pool.
# Presumably 0 because HttpService.request already runs its own retry loop.
697 CONNECTION_RETRIES = 0
# NOTE(review): the '__init__' definition line is not visible in this view.
700 super(RequestsLibEngine, self).__init__()
701 self.session = requests.Session()
# trust_env=False makes requests ignore environment-based settings such as
# proxy variables and .netrc. TLS certificates are verified against the bundle
# returned by tools.get_cacerts_bundle().
703 self.session.trust_env = False
704 self.session.verify = tools.get_cacerts_bundle()
705 # Configure connection pools.
706 for protocol in ('https://', 'http://'):
707 self.session.mount(protocol, adapters.HTTPAdapter(
708 pool_connections=self.CONNECTION_POOL_SIZE,
709 pool_maxsize=self.CONNECTION_POOL_SIZE,
710 max_retries=self.CONNECTION_RETRIES,
711 pool_block=self.CONNECTION_POOL_BLOCK))
713 def perform_request(self, request):
714 """Sends a HttpRequest to the server and reads back the response.
716 Returns HttpResponse.
719 ConnectionError - failed to establish connection to the server.
720 TimeoutError - timeout while connecting or reading response.
721 HttpError - server responded with >= 400 error code.
724 response = self.session.request(
725 method=request.method,
727 params=request.params,
729 headers=request.headers,
730 cookies=request.cookies,
731 timeout=request.timeout,
732 stream=request.stream)
# Turns 4xx/5xx statuses into requests.HTTPError, mapped to HttpError below.
733 response.raise_for_status()
# Presumably chosen by request.stream (the branch lines are not visible):
# either the raw response stream or the fully buffered body.
735 stream = response.raw
737 stream = StringIO.StringIO(response.content)
738 return HttpResponse(stream, request.get_full_url(), response.headers)
# requests.Timeout is handled first: in requests releases where ConnectTimeout
# derives from both ConnectionError and Timeout, it is reported as a timeout.
739 except requests.Timeout as e:
740 raise TimeoutError(e)
741 except requests.HTTPError as e:
742 raise HttpError(e.response.status_code, e)
743 except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
744 raise ConnectionError(e)
747 class OAuthAuthenticator(Authenticator):
748 """Uses OAuth Authorization header to authenticate requests."""
750 def __init__(self, urlhost, config):
751 super(OAuthAuthenticator, self).__init__()
752 assert isinstance(config, oauth.OAuthConfig)
753 self.urlhost = urlhost
# NOTE(review): _lock is created here, but its uses are not visible in this
# view.
755 self._lock = threading.Lock()
# _access_token_known separates "not looked up yet" from "looked up, but there
# is no token" (_access_token is None). As a result, oauth.load_access_token is
# called lazily and at most once until login()/logout() change the state.
756 self._access_token_known = False
757 self._access_token = None
759 def authorize(self, request):
# Lazily load a previously stored token on first use. Without a token, the
# request is sent with no Authorization header.
761 if not self._access_token_known:
762 self._access_token = oauth.load_access_token(self.urlhost, self.config)
763 self._access_token_known = True
764 if self._access_token:
765 request.headers['Authorization'] = 'Bearer %s' % self._access_token
767 def login(self, allow_user_interaction):
# Returns True only if oauth.create_access_token produced a token.
769 self._access_token = oauth.create_access_token(
770 self.urlhost, self.config, allow_user_interaction)
771 self._access_token_known = True
772 return self._access_token is not None
# NOTE(review): the definition line of this method (presumably logout(self))
# is not visible in this view. The token is marked known-but-absent so
# authorize() does not try to reload the token that is purged below.
776 self._access_token = None
777 self._access_token_known = True
778 oauth.purge_access_token(self.urlhost, self.config)
class RetryAttempt(object):
  """Describes one iteration of retry_loop.

  Instances are yielded by retry_loop. The consumer may set |skip_sleep| to
  True to make the next attempt start without a back-off delay.
  """

  def __init__(self, attempt, remaining):
    """Records the state of the current attempt.

    Args:
      attempt: zero-based index of this attempt.
      remaining: seconds left before retry_loop stops retrying (None when
          there is no time limit).
    """
    self.skip_sleep = False
    self.remaining = remaining
    self.attempt = attempt
797 def calculate_sleep_before_retry(attempt, max_duration):
798 """How long to sleep before retrying an attempt in retry_loop."""
799 # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
802 # random.random() returns [0.0, 1.0). Starts with relatively short waiting
803 # time by starting with 1.5/2+1.5^-1 median offset.
# i.e. uniform jitter in [0, 1.5) seconds plus an exponential back-off of
# 1.5**(attempt - 1) seconds (about 0.67s for attempt 0).
804 duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
805 assert duration > 0.1
# Cap by MAX_SLEEP (its definition is not visible in this view) ...
806 duration = min(MAX_SLEEP, duration)
# ... and by the time left in the retry loop.
# NOTE(review): the guard around this line is not visible; presumably it is
# skipped when max_duration is None (no time limit).
808 duration = min(max_duration, duration)
def sleep_before_retry(attempt, max_duration):
  """Pauses for the back-off delay of |attempt| before retry_loop retries.

  The delay comes from calculate_sleep_before_retry(attempt, max_duration).
  Kept as a standalone function so tests can mock it out.
  """
  delay = calculate_sleep_before_retry(attempt, max_duration)
  time.sleep(delay)
# NOTE(review): the 'def' line (presumably current_time(), which retry_loop
# calls) and the body of this function are not visible in this view.
821 """Used by retry loop to get current time.
823 To be mocked in tests.
828 def retry_loop(max_attempts=None, timeout=None):
829 """Yields whenever new attempt to perform some action is needed.
831 Yields instances of RetryAttempt class that contain information about current
832 attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
833 retry loop to run next attempt immediately.
835 start = current_time()
836 for attempt in itertools.count():
# Stop after |max_attempts| attempts; a falsy max_attempts means no limit.
838 if max_attempts and attempt == max_attempts:
840 # Retried for too long?
841 remaining = (timeout - (current_time() - start)) if timeout else None
842 if remaining is not None and remaining < 0:
844 # Kick next iteration.
845 attemp_obj = RetryAttempt(attempt, remaining)
847 if attemp_obj.skip_sleep:
849 # Only sleep if we are going to try again.
# NOTE(review): with a falsy max_attempts (unlimited attempts) this condition
# is always False, so there is no back-off sleep at all between attempts —
# confirm this is intended.
850 if max_attempts and attempt != max_attempts - 1:
851 remaining = (timeout - (current_time() - start)) if timeout else None
852 if remaining is not None and remaining < 0:
854 sleep_before_retry(attempt, remaining)