1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """Classes and functions for generic network communication over HTTP."""
8 import cStringIO as StringIO
24 from third_party import requests
25 from third_party.requests import adapters
26 from third_party.requests import structures
28 from utils import oauth
29 from utils import tools
32 # TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
def monkey_patch_httplib():
  """Ensures httplib.HTTPConnection exposes a '_tunnel_host' attribute.

  The 'requests' library (>= v2) reads 'HTTPConnection._tunnel_host', which
  was added only in python 2.6.3. On python 2.6.2 the attribute is added to
  the class with a None value; when it already exists nothing is changed.
  """
  probe = httplib.HTTPConnection('example.com')
  if hasattr(probe, '_tunnel_host'):
    return
  httplib.HTTPConnection._tunnel_host = None
43 monkey_patch_httplib()
# Default maximum number of attempts to try opening a url before aborting.
47 URL_OPEN_MAX_ATTEMPTS = 30
49 # Default timeout when retrying.
50 URL_OPEN_TIMEOUT = 6*60.
52 # Default timeout when reading from open HTTP connection.
55 # Content type for url encoded POST body.
56 URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
57 # Content type for JSON body.
58 JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
59 # Default content type for POST body.
60 DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE
62 # Content type -> function that encodes a request body.
64 URL_ENCODED_FORM_CONTENT_TYPE:
67 lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
71 # Google Storage URL regular expression.
72 GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')
74 # All possible authentication methods with corresponding types of method config
75 # object (or None if method is not configurable). See configure_auth.
76 # Order is important: it's visible in commands --help output.
78 ('oauth', oauth.OAuthConfig),
83 # Global (for now) map: server URL (http://example.com) -> HttpService instance.
84 # Used by get_http_service to cache HttpService instances.
86 _http_services_lock = threading.Lock()
88 # This lock ensures that user won't be confused with multiple concurrent
90 _auth_lock = threading.Lock()
92 # Set in 'configure_auth'. If configure_auth is not called before first request,
93 # will be set to defaults generated by 'get_default_auth_config'.
95 _auth_method_config = None
98 class NetError(IOError):
99 """Generic network related error."""
101 def __init__(self, inner_exc=None):
102 super(NetError, self).__init__(str(inner_exc or self.__doc__))
103 self.inner_exc = inner_exc
105 def format(self, verbose=False):
106 """Human readable description with detailed information about the error."""
107 out = [str(self.inner_exc)]
111 if isinstance(self.inner_exc, requests.HTTPError):
112 headers = self.inner_exc.response.headers.items()
113 body = self.inner_exc.response.content
115 out.append('----------')
117 for header, value in headers:
118 if not header.startswith('x-'):
119 out.append('%s: %s' % (header.capitalize(), value))
121 out.append(body or '<empty body>')
122 out.append('----------')
123 return '\n'.join(out)
class TimeoutError(NetError):
  """Timeout while reading HTTP response."""
  # NOTE: keep the docstring text stable. NetError.__init__ falls back to
  # self.__doc__ as the exception message when no inner exception is given.
  # Raised by HttpResponse.read and RequestsLibEngine.perform_request.
class ConnectionError(NetError):
  """Failed to connect to the server."""
  # NOTE: keep the docstring text stable. NetError.__init__ falls back to
  # self.__doc__ as the exception message when no inner exception is given.
  # Raised by RequestsLibEngine.perform_request when the requests library
  # reports a connection error, or on socket.timeout / ssl.SSLError.
class HttpError(NetError):
  """Server returned HTTP error code."""

  def __init__(self, code, inner_exc=None):
    """Wraps a failed HTTP response.

    Arguments:
      code: HTTP status code reported by the server.
      inner_exc: original exception this error was created from, if any.
    """
    super(HttpError, self).__init__(inner_exc)
    # HttpService.request reads |code| to decide between re-authenticating
    # (401, 403), retrying a transient failure, or giving up.
    self.code = code
def url_open(url, **kwargs):  # pylint: disable=W0621
  """Opens |url|, retrying on failure, via the HttpService for its host.

  The url is split into a server part and a request path; the request itself
  is delegated to HttpService.request, which documents the full list of
  accepted keyword arguments.

  |data| can be either:
    - None for a GET request
    - str for pre-encoded data
    - list for data to be encoded
    - dict for data to be encoded

  Returns HttpResponse object, where the response may be read from, or None
  if it was unable to connect.
  """
  host, path = split_server_request_url(url)
  return get_http_service(host).request(path, **kwargs)
161 def url_read(url, **kwargs):
162 """Attempts to open the given url multiple times and read all data from it.
164 Accepts same arguments as url_open function.
166 Returns all data read or None if it was unable to connect or read the data.
168 kwargs['stream'] = False
169 response = url_open(url, **kwargs)
173 return response.read()
178 def url_read_json(url, **kwargs):
179 """Attempts to open the given url multiple times and read all data from it.
181 Accepts same arguments as url_open function.
183 Returns all data read or None if it was unable to connect or read the data.
185 urlhost, urlpath = split_server_request_url(url)
186 service = get_http_service(urlhost)
188 return service.json_request(urlpath, **kwargs)
193 def url_retrieve(filepath, url, **kwargs):
194 """Downloads an URL to a file. Returns True on success."""
195 response = url_open(url, **kwargs)
199 with open(filepath, 'wb') as f:
201 buf = response.read(65536)
205 except (IOError, OSError, TimeoutError):
def split_server_request_url(url):
  """Breaks |url| into a (server, request) pair.

  The first item is 'scheme://netloc'; the second is everything after it
  (path, params, query and fragment) reassembled into a single string.
  """
  parsed = urlparse.urlparse(url)
  host_part = '%s://%s' % (parsed[0], parsed[1])
  # Blank out scheme and netloc so only the request part is put back together.
  path_part = urlparse.urlunparse(('', '') + tuple(parsed[2:]))
  return host_part, path_part
221 def get_http_service(urlhost, allow_cached=True):
222 """Returns existing or creates new instance of HttpService that can send
223 requests to given base urlhost.
228 engine=RequestsLibEngine(),
229 authenticator=create_authenticator(urlhost))
231 # Ensure consistency in url naming.
232 urlhost = str(urlhost).lower().rstrip('/')
236 with _http_services_lock:
237 service = _http_services.get(urlhost)
239 service = new_service()
240 _http_services[urlhost] = service
244 def get_default_auth_config():
245 """Returns auth configuration used by default if configure_auth is not called.
247 If running in a headless mode on bots, will use 'bot' auth, otherwise
248 'oauth' with default oauth config.
250 Returns pair (auth method name, auth method config).
252 if tools.is_headless():
255 return 'oauth', oauth.make_oauth_config()
258 def configure_auth(method, config=None):
259 """Defines what authentication methods to use.
261 Possible authentication methods are:
262 'bot' - use HMAC authentication based on a secret key.
263 'oauth' - use oauth-based authentication.
264 'none' - do not use authentication.
267 method: what method to use.
268 config: object that holds configuration for authentication method.
269 Concrete type depends on a method used (see AUTH_METHODS for expected
270 type). Passed to corresponding authenticator instance.
273 global _auth_method_config
274 assert method in dict(AUTH_METHODS), method
275 config_type = dict(AUTH_METHODS)[method]
276 if config_type and not isinstance(config, config_type):
278 'Expecting \'%s\' auth config to be of type %s. Got %s instead.' %
279 (method, config_type, type(config)))
280 elif not config_type and config is not None:
281 raise TypeError('Auth method \'%s\' is not configurable.' % method)
283 _auth_method = method
284 _auth_method_config = config
287 def get_auth_method():
288 """Returns authentication method used by default.
290 Set with 'configure_auth'. See 'configure_auth' doc string for existing
296 def create_authenticator(urlhost):
297 """Makes Authenticator instance used by HttpService to access |urlhost|."""
298 # We use signed URL for Google Storage, no need for special authentication.
299 if GS_STORAGE_HOST_URL_RE.match(urlhost):
302 # Lazy initialize auth config with defaults.
304 default_method, default_config = get_default_auth_config()
305 configure_auth(default_method, default_config)
307 # Use configuration set with 'configure_auth'.
309 if _auth_method == 'bot':
310 # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
311 # any authenticator instance) for now.
313 elif _auth_method == 'oauth':
314 return OAuthAuthenticator(urlhost, _auth_method_config)
315 elif _auth_method == 'none':
317 raise AssertionError('Invalid auth method: %s' % _auth_method)
def get_case_insensitive_dict(original):
  """Given a dict with string keys returns new CaseInsensitiveDict.

  None is treated the same as an empty dict.

  Raises ValueError if there are duplicate keys, i.e. the case-insensitive
  copy ends up with fewer entries than |original|.
  """
  # Normalize None once so the length comparison below cannot fail on it.
  original = original or {}
  normalized = structures.CaseInsensitiveDict(original)
  if len(normalized) != len(original):
    raise ValueError('Duplicate keys in: %s' % repr(original))
  return normalized
331 class HttpService(object):
332 """Base class for a class that provides an API to HTTP based service:
333 - Provides 'request' method.
334 - Supports automatic request retries.
338 def __init__(self, urlhost, engine, authenticator=None):
339 self.urlhost = urlhost
341 self.authenticator = authenticator
344 def is_transient_http_error(code, retry_404, retry_50x):
345 """Returns True if given HTTP response code is a transient error."""
346 # Google Storage can return this and it should be retried.
349 # Retry 404 only if allowed by the caller.
352 # All other 4** errors are fatal.
355 # Retry >= 500 error only if allowed by the caller.
359 def encode_request_body(body, content_type):
360 """Returns request body encoded according to its content type."""
361 # No body or it is already encoded.
362 if body is None or isinstance(body, str):
364 # Any body should have content type set.
365 assert content_type, 'Request has body, but no content type'
366 encoder = CONTENT_ENCODERS.get(content_type)
367 assert encoder, ('Unknown content type %s' % content_type)
370 def login(self, allow_user_interaction):
371 """Runs authentication flow to refresh short lived access token.
373 Authentication flow may need to interact with the user (read username from
374 stdin, open local browser for OAuth2, etc.). If interaction is required and
375 |allow_user_interaction| is False, the login will silently be considered
376 failed (i.e. this function returns False).
378 'request' method always uses non-interactive login, so long-lived
379 authentication tokens (OAuth2 refresh token, etc) have to be set up
380 manually by developer (by calling 'auth.py login' perhaps) prior running
381 any swarming or isolate scripts.
383 # Use global lock to ensure two authentication flows never run in parallel.
385 if self.authenticator:
386 return self.authenticator.login(allow_user_interaction)
390 """Purges access credentials from local cache."""
391 if self.authenticator:
392 self.authenticator.logout()
399 max_attempts=URL_OPEN_MAX_ATTEMPTS,
402 timeout=URL_OPEN_TIMEOUT,
403 read_timeout=URL_READ_TIMEOUT,
407 """Attempts to open the given url multiple times.
409 |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
411 |data| can be either:
412 - None for a GET request
413 - str for pre-encoded data
414 - list for data to be form-encoded
415 - dict for data to be form-encoded
417 - Optionally retries HTTP 404 and 50x.
418 - Retries up to |max_attempts| times. If None or 0, there's no limit in the
420 - Retries up to |timeout| duration in seconds. If None or 0, there's no
421 limit in the time taken to do retries.
422 - If both |max_attempts| and |timeout| are None or 0, this functions retries
425 If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
426 when performing the request. By default it's GET if |data| is None and POST
427 if |data| is not None.
429 If |headers| is given, it should be a dict with HTTP headers to append
430 to request. Caller is responsible for providing headers that make sense.
432 If |read_timeout| is not None will configure underlying socket to
433 raise TimeoutError exception whenever there's no response from the server
434 for more than |read_timeout| seconds. It can happen during any read
435 operation so once you pass non-None |read_timeout| be prepared to handle
436 these exceptions in subsequent reads from the stream.
438 Returns a file-like object, where the response may be read from, or None
439 if it was unable to connect. If |stream| is False will read whole response
440 into memory buffer before returning file-like object that reads from this
443 assert urlpath and urlpath[0] == '/', urlpath
446 assert method in (None, 'POST', 'PUT')
447 method = method or 'POST'
448 content_type = content_type or DEFAULT_CONTENT_TYPE
449 body = self.encode_request_body(data, content_type)
451 assert method in (None, 'GET')
452 method = method or 'GET'
454 assert not content_type, 'Can\'t use content_type on GET'
456 # Prepare request info.
457 parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
458 resource_url = urlparse.urljoin(self.urlhost, parsed.path)
459 query_params = urlparse.parse_qsl(parsed.query)
462 headers = get_case_insensitive_dict(headers or {})
464 headers['Content-Length'] = len(body)
466 headers['Content-Type'] = content_type
469 auth_attempted = False
471 for attempt in retry_loop(max_attempts, timeout):
472 # Log non-first attempt.
475 'Retrying request %s, attempt %d/%d...',
476 resource_url, attempt.attempt, max_attempts)
479 # Prepare and send a new request.
480 request = HttpRequest(
481 method, resource_url, query_params, body,
482 headers, read_timeout, stream)
483 if self.authenticator:
484 self.authenticator.authorize(request)
485 response = self.engine.perform_request(request)
486 logging.debug('Request %s succeeded', request.get_full_url())
489 except (ConnectionError, TimeoutError) as e:
492 'Unable to open url %s on attempt %d.\n%s',
493 request.get_full_url(), attempt.attempt, e.format())
496 except HttpError as e:
499 # Access denied -> authenticate.
500 if e.code in (401, 403):
502 'Authentication is required for %s on attempt %d.\n%s',
503 request.get_full_url(), attempt.attempt, e.format())
504 # Try to authenticate only once. If it doesn't help, then server does
505 # not support authentication or user doesn't have required access.
506 if not auth_attempted:
507 auth_attempted = True
508 if self.login(allow_user_interaction=False):
509 # Success! Run request again immediately.
510 attempt.skip_sleep = True
512 # Authentication attempt was unsuccessful.
514 'Unable to authenticate to %s (%s). Use auth.py to login: '
515 'python auth.py login --service=%s',
516 self.urlhost, e.format(), self.urlhost)
519 # Hit a error that can not be retried -> stop retry loop.
520 if not self.is_transient_http_error(e.code, retry_404, retry_50x):
521 # This HttpError means we reached the server and there was a problem
522 # with the request, so don't retry.
524 'Able to connect to %s but an exception was thrown.\n%s',
525 request.get_full_url(), e.format(verbose=True))
528 # Retry all other errors.
530 'Server responded with error on %s on attempt %d.\n%s',
531 request.get_full_url(), attempt.attempt, e.format())
535 'Unable to open given url, %s, after %d attempts.\n%s',
536 request.get_full_url(), max_attempts, last_error.format(verbose=True))
539 def json_request(self, urlpath, data=None, **kwargs):
540 """Sends JSON request to the server and parses JSON response it get back.
543 urlpath: relative request path (e.g. '/auth/v1/...').
544 data: object to serialize to JSON and sent in the request.
546 See self.request() for more details.
549 Deserialized JSON response on success, None on error or timeout.
551 content_type = JSON_CONTENT_TYPE if data is not None else None
552 response = self.request(
553 urlpath, content_type=content_type, data=data, stream=False, **kwargs)
557 text = response.read()
563 return json.loads(text)
565 logging.error('Not a JSON response when calling %s: %s', urlpath, text)
569 class HttpRequest(object):
570 """Request to HttpService."""
572 def __init__(self, method, url, params, body, headers, timeout, stream):
574 |method| - HTTP method to use
575 |url| - relative URL to the resource, without query parameters
576 |params| - list of (key, value) pairs to put into GET parameters
577 |body| - encoded body of the request (None or str)
578 |headers| - dict with request headers
579 |timeout| - socket read timeout (None to disable)
580 |stream| - True to stream response from socket
584 self.params = params[:]
586 self.headers = headers.copy()
587 self.timeout = timeout
593 """CookieJar object that will be used for cookies in this request."""
594 if self._cookies is None:
595 self._cookies = cookielib.CookieJar()
598 def get_full_url(self):
599 """Resource URL with url-encoded GET parameters."""
603 return '%s?%s' % (self.url, urllib.urlencode(self.params))
605 def make_fake_response(self, content='', headers=None):
606 """Makes new fake HttpResponse to this request, useful in tests."""
607 return HttpResponse.get_fake_response(content, self.get_full_url(), headers)
610 class HttpResponse(object):
611 """Response from HttpService."""
613 def __init__(self, stream, url, headers):
614 self._stream = stream
616 self._headers = get_case_insensitive_dict(headers)
620 def content_length(self):
621 """Total length to the response or None if not known in advance."""
622 length = self.get_header('Content-Length')
623 return int(length) if length is not None else None
625 def get_header(self, header):
626 """Returns response header (as str) or None if no such header."""
627 return self._headers.get(header)
629 def read(self, size=None):
630 """Reads up to |size| bytes from the stream and returns them.
632 If |size| is None reads all available bytes.
634 Raises TimeoutError on read timeout.
637 # cStringIO has a bug: stream.read(None) is not the same as stream.read().
638 data = self._stream.read() if size is None else self._stream.read(size)
639 self._read += len(data)
641 except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
642 logging.error('Timeout while reading from %s, read %d of %s: %s',
643 self._url, self._read, self.content_length, e)
644 raise TimeoutError(e)
647 def get_fake_response(cls, content, url, headers=None):
648 """Returns HttpResponse with predefined content, useful in tests."""
649 headers = dict(headers or {})
650 headers['Content-Length'] = len(content)
651 return cls(StringIO.StringIO(content), url, headers)
654 class Authenticator(object):
655 """Base class for objects that know how to authenticate into http services."""
657 def authorize(self, request):
658 """Add authentication information to the request."""
660 def login(self, allow_user_interaction):
661 """Run interactive authentication flow."""
662 raise NotImplementedError()
665 """Purges access credentials from local cache."""
668 class RequestsLibEngine(object):
669 """Class that knows how to execute HttpRequests via requests library."""
671 # Preferred number of connections in a connection pool.
672 CONNECTION_POOL_SIZE = 64
673 # If True will not open more than CONNECTION_POOL_SIZE connections.
674 CONNECTION_POOL_BLOCK = False
675 # Maximum number of internal connection retries in a connection pool.
676 CONNECTION_RETRIES = 0
679 super(RequestsLibEngine, self).__init__()
680 self.session = requests.Session()
682 self.session.trust_env = False
683 self.session.verify = tools.get_cacerts_bundle()
684 # Configure connection pools.
685 for protocol in ('https://', 'http://'):
686 self.session.mount(protocol, adapters.HTTPAdapter(
687 pool_connections=self.CONNECTION_POOL_SIZE,
688 pool_maxsize=self.CONNECTION_POOL_SIZE,
689 max_retries=self.CONNECTION_RETRIES,
690 pool_block=self.CONNECTION_POOL_BLOCK))
692 def perform_request(self, request):
693 """Sends a HttpRequest to the server and reads back the response.
695 Returns HttpResponse.
698 ConnectionError - failed to establish connection to the server.
699 TimeoutError - timeout while connecting or reading response.
700 HttpError - server responded with >= 400 error code.
703 response = self.session.request(
704 method=request.method,
706 params=request.params,
708 headers=request.headers,
709 cookies=request.cookies,
710 timeout=request.timeout,
711 stream=request.stream)
712 response.raise_for_status()
714 stream = response.raw
716 stream = StringIO.StringIO(response.content)
717 return HttpResponse(stream, request.get_full_url(), response.headers)
718 except requests.Timeout as e:
719 raise TimeoutError(e)
720 except requests.HTTPError as e:
721 raise HttpError(e.response.status_code, e)
722 except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
723 raise ConnectionError(e)
726 class OAuthAuthenticator(Authenticator):
727 """Uses OAuth Authorization header to authenticate requests."""
729 def __init__(self, urlhost, config):
730 super(OAuthAuthenticator, self).__init__()
731 assert isinstance(config, oauth.OAuthConfig)
732 self.urlhost = urlhost
734 self._lock = threading.Lock()
735 self._access_token_known = False
736 self._access_token = None
738 def authorize(self, request):
740 if not self._access_token_known:
741 self._access_token = oauth.load_access_token(self.urlhost, self.config)
742 self._access_token_known = True
743 if self._access_token:
744 request.headers['Authorization'] = 'Bearer %s' % self._access_token
746 def login(self, allow_user_interaction):
748 self._access_token = oauth.create_access_token(
749 self.urlhost, self.config, allow_user_interaction)
750 self._access_token_known = True
751 return self._access_token is not None
755 self._access_token = None
756 self._access_token_known = True
757 oauth.purge_access_token(self.urlhost, self.config)
class RetryAttempt(object):
  """State of a single iteration of a retry loop.

  retry_loop yields one instance per attempt.
  """

  def __init__(self, attempt, remaining):
    """Records where the retry loop currently stands.

    |attempt| - zero based index of attempt.
    |remaining| - how much time is left before retry loop finishes retries.
    """
    # Set to True by the consumer to make the loop run the next attempt
    # immediately, without sleeping first.
    self.skip_sleep = False
    self.remaining = remaining
    self.attempt = attempt
776 def calculate_sleep_before_retry(attempt, max_duration):
777 """How long to sleep before retrying an attempt in retry_loop."""
778 # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
781 # random.random() returns [0.0, 1.0). Starts with relatively short waiting
782 # time by starting with 1.5/2+1.5^-1 median offset.
783 duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
784 assert duration > 0.1
785 duration = min(MAX_SLEEP, duration)
787 duration = min(max_duration, duration)
def sleep_before_retry(attempt, max_duration):
  """Blocks the caller for the back-off delay of the given retry attempt.

  The delay is whatever calculate_sleep_before_retry returns for the same
  arguments.

  To be mocked in tests.
  """
  delay = calculate_sleep_before_retry(attempt, max_duration)
  time.sleep(delay)
800 """Used by retry loop to get current time.
802 To be mocked in tests.
807 def retry_loop(max_attempts=None, timeout=None):
808 """Yields whenever new attempt to perform some action is needed.
810 Yields instances of RetryAttempt class that contains information about current
811 attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
812 retry loop to run next attempt immediately.
814 start = current_time()
815 for attempt in itertools.count():
817 if max_attempts and attempt == max_attempts:
819 # Retried for too long?
820 remaining = (timeout - (current_time() - start)) if timeout else None
821 if remaining is not None and remaining < 0:
823 # Kick next iteration.
824 attemp_obj = RetryAttempt(attempt, remaining)
826 if attemp_obj.skip_sleep:
828 # Only sleep if we are going to try again.
829 if max_attempts and attempt != max_attempts - 1:
830 remaining = (timeout - (current_time() - start)) if timeout else None
831 if remaining is not None and remaining < 0:
833 sleep_before_retry(attempt, remaining)