1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """Classes and functions for generic network communication over HTTP."""
8 import cStringIO as StringIO
26 from third_party import requests
27 from third_party.requests import adapters
28 from third_party.requests import structures
29 from third_party.rietveld import upload
31 from utils import oauth
32 from utils import zip_package
# Hack out upload logging.info(): rebind the 'logging' name seen by the
# rietveld 'upload' module to a dedicated 'upload' logger and raise its
# threshold to WARNING.
upload.logging = logging.getLogger('upload')
# Mac pylint choke on this line.
upload.logging.setLevel(logging.WARNING)  # pylint: disable=E1103
# TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
def monkey_patch_httplib():
  """Makes sure httplib.HTTPConnection objects expose '_tunnel_host'.

  'requests' library (>= v2) reads 'HTTPConnection._tunnel_host', an attribute
  that was added only in python 2.6.3. When a freshly created connection lacks
  it (python 2.6.2), a class level default of None is installed.
  """
  probe = httplib.HTTPConnection('example.com')
  if hasattr(probe, '_tunnel_host'):
    # Modern enough interpreter, nothing to patch.
    return
  httplib.HTTPConnection._tunnel_host = None


monkey_patch_httplib()
# The name of the key to store the count of url attempts.
COUNT_KEY = 'UrlOpenAttempt'

# Default maximum number of attempts to try opening a url before aborting.
URL_OPEN_MAX_ATTEMPTS = 30

# Default timeout (in seconds) when retrying.
URL_OPEN_TIMEOUT = 6*60.

# Content type for url encoded POST body.
URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
# Content type for JSON body.
JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
# Default content type for POST body.
DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE

# Content type -> function that encodes a request body.
CONTENT_ENCODERS = {
  URL_ENCODED_FORM_CONTENT_TYPE:
    urllib.urlencode,
  JSON_CONTENT_TYPE:
    # Compact and key-sorted, so the same object always encodes identically.
    lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
}

# File to use to store all auth cookies.
COOKIE_FILE = os.path.join(os.path.expanduser('~'), '.isolated_cookies')

# Google Storage URL regular expression.
GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')

# All possible authentication methods. See configure_auth.
AUTH_METHODS = ('oauth', 'cookie', 'bot', 'none')
# Global (for now) map: server URL (http://example.com) -> HttpService instance.
# Used by get_http_service to cache HttpService instances.
_http_services = {}
_http_services_lock = threading.Lock()

# CookieJar reused by all services + lock that protects its instantiation.
_cookie_jar = None
_cookie_jar_lock = threading.Lock()

# Path to cacert.pem bundle file reused by all services.
_ca_certs = None
_ca_certs_lock = threading.Lock()

# This lock ensures that user won't be confused with multiple concurrent
# login prompts. It also guards the authentication configuration below.
_auth_lock = threading.Lock()
_auth_method_default = 'cookie'
_auth_methods_per_host = {}
_auth_oauth_options = None
109 class NetError(IOError):
110 """Generic network related error."""
112 def __init__(self, inner_exc=None):
113 super(NetError, self).__init__(str(inner_exc or self.__doc__))
114 self.inner_exc = inner_exc
116 def format(self, verbose=False):
117 """Human readable description with detailed information about the error."""
118 out = [str(self.inner_exc)]
122 if isinstance(self.inner_exc, urllib2.HTTPError):
123 headers = self.inner_exc.hdrs.items()
124 body = self.inner_exc.read()
125 elif isinstance(self.inner_exc, requests.HTTPError):
126 headers = self.inner_exc.response.headers.items()
127 body = self.inner_exc.response.content
129 out.append('----------')
131 for header, value in headers:
132 if not header.startswith('x-'):
133 out.append('%s: %s' % (header.capitalize(), value))
135 out.append(body or '<empty body>')
136 out.append('----------')
137 return '\n'.join(out)
class TimeoutError(NetError):
  """Timeout while reading HTTP response."""
  # The docstring above is also the default exception message: NetError's
  # constructor falls back to self.__doc__ when no inner exception is given.
class ConnectionError(NetError):
  """Failed to connect to the server."""
  # The docstring above is also the default exception message: NetError's
  # constructor falls back to self.__doc__ when no inner exception is given.
class HttpError(NetError):
  """Server returned HTTP error code."""

  def __init__(self, code, inner_exc=None):
    """Arguments:
      code: HTTP status code returned by the server.
      inner_exc: optional lower level exception that carries the response.
    """
    super(HttpError, self).__init__(inner_exc)
    # HttpService.request reads it to pick between re-authentication, retry
    # and giving up.
    self.code = code
def url_open(url, **kwargs):
  """Attempts to open the given url multiple times.

  The url is split into the server part and the request path; the request is
  then delegated to the (cached) HttpService of that server.

  |data| can be either:
    - None for a GET request
    - str for pre-encoded data
    - list for data to be encoded
    - dict for data to be encoded

  See HttpService.request for a full list of arguments.

  Returns HttpResponse object, where the response may be read from, or None
  if it was unable to connect.
  """
  server, request_path = split_server_request_url(url)
  return get_http_service(server).request(request_path, **kwargs)
def url_read(url, **kwargs):
  """Attempts to open the given url multiple times and read all data from it.

  Accepts same arguments as url_open function, except that |stream| is always
  forced to False: the whole response is read into memory.

  Returns all data read or None if it was unable to connect or read the data.
  """
  kwargs['stream'] = False
  response = url_open(url, **kwargs)
  # url_open returns None when all attempts to reach the server failed.
  if not response:
    return None
  try:
    return response.read()
  except TimeoutError:
    # Honor the documented contract: read failures are reported as None.
    return None
def split_server_request_url(url):
  """Splits the url into scheme+netloc and path+params+query+fragment.

  Returns a tuple (urlhost, urlpath), e.g. 'http://example.com/a?b=c' is split
  into ('http://example.com', '/a?b=c').
  """
  parsed = urlparse.urlparse(url)
  server = '%s://%s' % (parsed.scheme, parsed.netloc)
  # Reassemble everything that follows the network location.
  request_path = urlparse.urlunparse(
      ('', '', parsed.path, parsed.params, parsed.query, parsed.fragment))
  return server, request_path
def get_http_service(urlhost):
  """Returns existing or creates new instance of HttpService that can send
  requests to given base urlhost.

  Instances are cached per normalized (lower case, no trailing '/') urlhost.
  """
  # Ensure consistency in url naming.
  urlhost = str(urlhost).lower().rstrip('/')
  # Do not use COUNT_KEY with Google Storage (since it breaks a signature).
  use_count_key = not GS_STORAGE_HOST_URL_RE.match(urlhost)
  with _http_services_lock:
    service = _http_services.get(urlhost)
    if not service:
      service = HttpService(
          urlhost,
          engine=RequestsLibEngine(get_cacerts_bundle()),
          authenticator=create_authenticator(urlhost),
          use_count_key=use_count_key)
      _http_services[urlhost] = service
    return service
def get_cookie_jar():
  """Returns global CookieJar object that stores cookies in the file.

  The jar is created and loaded from COOKIE_FILE on the first call and then
  reused by all subsequent calls.
  """
  global _cookie_jar
  with _cookie_jar_lock:
    if _cookie_jar is not None:
      return _cookie_jar
    jar = ThreadSafeCookieJar(COOKIE_FILE)
    jar.load()
    # Publish the jar only once it is fully loaded.
    _cookie_jar = jar
    return jar
def get_cacerts_bundle():
  """Returns path to a file with CA root certificates bundle.

  The bundle is extracted with zip_package.extract_resource on first use, and
  again whenever the previously returned file no longer exists.
  """
  global _ca_certs
  with _ca_certs_lock:
    if _ca_certs is not None and os.path.exists(_ca_certs):
      return _ca_certs
    _ca_certs = zip_package.extract_resource(requests, 'cacert.pem')
    return _ca_certs
def configure_auth(default=None, urlhosts=None, oauth_options=None):
  """Defines what authentication methods to use for given hosts.

  Possible authentication methods are:
    'bot' - use HMAC authentication based on a secret key.
    'cookie' - use cookie-based authentication.
    'none' - do not use authentication.
    'oauth' - use oauth-based authentication.

  Arguments:
    default: what method to use if host-specific method isn't set in |urlhosts|.
    urlhosts: dict host url -> method to use for that host.
    oauth_options: OptionsParser with options added by oauth.add_oauth_options.

  Arguments that are None (or empty) leave the corresponding setting untouched.
  """
  global _auth_method_default
  global _auth_oauth_options

  assert not default or default in AUTH_METHODS
  assert all(
      v in AUTH_METHODS for v in (urlhosts or {}).values()), str(urlhosts)

  with _auth_lock:
    if default:
      _auth_method_default = default
    _auth_methods_per_host.update(urlhosts or {})
    if oauth_options:
      _auth_oauth_options = oauth_options
def create_authenticator(urlhost):
  """Makes Authenticator instance used by HttpService to access |urlhost|.

  Returns None when no authenticator object is needed for the host.

  Raises AssertionError if the configured method is not a known one.
  """
  # We use signed URL for Google Storage, no need for special authentication.
  if GS_STORAGE_HOST_URL_RE.match(urlhost):
    return None
  # For everything else use configuration set with 'configure_auth'.
  with _auth_lock:
    method = _auth_methods_per_host.get(urlhost, _auth_method_default)
    if method == 'bot':
      # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
      # any authenticator instance) for now.
      return None
    elif method == 'cookie':
      return CookieBasedAuthenticator(urlhost, get_cookie_jar())
    elif method == 'none':
      return None
    elif method == 'oauth':
      return OAuthAuthenticator(urlhost, _auth_oauth_options)
  raise AssertionError('Invalid auth method: %s' % method)
def get_case_insensitive_dict(original):
  """Given a dict with string keys returns new CaseInsensitiveDict.

  None (or an empty dict) produces an empty CaseInsensitiveDict.

  Raises ValueError if there are duplicate keys.
  """
  # Normalize None up front so that len() below can't fail on it.
  original = original or {}
  normalized = structures.CaseInsensitiveDict(original)
  # Keys that differ only by case collapse into one entry.
  if len(normalized) != len(original):
    raise ValueError('Duplicate keys in: %s' % repr(original))
  return normalized
class HttpService(object):
  """Base class for a class that provides an API to HTTP based service:
    - Provides 'request' method.
    - Supports automatic request retries.
  """

  def __init__(self, urlhost, engine, authenticator=None, use_count_key=True):
    """Arguments:
      urlhost: base URL of the service, e.g. 'https://example.com'.
      engine: object whose 'perform_request(request)' does the actual network
          call (see RequestsLibEngine).
      authenticator: optional Authenticator used to authorize requests and to
          run login and logout flows.
      use_count_key: if True, COUNT_KEY query parameter is added on retries.
    """
    self.urlhost = urlhost
    self.engine = engine
    self.authenticator = authenticator
    self.use_count_key = use_count_key

  @staticmethod
  def is_transient_http_error(code, retry_404, retry_50x):
    """Returns True if given HTTP response code is a transient error."""
    # Google Storage can return this and it should be retried.
    if code == 408:
      return True
    # Retry 404 only if allowed by the caller.
    if code == 404:
      return retry_404
    # All other 4** errors are fatal.
    if code < 500:
      return False
    # Retry >= 500 error only if allowed by the caller.
    return retry_50x

  @staticmethod
  def encode_request_body(body, content_type):
    """Returns request body encoded according to its content type."""
    # No body or it is already encoded.
    if body is None or isinstance(body, str):
      return body
    # Any body should have content type set.
    assert content_type, 'Request has body, but no content type'
    encoder = CONTENT_ENCODERS.get(content_type)
    assert encoder, ('Unknown content type %s' % content_type)
    return encoder(body)

  def login(self, allow_user_interaction):
    """Runs authentication flow to refresh short lived access token.

    Authentication flow may need to interact with the user (read username from
    stdin, open local browser for OAuth2, etc.). If interaction is required and
    |allow_user_interaction| is False, the login will silently be considered
    failed (i.e. this function returns False).

    'request' method always uses non-interactive login, so long-lived
    authentication tokens (cookie, OAuth2 refresh token, etc) have to be set up
    manually by developer (by calling 'auth.py login' perhaps) prior running
    any swarming or isolate scripts.

    Returns False as well when the service has no authenticator.
    """
    # Use global lock to ensure two authentication flows never run in parallel.
    with _auth_lock:
      if self.authenticator:
        return self.authenticator.login(allow_user_interaction)
      return False

  def logout(self):
    """Purges access credentials from local cache."""
    if self.authenticator:
      self.authenticator.logout()

  def request(
      self,
      urlpath,
      data=None,
      content_type=None,
      max_attempts=URL_OPEN_MAX_ATTEMPTS,
      retry_404=False,
      retry_50x=True,
      timeout=URL_OPEN_TIMEOUT,
      read_timeout=None,
      stream=True,
      method=None,
      headers=None):
    """Attempts to open the given url multiple times.

    |urlpath| is relative to the server root, i.e. '/some/request?param=1'.

    |data| can be either:
      - None for a GET request
      - str for pre-encoded data
      - list for data to be form-encoded
      - dict for data to be form-encoded

    - Optionally retries HTTP 404 and 50x.
    - Retries up to |max_attempts| times. If None or 0, there's no limit in the
      number of retries.
    - Retries up to |timeout| duration in seconds. If None or 0, there's no
      limit in the time taken to do retries.
    - If both |max_attempts| and |timeout| are None or 0, this functions retries
      indefinitely.

    If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
    when performing the request. By default it's GET if |data| is None and POST
    if |data| is not None.

    If |headers| is given, it should be a dict with HTTP headers to append
    to request. Caller is responsible for providing headers that make sense.

    If |read_timeout| is not None will configure underlying socket to
    raise TimeoutError exception whenever there's no response from the server
    for more than |read_timeout| seconds. It can happen during any read
    operation so once you pass non-None |read_timeout| be prepared to handle
    these exceptions in subsequent reads from the stream.

    Returns a file-like object, where the response may be read from, or None
    if it was unable to connect. If |stream| is False will read whole response
    into memory buffer before returning file-like object that reads from this
    buffer.
    """
    assert urlpath and urlpath[0] == '/', urlpath

    if data is not None:
      assert method in (None, 'POST', 'PUT')
      method = method or 'POST'
      content_type = content_type or DEFAULT_CONTENT_TYPE
      body = self.encode_request_body(data, content_type)
    else:
      assert method in (None, 'GET')
      method = method or 'GET'
      body = None
      assert not content_type, 'Can\'t use content_type on GET'

    # Prepare request info.
    parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
    resource_url = urlparse.urljoin(self.urlhost, parsed.path)
    query_params = urlparse.parse_qsl(parsed.query)

    # Prepare headers.
    headers = get_case_insensitive_dict(headers or {})
    if body is not None:
      headers['Content-Length'] = len(body)
      if content_type:
        headers['Content-Type'] = content_type

    # Both stay None if the retry loop never yields (e.g. negative |timeout|).
    request = None
    last_error = None
    auth_attempted = False

    for attempt in retry_loop(max_attempts, timeout):
      # Log non-first attempt. |max_attempts| may be None (no limit), so it is
      # formatted with %s: same output for integers, no failure for None.
      if attempt.attempt:
        logging.warning(
            'Retrying request %s, attempt %d/%s...',
            resource_url, attempt.attempt, max_attempts)

      try:
        # Prepare and send a new request.
        request = HttpRequest(method, resource_url, query_params, body,
            headers, read_timeout, stream)
        self.prepare_request(request, attempt.attempt)
        if self.authenticator:
          self.authenticator.authorize(request)
        response = self.engine.perform_request(request)
        logging.debug('Request %s succeeded', request.get_full_url())
        return response

      except (ConnectionError, TimeoutError) as e:
        last_error = e
        logging.warning(
            'Unable to open url %s on attempt %d.\n%s',
            request.get_full_url(), attempt.attempt, e.format())
        continue

      except HttpError as e:
        last_error = e

        # Access denied -> authenticate.
        if e.code in (401, 403):
          logging.warning(
              'Authentication is required for %s on attempt %d.\n%s',
              request.get_full_url(), attempt.attempt, e.format())
          # Try to authenticate only once. If it doesn't help, then server does
          # not support authentication or user doesn't have required access.
          if not auth_attempted:
            auth_attempted = True
            if self.login(allow_user_interaction=False):
              # Success! Run request again immediately.
              attempt.skip_sleep = True
              continue
          # Authentication attempt was unsuccessful.
          logging.error(
              'Unable to authenticate to %s (%s). Use auth.py to login: '
              'python auth.py login --service=%s',
              self.urlhost, e.format(), self.urlhost)
          return None

        # Hit a error that can not be retried -> stop retry loop.
        if not self.is_transient_http_error(e.code, retry_404, retry_50x):
          # This HttpError means we reached the server and there was a problem
          # with the request, so don't retry.
          logging.error(
              'Able to connect to %s but an exception was thrown.\n%s',
              request.get_full_url(), e.format(verbose=True))
          return None

        # Retry all other errors.
        logging.warning(
            'Server responded with error on %s on attempt %d.\n%s',
            request.get_full_url(), attempt.attempt, e.format())
        continue

    logging.error(
        'Unable to open given url, %s, after %s attempts.\n%s',
        request.get_full_url() if request else resource_url,
        max_attempts,
        last_error.format(verbose=True) if last_error else '')
    return None

  def json_request(
      self,
      method,
      urlpath,
      body=None,
      max_attempts=URL_OPEN_MAX_ATTEMPTS,
      timeout=URL_OPEN_TIMEOUT,
      headers=None):
    """Sends JSON request to the server and parses JSON response it get back.

    Arguments:
      method: HTTP method to use ('GET', 'POST', ...).
      urlpath: relative request path (e.g. '/auth/v1/...').
      body: object to serialize to JSON and sent in the request.
      max_attempts: how many times to retry 50x errors.
      timeout: how long to wait for a response (including all retries).
      headers: dict with additional request headers.

    Returns:
      Deserialized JSON response on success, None on error or timeout.
    """
    response = self.request(
        urlpath,
        content_type=JSON_CONTENT_TYPE if body is not None else None,
        data=body,
        headers=headers,
        max_attempts=max_attempts,
        method=method,
        read_timeout=timeout,
        retry_404=False,
        retry_50x=True,
        stream=False,
        timeout=timeout)
    if not response:
      return None
    try:
      text = response.read()
      if not text:
        return None
    except TimeoutError:
      return None
    try:
      return json.loads(text)
    except ValueError:
      logging.error('Not a JSON response when calling %s: %s', urlpath, text)
      return None

  def prepare_request(self, request, attempt):  # pylint: disable=R0201
    """Modify HttpRequest before sending it by adding COUNT_KEY parameter."""
    # Add COUNT_KEY only on retries.
    if self.use_count_key and attempt:
      request.params += [(COUNT_KEY, attempt)]
class HttpRequest(object):
  """Request to HttpService."""

  def __init__(self, method, url, params, body, headers, timeout, stream):
    """Arguments:
      |method| - HTTP method to use
      |url| - relative URL to the resource, without query parameters
      |params| - list of (key, value) pairs to put into GET parameters
      |body| - encoded body of the request (None or str)
      |headers| - dict with request headers
      |timeout| - socket read timeout (None to disable)
      |stream| - True to stream response from socket

    |params| and |headers| are copied, the request never modifies the objects
    passed by the caller.
    """
    self.method, self.url, self.body = method, url, body
    self.timeout, self.stream = timeout, stream
    self.params = params[:]
    self.headers = headers.copy()
    # Created lazily by 'cookies' property.
    self._cookies = None

  @property
  def cookies(self):
    """CookieJar object that will be used for cookies in this request."""
    jar = self._cookies
    if jar is None:
      jar = self._cookies = cookielib.CookieJar()
    return jar

  def get_full_url(self):
    """Resource URL with url-encoded GET parameters."""
    if self.params:
      return '%s?%s' % (self.url, urllib.urlencode(self.params))
    return self.url

  def make_fake_response(self, content='', headers=None):
    """Makes new fake HttpResponse to this request, useful in tests."""
    full_url = self.get_full_url()
    return HttpResponse.get_fake_response(content, full_url, headers)
class HttpResponse(object):
  """Response from HttpService."""

  def __init__(self, stream, url, headers):
    """Arguments:
      stream: file-like object the response body is read from.
      url: URL of the request, used in log messages.
      headers: dict with response headers.
    """
    self._stream = stream
    self._url = url
    self._headers = get_case_insensitive_dict(headers)
    # Number of bytes read from |stream| so far.
    self._read = 0

  @property
  def content_length(self):
    """Total length to the response or None if not known in advance."""
    raw_length = self.get_header('Content-Length')
    if raw_length is None:
      return None
    return int(raw_length)

  def get_header(self, header):
    """Returns response header (as str) or None if no such header."""
    return self._headers.get(header)

  def read(self, size=None):
    """Reads up to |size| bytes from the stream and returns them.

    If |size| is None reads all available bytes.

    Raises TimeoutError on read timeout.
    """
    try:
      # cStringIO has a bug: stream.read(None) is not the same as stream.read().
      if size is None:
        chunk = self._stream.read()
      else:
        chunk = self._stream.read(size)
      self._read += len(chunk)
      return chunk
    except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
      logging.error('Timeout while reading from %s, read %d of %s: %s',
          self._url, self._read, self.content_length, e)
      raise TimeoutError(e)

  @classmethod
  def get_fake_response(cls, content, url, headers=None):
    """Returns HttpResponse with predefined content, useful in tests."""
    fake_headers = dict(headers or {})
    fake_headers['Content-Length'] = len(content)
    return cls(StringIO.StringIO(content), url, fake_headers)
class Authenticator(object):
  """Base class for objects that know how to authenticate into http services."""

  def authorize(self, request):
    """Add authentication information to the request.

    Does nothing by default.
    """
    return None

  def login(self, allow_user_interaction):
    """Run interactive authentication flow.

    Has to be implemented by subclasses.
    """
    raise NotImplementedError()

  def logout(self):
    """Purges access credentials from local cache.

    Does nothing by default.
    """
    return None
class RequestsLibEngine(object):
  """Class that knows how to execute HttpRequests via requests library."""

  # Preferred number of connections in a connection pool.
  CONNECTION_POOL_SIZE = 64
  # If True will not open more than CONNECTION_POOL_SIZE connections.
  CONNECTION_POOL_BLOCK = False
  # Maximum number of internal connection retries in a connection pool.
  CONNECTION_RETRIES = 0

  def __init__(self, ca_certs):
    """Arguments:
      ca_certs: value for the session 'verify' setting (path to a CA bundle).
    """
    super(RequestsLibEngine, self).__init__()
    session = requests.Session()
    # Configure session.
    session.trust_env = False
    session.verify = ca_certs
    self.session = session
    # Configure connection pools, each protocol gets its own adapter.
    for protocol in ('https://', 'http://'):
      adapter = adapters.HTTPAdapter(
          pool_connections=self.CONNECTION_POOL_SIZE,
          pool_maxsize=self.CONNECTION_POOL_SIZE,
          max_retries=self.CONNECTION_RETRIES,
          pool_block=self.CONNECTION_POOL_BLOCK)
      session.mount(protocol, adapter)

  def perform_request(self, request):
    """Sends a HttpRequest to the server and reads back the response.

    Returns HttpResponse.

    Raises:
      ConnectionError - failed to establish connection to the server.
      TimeoutError - timeout while connecting or reading response.
      HttpError - server responded with >= 400 error code.
    """
    try:
      response = self.session.request(
          method=request.method,
          url=request.url,
          params=request.params,
          data=request.body,
          headers=request.headers,
          cookies=request.cookies,
          timeout=request.timeout,
          stream=request.stream)
      response.raise_for_status()
      # Streamed responses are read from the raw socket wrapper, others are
      # already in memory.
      stream = (
          response.raw if request.stream
          else StringIO.StringIO(response.content))
      return HttpResponse(stream, request.get_full_url(), response.headers)
    except requests.Timeout as e:
      raise TimeoutError(e)
    except requests.HTTPError as e:
      raise HttpError(e.response.status_code, e)
    except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
      raise ConnectionError(e)
# TODO(vadimsh): Remove once everything is using OAuth or HMAC-based auth.
class CookieBasedAuthenticator(Authenticator):
  """Uses cookies (that AppEngine recognizes) to authenticate to |urlhost|."""

  def __init__(self, urlhost, cookie_jar):
    super(CookieBasedAuthenticator, self).__init__()
    self.urlhost = urlhost
    self.cookie_jar = cookie_jar
    self.email = None
    self.password = None
    self._keyring = None
    self._lock = threading.Lock()

  def authorize(self, request):
    """Copies all cookies from authenticator cookie jar to request cookie jar."""
    with self._lock:
      with self.cookie_jar:
        for cookie in self.cookie_jar:
          request.cookies.set_cookie(cookie)

  def login(self, allow_user_interaction):
    """Runs cookie based login flow, returns True if authenticated."""
    # Cookie authentication is always interactive (it asks for user name).
    if not allow_user_interaction:
      sys.stderr.write('Cookie authentication requires interactive login\n')
      return False

    # To be used from inside AuthServer.
    cookie_jar = self.cookie_jar

    # RPC server that uses AuthenticationSupport's cookie jar.
    class AuthServer(upload.AbstractRpcServer):
      def _GetOpener(self):
        # Authentication code needs to know about 302 response.
        # So make OpenerDirector without HTTPRedirectHandler.
        opener = urllib2.OpenerDirector()
        handlers = (
            urllib2.ProxyHandler(),
            urllib2.UnknownHandler(),
            urllib2.HTTPHandler(),
            urllib2.HTTPDefaultErrorHandler(),
            urllib2.HTTPSHandler(),
            urllib2.HTTPErrorProcessor(),
            urllib2.HTTPCookieProcessor(cookie_jar),
        )
        for handler in handlers:
          opener.add_handler(handler)
        return opener

      def PerformAuthentication(self):
        self._Authenticate()
        return self.authenticated

    with self._lock:
      with cookie_jar:
        rpc_server = AuthServer(self.urlhost, self.get_credentials)
        return rpc_server.PerformAuthentication()

  def logout(self):
    """Removes cookies of |urlhost| domain from the cookie jar."""
    domain = urlparse.urlparse(self.urlhost).netloc
    try:
      with self.cookie_jar:
        self.cookie_jar.clear(domain)
    except KeyError:
      # No cookies for that domain.
      pass

  def get_credentials(self):
    """Called during authentication process to get the credentials.

    May be called multiple times if authentication fails.

    Returns tuple (email, password).
    """
    if self.email and self.password:
      return (self.email, self.password)
    if not self._keyring:
      self._keyring = upload.KeyringCreds(
          self.urlhost, self.urlhost, self.email)
    return self._keyring.GetUserCredentials()
# TODO(vadimsh): Remove once everything is using OAuth or HMAC-based auth.
class ThreadSafeCookieJar(cookielib.MozillaCookieJar):
  """MozillaCookieJar with thread safe load and save."""

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, *_args):
    """Saves cookie jar when exiting the block."""
    self.save()
    # Never swallow an exception raised inside the block.
    return False

  def load(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Loads cookies from the file if it exists.

    A missing file is created empty. In both cases the file mode is set to
    0600 (best effort), since the file holds credentials.
    """
    filename = os.path.expanduser(filename or self.filename)
    with self._cookies_lock:
      if os.path.exists(filename):
        try:
          cookielib.MozillaCookieJar.load(
              self, filename, ignore_discard, ignore_expires)
          logging.debug('Loaded cookies from %s', filename)
        except (cookielib.LoadError, IOError):
          # Unreadable or corrupted file: start with whatever is in memory.
          pass
      else:
        try:
          fd = os.open(filename, os.O_CREAT, 0o600)
          os.close(fd)
        except OSError:
          logging.debug('Failed to create %s', filename)
      try:
        os.chmod(filename, 0o600)
      except OSError:
        logging.debug('Failed to fix mode for %s', filename)

  def save(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Saves cookies to the file, completely overwriting it.

    Failures to write the file are logged and otherwise ignored.
    """
    target = filename or self.filename
    logging.debug('Saving cookies to %s', target)
    with self._cookies_lock:
      try:
        cookielib.MozillaCookieJar.save(
            self, filename, ignore_discard, ignore_expires)
      except (IOError, OSError):
        # On python 2 open() raises IOError, which is not an OSError subclass.
        logging.error('Failed to save %s', target)
class OAuthAuthenticator(Authenticator):
  """Uses OAuth Authorization header to authenticate requests."""

  def __init__(self, urlhost, options):
    """Arguments:
      urlhost: URL of the service the access token is issued for.
      options: options object passed as is to 'oauth' module functions.
    """
    super(OAuthAuthenticator, self).__init__()
    self.urlhost = urlhost
    self.options = options
    self._lock = threading.Lock()
    self._access_token = oauth.load_access_token(self.urlhost, self.options)

  def authorize(self, request):
    """Adds 'Authorization: Bearer ...' header if an access token is known."""
    with self._lock:
      token = self._access_token
      if token:
        request.headers['Authorization'] = 'Bearer %s' % token

  def login(self, allow_user_interaction):
    """Creates new access token, returns True on success."""
    with self._lock:
      token = oauth.create_access_token(
          self.urlhost, self.options, allow_user_interaction)
      self._access_token = token
      return token is not None

  def logout(self):
    """Forgets the access token and purges it via oauth.purge_access_token."""
    with self._lock:
      self._access_token = None
      oauth.purge_access_token(self.urlhost, self.options)
class RetryAttempt(object):
  """Contains information about current retry attempt.

  Yielded from retry_loop.
  """

  def __init__(self, attempt, remaining):
    """Information about current attempt in retry loop:
      |attempt| - zero based index of attempt.
      |remaining| - how much time is left before retry loop finishes retries.
    """
    self.attempt, self.remaining = attempt, remaining
    # Consumer of the attempt may set it to True to skip the sleep that
    # normally precedes the next attempt.
    self.skip_sleep = False
def calculate_sleep_before_retry(attempt, max_duration):
  """How long to sleep before retrying an attempt in retry_loop.

  Arguments:
    attempt: zero based index of the attempt that has just failed.
    max_duration: optional upper bound for the result, in seconds. None or 0
        means no bound other than the built-in maximum.

  Returns:
    Sleep duration in seconds: randomized exponential backoff capped at 10
    seconds and at |max_duration|.
  """
  # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
  # survive.
  MAX_SLEEP = 10.
  # The exponential term overflows a float for very large attempt indices
  # (possible with unlimited retries). It is capped by MAX_SLEEP anyway.
  try:
    backoff = math.pow(1.5, (attempt - 1))
  except OverflowError:
    backoff = MAX_SLEEP
  # random.random() returns [0.0, 1.0). Starts with relatively short waiting
  # time by starting with 1.5/2+1.5^-1 median offset.
  duration = (random.random() * 1.5) + backoff
  assert duration > 0.1
  duration = min(MAX_SLEEP, duration)
  if max_duration:
    duration = min(max_duration, duration)
  return duration
def sleep_before_retry(attempt, max_duration):
  """Sleeps for some amount of time when retrying the attempt in retry_loop.

  The duration is picked by calculate_sleep_before_retry.

  To be mocked in tests.
  """
  delay = calculate_sleep_before_retry(attempt, max_duration)
  time.sleep(delay)
def current_time():
  """Used by retry loop to get current time.

  To be mocked in tests.
  """
  return time.time()
def retry_loop(max_attempts=None, timeout=None):
  """Yields whenever new attempt to perform some action is needed.

  Yields instances of RetryAttempt class that contains information about current
  attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
  retry loop to run next attempt immediately.

  Arguments:
    max_attempts: maximum number of attempts, None or 0 for no limit.
    timeout: total time budget in seconds, None or 0 for no limit.
  """
  start = current_time()
  for attempt in itertools.count():
    # Too many attempts?
    if max_attempts and attempt == max_attempts:
      break
    # Retried for too long?
    remaining = (timeout - (current_time() - start)) if timeout else None
    if remaining is not None and remaining < 0:
      break
    # Kick next iteration.
    attempt_obj = RetryAttempt(attempt, remaining)
    yield attempt_obj
    if attempt_obj.skip_sleep:
      continue
    # Only sleep if we are going to try again. Without an attempt limit there
    # is always a next attempt, so sleep then too instead of busy looping.
    if not max_attempts or attempt != max_attempts - 1:
      remaining = (timeout - (current_time() - start)) if timeout else None
      if remaining is not None and remaining < 0:
        break
      sleep_before_retry(attempt, remaining)