1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """Classes and functions for generic network communication over HTTP."""
8 import cStringIO as StringIO
26 from third_party import requests
27 from third_party.requests import adapters
28 from third_party.requests import structures
29 from third_party.rietveld import upload
31 from utils import oauth
32 from utils import tools
33 from utils import zip_package
35 # Hack out upload logging.info()
36 upload.logging = logging.getLogger('upload')
37 # Mac pylint choke on this line.
38 upload.logging.setLevel(logging.WARNING) # pylint: disable=E1103
41 # TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
42 def monkey_patch_httplib():
43 """Patch httplib.HTTPConnection to have '_tunnel_host' attribute.
45 'requests' library (>= v2) accesses 'HTTPConnection._tunnel_host' attribute
46 added only in python 2.6.3. This function patches HTTPConnection to have it
47 on python 2.6.2 as well.
49 conn = httplib.HTTPConnection('example.com')
50 if not hasattr(conn, '_tunnel_host'):
51 httplib.HTTPConnection._tunnel_host = None
52 monkey_patch_httplib()
55 # The name of the key to store the count of url attempts.
56 COUNT_KEY = 'UrlOpenAttempt'
58 # Default maximum number of attempts to trying opening a url before aborting.
59 URL_OPEN_MAX_ATTEMPTS = 30
61 # Default timeout when retrying.
62 URL_OPEN_TIMEOUT = 6*60.
64 # Default timeout when reading from open HTTP connection.
67 # Content type for url encoded POST body.
68 URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
69 # Content type for JSON body.
70 JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
71 # Default content type for POST body.
72 DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE
74 # Content type -> function that encodes a request body.
76 URL_ENCODED_FORM_CONTENT_TYPE:
79 lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
82 # File to use to store all auth cookies.
83 COOKIE_FILE = os.path.join(os.path.expanduser('~'), '.isolated_cookies')
85 # Google Storage URL regular expression.
86 GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')
88 # All possible authentication methods with corresponding types of method config
89 # object (or None if method is not configurable). See configure_auth.
90 # Order is important: it's visible in commands --help output.
92 ('oauth', oauth.OAuthConfig),
99 # Global (for now) map: server URL (http://example.com) -> HttpService instance.
100 # Used by get_http_service to cache HttpService instances.
102 _http_services_lock = threading.Lock()
104 # CookieJar reused by all services + lock that protects its instantiation.
106 _cookie_jar_lock = threading.Lock()
108 # Path to cacert.pem bundle file reused by all services.
110 _ca_certs_lock = threading.Lock()
112 # This lock ensures that user won't be confused with multiple concurrent
114 _auth_lock = threading.Lock()
116 # Set in 'configure_auth'. If configure_auth is not called before first request,
117 # will be set to defaults generated by 'get_default_auth_config'.
119 _auth_method_config = None
122 class NetError(IOError):
123 """Generic network related error."""
125 def __init__(self, inner_exc=None):
126 super(NetError, self).__init__(str(inner_exc or self.__doc__))
127 self.inner_exc = inner_exc
129 def format(self, verbose=False):
130 """Human readable description with detailed information about the error."""
131 out = [str(self.inner_exc)]
135 if isinstance(self.inner_exc, urllib2.HTTPError):
136 headers = self.inner_exc.hdrs.items()
137 body = self.inner_exc.read()
138 elif isinstance(self.inner_exc, requests.HTTPError):
139 headers = self.inner_exc.response.headers.items()
140 body = self.inner_exc.response.content
142 out.append('----------')
144 for header, value in headers:
145 if not header.startswith('x-'):
146 out.append('%s: %s' % (header.capitalize(), value))
148 out.append(body or '<empty body>')
149 out.append('----------')
150 return '\n'.join(out)
153 class TimeoutError(NetError):
154 """Timeout while reading HTTP response."""
157 class ConnectionError(NetError):
158 """Failed to connect to the server."""
161 class HttpError(NetError):
162 """Server returned HTTP error code."""
164 def __init__(self, code, inner_exc=None):
165 super(HttpError, self).__init__(inner_exc)
169 def url_open(url, **kwargs):
170 """Attempts to open the given url multiple times.
172 |data| can be either:
173 - None for a GET request
174 - str for pre-encoded data
175 - list for data to be encoded
176 - dict for data to be encoded
178 See HttpService.request for a full list of arguments.
180 Returns HttpResponse object, where the response may be read from, or None
181 if it was unable to connect.
183 urlhost, urlpath = split_server_request_url(url)
184 service = get_http_service(urlhost)
185 return service.request(urlpath, **kwargs)
188 def url_read(url, **kwargs):
189 """Attempts to open the given url multiple times and read all data from it.
191 Accepts same arguments as url_open function.
193 Returns all data read or None if it was unable to connect or read the data.
195 kwargs['stream'] = False
196 response = url_open(url, **kwargs)
200 return response.read()
205 def split_server_request_url(url):
206 """Splits the url into scheme+netloc and path+params+query+fragment."""
207 url_parts = list(urlparse.urlparse(url))
208 urlhost = '%s://%s' % (url_parts[0], url_parts[1])
209 urlpath = urlparse.urlunparse(['', ''] + url_parts[2:])
210 return urlhost, urlpath
213 def get_http_service(urlhost):
214 """Returns existing or creates new instance of HttpService that can send
215 requests to given base urlhost.
217 # Ensure consistency in url naming.
218 urlhost = str(urlhost).lower().rstrip('/')
219 # Do not use COUNT_KEY with Google Storage (since it breaks a signature).
220 use_count_key = not GS_STORAGE_HOST_URL_RE.match(urlhost)
221 with _http_services_lock:
222 service = _http_services.get(urlhost)
224 service = HttpService(
226 engine=RequestsLibEngine(get_cacerts_bundle()),
227 authenticator=create_authenticator(urlhost),
228 use_count_key=use_count_key)
229 _http_services[urlhost] = service
233 def get_cookie_jar():
234 """Returns global CoookieJar object that stores cookies in the file."""
236 with _cookie_jar_lock:
237 if _cookie_jar is not None:
239 jar = ThreadSafeCookieJar(COOKIE_FILE)
245 def get_cacerts_bundle():
246 """Returns path to a file with CA root certificates bundle."""
249 if _ca_certs is not None and os.path.exists(_ca_certs):
251 _ca_certs = zip_package.extract_resource(requests, 'cacert.pem')
255 def get_default_auth_config():
256 """Returns auth configuration used by default if configure_auth is not called.
258 If running in a headless mode on bots, will use 'bot' auth, otherwise
259 'oauth' with default oauth config.
261 Returns pair (auth method name, auth method config).
263 if tools.is_headless():
266 return 'oauth', oauth.make_oauth_config()
269 def configure_auth(method, config=None):
270 """Defines what authentication methods to use.
272 Possible authentication methods are:
273 'bot' - use HMAC authentication based on a secret key.
274 'cookie' - use cookie-based authentication.
275 'none' - do not use authentication.
276 'oauth' - use oauth-based authentication.
279 method: what method to use.
280 config: object that holds configuration for authentication method.
281 Concrete type depends on a method used (see AUTH_METHODS for expected
282 type). Passed to corresponding authenticator instance.
285 global _auth_method_config
286 assert method in dict(AUTH_METHODS), method
287 config_type = dict(AUTH_METHODS)[method]
288 if config_type and not isinstance(config, config_type):
290 'Expecting \'%s\' auth config to be of type %s. Got %s instead.' %
291 (method, config_type, type(config)))
292 elif not config_type and config is not None:
293 raise TypeError('Auth method \'%s\' is not configurable.' % method)
295 _auth_method = method
296 _auth_method_config = config
299 def create_authenticator(urlhost):
300 """Makes Authenticator instance used by HttpService to access |urlhost|."""
301 # We use signed URL for Google Storage, no need for special authentication.
302 if GS_STORAGE_HOST_URL_RE.match(urlhost):
305 # Lazy initialize auth config with defaults.
307 default_method, default_config = get_default_auth_config()
308 configure_auth(default_method, default_config)
310 # Use configuration set with 'configure_auth'.
312 if _auth_method == 'bot':
313 # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
314 # any authenticator instance) for now.
316 elif _auth_method == 'cookie':
317 return CookieBasedAuthenticator(urlhost, get_cookie_jar())
318 elif _auth_method == 'none':
320 elif _auth_method == 'oauth':
321 return OAuthAuthenticator(urlhost, _auth_method_config)
322 raise AssertionError('Invalid auth method: %s' % _auth_method)
325 def get_case_insensitive_dict(original):
326 """Given a dict with string keys returns new CaseInsensitiveDict.
328 Raises ValueError if there are duplicate keys.
330 normalized = structures.CaseInsensitiveDict(original or {})
331 if len(normalized) != len(original):
332 raise ValueError('Duplicate keys in: %s' % repr(original))
336 class HttpService(object):
337 """Base class for a class that provides an API to HTTP based service:
338 - Provides 'request' method.
339 - Supports automatic request retries.
343 def __init__(self, urlhost, engine, authenticator=None, use_count_key=True):
344 self.urlhost = urlhost
346 self.authenticator = authenticator
347 self.use_count_key = use_count_key
350 def is_transient_http_error(code, retry_404, retry_50x):
351 """Returns True if given HTTP response code is a transient error."""
352 # Google Storage can return this and it should be retried.
355 # Retry 404 only if allowed by the caller.
358 # All other 4** errors are fatal.
361 # Retry >= 500 error only if allowed by the caller.
365 def encode_request_body(body, content_type):
366 """Returns request body encoded according to its content type."""
367 # No body or it is already encoded.
368 if body is None or isinstance(body, str):
370 # Any body should have content type set.
371 assert content_type, 'Request has body, but no content type'
372 encoder = CONTENT_ENCODERS.get(content_type)
373 assert encoder, ('Unknown content type %s' % content_type)
376 def login(self, allow_user_interaction):
377 """Runs authentication flow to refresh short lived access token.
379 Authentication flow may need to interact with the user (read username from
380 stdin, open local browser for OAuth2, etc.). If interaction is required and
381 |allow_user_interaction| is False, the login will silently be considered
382 failed (i.e. this function returns False).
384 'request' method always uses non-interactive login, so long-lived
385 authentication tokens (cookie, OAuth2 refresh token, etc) have to be set up
386 manually by developer (by calling 'auth.py login' perhaps) prior running
387 any swarming or isolate scripts.
389 # Use global lock to ensure two authentication flows never run in parallel.
391 if self.authenticator:
392 return self.authenticator.login(allow_user_interaction)
396 """Purges access credentials from local cache."""
397 if self.authenticator:
398 self.authenticator.logout()
405 max_attempts=URL_OPEN_MAX_ATTEMPTS,
408 timeout=URL_OPEN_TIMEOUT,
409 read_timeout=URL_READ_TIMEOUT,
413 """Attempts to open the given url multiple times.
415 |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
417 |data| can be either:
418 - None for a GET request
419 - str for pre-encoded data
420 - list for data to be form-encoded
421 - dict for data to be form-encoded
423 - Optionally retries HTTP 404 and 50x.
424 - Retries up to |max_attempts| times. If None or 0, there's no limit in the
426 - Retries up to |timeout| duration in seconds. If None or 0, there's no
427 limit in the time taken to do retries.
428 - If both |max_attempts| and |timeout| are None or 0, this functions retries
431 If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
432 when performing the request. By default it's GET if |data| is None and POST
433 if |data| is not None.
435 If |headers| is given, it should be a dict with HTTP headers to append
436 to request. Caller is responsible for providing headers that make sense.
438 If |read_timeout| is not None will configure underlying socket to
439 raise TimeoutError exception whenever there's no response from the server
440 for more than |read_timeout| seconds. It can happen during any read
441 operation so once you pass non-None |read_timeout| be prepared to handle
442 these exceptions in subsequent reads from the stream.
444 Returns a file-like object, where the response may be read from, or None
445 if it was unable to connect. If |stream| is False will read whole response
446 into memory buffer before returning file-like object that reads from this
449 assert urlpath and urlpath[0] == '/', urlpath
452 assert method in (None, 'POST', 'PUT')
453 method = method or 'POST'
454 content_type = content_type or DEFAULT_CONTENT_TYPE
455 body = self.encode_request_body(data, content_type)
457 assert method in (None, 'GET')
458 method = method or 'GET'
460 assert not content_type, 'Can\'t use content_type on GET'
462 # Prepare request info.
463 parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
464 resource_url = urlparse.urljoin(self.urlhost, parsed.path)
465 query_params = urlparse.parse_qsl(parsed.query)
468 headers = get_case_insensitive_dict(headers or {})
470 headers['Content-Length'] = len(body)
472 headers['Content-Type'] = content_type
475 auth_attempted = False
477 for attempt in retry_loop(max_attempts, timeout):
478 # Log non-first attempt.
481 'Retrying request %s, attempt %d/%d...',
482 resource_url, attempt.attempt, max_attempts)
485 # Prepare and send a new request.
486 request = HttpRequest(method, resource_url, query_params, body,
487 headers, read_timeout, stream)
488 self.prepare_request(request, attempt.attempt)
489 if self.authenticator:
490 self.authenticator.authorize(request)
491 response = self.engine.perform_request(request)
492 logging.debug('Request %s succeeded', request.get_full_url())
495 except (ConnectionError, TimeoutError) as e:
498 'Unable to open url %s on attempt %d.\n%s',
499 request.get_full_url(), attempt.attempt, e.format())
502 except HttpError as e:
505 # Access denied -> authenticate.
506 if e.code in (401, 403):
508 'Authentication is required for %s on attempt %d.\n%s',
509 request.get_full_url(), attempt.attempt, e.format())
510 # Try to authenticate only once. If it doesn't help, then server does
511 # not support authentication or user doesn't have required access.
512 if not auth_attempted:
513 auth_attempted = True
514 if self.login(allow_user_interaction=False):
515 # Success! Run request again immediately.
516 attempt.skip_sleep = True
518 # Authentication attempt was unsuccessful.
520 'Unable to authenticate to %s (%s). Use auth.py to login: '
521 'python auth.py login --service=%s',
522 self.urlhost, e.format(), self.urlhost)
525 # Hit a error that can not be retried -> stop retry loop.
526 if not self.is_transient_http_error(e.code, retry_404, retry_50x):
527 # This HttpError means we reached the server and there was a problem
528 # with the request, so don't retry.
530 'Able to connect to %s but an exception was thrown.\n%s',
531 request.get_full_url(), e.format(verbose=True))
534 # Retry all other errors.
536 'Server responded with error on %s on attempt %d.\n%s',
537 request.get_full_url(), attempt.attempt, e.format())
541 'Unable to open given url, %s, after %d attempts.\n%s',
542 request.get_full_url(), max_attempts, last_error.format(verbose=True))
550 max_attempts=URL_OPEN_MAX_ATTEMPTS,
551 timeout=URL_OPEN_TIMEOUT,
553 """Sends JSON request to the server and parses JSON response it get back.
556 method: HTTP method to use ('GET', 'POST', ...).
557 urlpath: relative request path (e.g. '/auth/v1/...').
558 body: object to serialize to JSON and sent in the request.
559 max_attempts: how many times to retry 50x errors.
560 timeout: how long to wait for a response (including all retries).
561 headers: dict with additional request headers.
564 Deserialized JSON response on success, None on error or timeout.
566 response = self.request(
568 content_type=JSON_CONTENT_TYPE if body is not None else None,
571 max_attempts=max_attempts,
580 text = response.read()
586 return json.loads(text)
588 logging.error('Not a JSON response when calling %s: %s', urlpath, text)
591 def prepare_request(self, request, attempt): # pylint: disable=R0201
592 """Modify HttpRequest before sending it by adding COUNT_KEY parameter."""
593 # Add COUNT_KEY only on retries.
594 if self.use_count_key and attempt:
595 request.params += [(COUNT_KEY, attempt)]
598 class HttpRequest(object):
599 """Request to HttpService."""
601 def __init__(self, method, url, params, body, headers, timeout, stream):
603 |method| - HTTP method to use
604 |url| - relative URL to the resource, without query parameters
605 |params| - list of (key, value) pairs to put into GET parameters
606 |body| - encoded body of the request (None or str)
607 |headers| - dict with request headers
608 |timeout| - socket read timeout (None to disable)
609 |stream| - True to stream response from socket
613 self.params = params[:]
615 self.headers = headers.copy()
616 self.timeout = timeout
622 """CookieJar object that will be used for cookies in this request."""
623 if self._cookies is None:
624 self._cookies = cookielib.CookieJar()
627 def get_full_url(self):
628 """Resource URL with url-encoded GET parameters."""
632 return '%s?%s' % (self.url, urllib.urlencode(self.params))
634 def make_fake_response(self, content='', headers=None):
635 """Makes new fake HttpResponse to this request, useful in tests."""
636 return HttpResponse.get_fake_response(content, self.get_full_url(), headers)
639 class HttpResponse(object):
640 """Response from HttpService."""
642 def __init__(self, stream, url, headers):
643 self._stream = stream
645 self._headers = get_case_insensitive_dict(headers)
649 def content_length(self):
650 """Total length to the response or None if not known in advance."""
651 length = self.get_header('Content-Length')
652 return int(length) if length is not None else None
654 def get_header(self, header):
655 """Returns response header (as str) or None if no such header."""
656 return self._headers.get(header)
658 def read(self, size=None):
659 """Reads up to |size| bytes from the stream and returns them.
661 If |size| is None reads all available bytes.
663 Raises TimeoutError on read timeout.
666 # cStringIO has a bug: stream.read(None) is not the same as stream.read().
667 data = self._stream.read() if size is None else self._stream.read(size)
668 self._read += len(data)
670 except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
671 logging.error('Timeout while reading from %s, read %d of %s: %s',
672 self._url, self._read, self.content_length, e)
673 raise TimeoutError(e)
676 def get_fake_response(cls, content, url, headers=None):
677 """Returns HttpResponse with predefined content, useful in tests."""
678 headers = dict(headers or {})
679 headers['Content-Length'] = len(content)
680 return cls(StringIO.StringIO(content), url, headers)
683 class Authenticator(object):
684 """Base class for objects that know how to authenticate into http services."""
686 def authorize(self, request):
687 """Add authentication information to the request."""
689 def login(self, allow_user_interaction):
690 """Run interactive authentication flow."""
691 raise NotImplementedError()
694 """Purges access credentials from local cache."""
697 class RequestsLibEngine(object):
698 """Class that knows how to execute HttpRequests via requests library."""
700 # Preferred number of connections in a connection pool.
701 CONNECTION_POOL_SIZE = 64
702 # If True will not open more than CONNECTION_POOL_SIZE connections.
703 CONNECTION_POOL_BLOCK = False
704 # Maximum number of internal connection retries in a connection pool.
705 CONNECTION_RETRIES = 0
707 def __init__(self, ca_certs):
708 super(RequestsLibEngine, self).__init__()
709 self.session = requests.Session()
711 self.session.trust_env = False
712 self.session.verify = ca_certs
713 # Configure connection pools.
714 for protocol in ('https://', 'http://'):
715 self.session.mount(protocol, adapters.HTTPAdapter(
716 pool_connections=self.CONNECTION_POOL_SIZE,
717 pool_maxsize=self.CONNECTION_POOL_SIZE,
718 max_retries=self.CONNECTION_RETRIES,
719 pool_block=self.CONNECTION_POOL_BLOCK))
721 def perform_request(self, request):
722 """Sends a HttpRequest to the server and reads back the response.
724 Returns HttpResponse.
727 ConnectionError - failed to establish connection to the server.
728 TimeoutError - timeout while connecting or reading response.
729 HttpError - server responded with >= 400 error code.
732 response = self.session.request(
733 method=request.method,
735 params=request.params,
737 headers=request.headers,
738 cookies=request.cookies,
739 timeout=request.timeout,
740 stream=request.stream)
741 response.raise_for_status()
743 stream = response.raw
745 stream = StringIO.StringIO(response.content)
746 return HttpResponse(stream, request.get_full_url(), response.headers)
747 except requests.Timeout as e:
748 raise TimeoutError(e)
749 except requests.HTTPError as e:
750 raise HttpError(e.response.status_code, e)
751 except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
752 raise ConnectionError(e)
755 # TODO(vadimsh): Remove once everything is using OAuth or HMAC-based auth.
756 class CookieBasedAuthenticator(Authenticator):
757 """Uses cookies (that AppEngine recognizes) to authenticate to |urlhost|."""
759 def __init__(self, urlhost, cookie_jar):
760 super(CookieBasedAuthenticator, self).__init__()
761 self.urlhost = urlhost
762 self.cookie_jar = cookie_jar
766 self._lock = threading.Lock()
768 def authorize(self, request):
769 # Copy all cookies from authenticator cookie jar to request cookie jar.
771 with self.cookie_jar:
772 for cookie in self.cookie_jar:
773 request.cookies.set_cookie(cookie)
775 def login(self, allow_user_interaction):
776 # Cookie authentication is always interactive (it asks for user name).
777 if not allow_user_interaction:
778 print >> sys.stderr, 'Cookie authentication requires interactive login'
780 # To be used from inside AuthServer.
781 cookie_jar = self.cookie_jar
782 # RPC server that uses AuthenticationSupport's cookie jar.
783 class AuthServer(upload.AbstractRpcServer):
784 def _GetOpener(self):
785 # Authentication code needs to know about 302 response.
786 # So make OpenerDirector without HTTPRedirectHandler.
787 opener = urllib2.OpenerDirector()
788 opener.add_handler(urllib2.ProxyHandler())
789 opener.add_handler(urllib2.UnknownHandler())
790 opener.add_handler(urllib2.HTTPHandler())
791 opener.add_handler(urllib2.HTTPDefaultErrorHandler())
792 opener.add_handler(urllib2.HTTPSHandler())
793 opener.add_handler(urllib2.HTTPErrorProcessor())
794 opener.add_handler(urllib2.HTTPCookieProcessor(cookie_jar))
796 def PerformAuthentication(self):
798 return self.authenticated
801 rpc_server = AuthServer(self.urlhost, self.get_credentials)
802 return rpc_server.PerformAuthentication()
805 domain = urlparse.urlparse(self.urlhost).netloc
807 with self.cookie_jar:
808 self.cookie_jar.clear(domain)
812 def get_credentials(self):
813 """Called during authentication process to get the credentials.
815 May be called multiple times if authentication fails.
817 Returns tuple (email, password).
819 if self.email and self.password:
820 return (self.email, self.password)
821 self._keyring = self._keyring or upload.KeyringCreds(self.urlhost,
822 self.urlhost, self.email)
823 return self._keyring.GetUserCredentials()
826 # TODO(vadimsh): Remove once everything is using OAuth or HMAC-based auth.
827 class ThreadSafeCookieJar(cookielib.MozillaCookieJar):
828 """MozillaCookieJar with thread safe load and save."""
831 """Context manager interface."""
834 def __exit__(self, *_args):
835 """Saves cookie jar when exiting the block."""
839 def load(self, filename=None, ignore_discard=False, ignore_expires=False):
840 """Loads cookies from the file if it exists."""
841 filename = os.path.expanduser(filename or self.filename)
842 with self._cookies_lock:
843 if os.path.exists(filename):
845 cookielib.MozillaCookieJar.load(
846 self, filename, ignore_discard, ignore_expires)
847 logging.debug('Loaded cookies from %s', filename)
848 except (cookielib.LoadError, IOError):
852 fd = os.open(filename, os.O_CREAT, 0600)
855 logging.debug('Failed to create %s', filename)
857 os.chmod(filename, 0600)
859 logging.debug('Failed to fix mode for %s', filename)
861 def save(self, filename=None, ignore_discard=False, ignore_expires=False):
862 """Saves cookies to the file, completely overwriting it."""
863 logging.debug('Saving cookies to %s', filename or self.filename)
864 with self._cookies_lock:
866 cookielib.MozillaCookieJar.save(
867 self, filename, ignore_discard, ignore_expires)
869 logging.error('Failed to save %s', filename)
872 class OAuthAuthenticator(Authenticator):
873 """Uses OAuth Authorization header to authenticate requests."""
875 def __init__(self, urlhost, config):
876 super(OAuthAuthenticator, self).__init__()
877 assert isinstance(config, oauth.OAuthConfig)
878 self.urlhost = urlhost
880 self._lock = threading.Lock()
881 self._access_token = oauth.load_access_token(self.urlhost, self.config)
883 def authorize(self, request):
885 if self._access_token:
886 request.headers['Authorization'] = 'Bearer %s' % self._access_token
888 def login(self, allow_user_interaction):
890 self._access_token = oauth.create_access_token(
891 self.urlhost, self.config, allow_user_interaction)
892 return self._access_token is not None
896 self._access_token = None
897 oauth.purge_access_token(self.urlhost, self.config)
900 class RetryAttempt(object):
901 """Contains information about current retry attempt.
903 Yielded from retry_loop.
906 def __init__(self, attempt, remaining):
907 """Information about current attempt in retry loop:
908 |attempt| - zero based index of attempt.
909 |remaining| - how much time is left before retry loop finishes retries.
911 self.attempt = attempt
912 self.remaining = remaining
913 self.skip_sleep = False
916 def calculate_sleep_before_retry(attempt, max_duration):
917 """How long to sleep before retrying an attempt in retry_loop."""
918 # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
921 # random.random() returns [0.0, 1.0). Starts with relatively short waiting
922 # time by starting with 1.5/2+1.5^-1 median offset.
923 duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
924 assert duration > 0.1
925 duration = min(MAX_SLEEP, duration)
927 duration = min(max_duration, duration)
931 def sleep_before_retry(attempt, max_duration):
932 """Sleeps for some amount of time when retrying the attempt in retry_loop.
934 To be mocked in tests.
936 time.sleep(calculate_sleep_before_retry(attempt, max_duration))
940 """Used by retry loop to get current time.
942 To be mocked in tests.
947 def retry_loop(max_attempts=None, timeout=None):
948 """Yields whenever new attempt to perform some action is needed.
950 Yields instances of RetryAttempt class that contains information about current
951 attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
952 retry loop to run next attempt immediately.
954 start = current_time()
955 for attempt in itertools.count():
957 if max_attempts and attempt == max_attempts:
959 # Retried for too long?
960 remaining = (timeout - (current_time() - start)) if timeout else None
961 if remaining is not None and remaining < 0:
963 # Kick next iteration.
964 attemp_obj = RetryAttempt(attempt, remaining)
966 if attemp_obj.skip_sleep:
968 # Only sleep if we are going to try again.
969 if max_attempts and attempt != max_attempts - 1:
970 remaining = (timeout - (current_time() - start)) if timeout else None
971 if remaining is not None and remaining < 0:
973 sleep_before_retry(attempt, remaining)