Upstream version 11.40.277.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / utils / net.py
1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
4
5 """Classes and functions for generic network communication over HTTP."""
6
7 import cookielib
8 import cStringIO as StringIO
9 import httplib
10 import itertools
11 import json
12 import logging
13 import math
14 import os
15 import random
16 import re
17 import socket
18 import ssl
19 import threading
20 import time
21 import urllib
22 import urlparse
23
24 from third_party import requests
25 from third_party.requests import adapters
26 from third_party.requests import structures
27
28 from utils import oauth
29 from utils import tools
30
31
32 # TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
33 def monkey_patch_httplib():
34   """Patch httplib.HTTPConnection to have '_tunnel_host' attribute.
35
36   'requests' library (>= v2) accesses 'HTTPConnection._tunnel_host' attribute
37   added only in python 2.6.3. This function patches HTTPConnection to have it
38   on python 2.6.2 as well.
39   """
40   conn = httplib.HTTPConnection('example.com')
41   if not hasattr(conn, '_tunnel_host'):
42     httplib.HTTPConnection._tunnel_host = None
43 monkey_patch_httplib()
44
45
46 # Default maximum number of attempts to trying opening a url before aborting.
47 URL_OPEN_MAX_ATTEMPTS = 30
48
49 # Default timeout when retrying.
50 URL_OPEN_TIMEOUT = 6*60.
51
52 # Default timeout when reading from open HTTP connection.
53 URL_READ_TIMEOUT = 60
54
55 # Content type for url encoded POST body.
56 URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
57 # Content type for JSON body.
58 JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
59 # Default content type for POST body.
60 DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE
61
62 # Content type -> function that encodes a request body.
63 CONTENT_ENCODERS = {
64   URL_ENCODED_FORM_CONTENT_TYPE:
65     urllib.urlencode,
66   JSON_CONTENT_TYPE:
67     lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
68 }
69
70
71 # Google Storage URL regular expression.
72 GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')
73
74 # All possible authentication methods with corresponding types of method config
75 # object (or None if method is not configurable). See configure_auth.
76 # Order is important: it's visible in commands --help output.
77 AUTH_METHODS = [
78   ('oauth', oauth.OAuthConfig),
79   ('bot', None),
80   ('none', None),
81 ]
82
83 # Global (for now) map: server URL (http://example.com) -> HttpService instance.
84 # Used by get_http_service to cache HttpService instances.
85 _http_services = {}
86 _http_services_lock = threading.Lock()
87
88 # This lock ensures that user won't be confused with multiple concurrent
89 # login prompts.
90 _auth_lock = threading.Lock()
91
92 # Set in 'configure_auth'. If configure_auth is not called before first request,
93 # will be set to defaults generated by 'get_default_auth_config'.
94 _auth_method = None
95 _auth_method_config = None
96
97
98 class NetError(IOError):
99   """Generic network related error."""
100
101   def __init__(self, inner_exc=None):
102     super(NetError, self).__init__(str(inner_exc or self.__doc__))
103     self.inner_exc = inner_exc
104
105   def format(self, verbose=False):
106     """Human readable description with detailed information about the error."""
107     out = [str(self.inner_exc)]
108     if verbose:
109       headers = None
110       body = None
111       if isinstance(self.inner_exc, requests.HTTPError):
112         headers = self.inner_exc.response.headers.items()
113         body = self.inner_exc.response.content
114       if headers or body:
115         out.append('----------')
116         if headers:
117           for header, value in headers:
118             if not header.startswith('x-'):
119               out.append('%s: %s' % (header.capitalize(), value))
120           out.append('')
121         out.append(body or '<empty body>')
122         out.append('----------')
123     return '\n'.join(out)
124
125
126 class TimeoutError(NetError):
127   """Timeout while reading HTTP response."""
128
129
130 class ConnectionError(NetError):
131   """Failed to connect to the server."""
132
133
134 class HttpError(NetError):
135   """Server returned HTTP error code."""
136
137   def __init__(self, code, inner_exc=None):
138     super(HttpError, self).__init__(inner_exc)
139     self.code = code
140
141
142 def url_open(url, **kwargs):  # pylint: disable=W0621
143   """Attempts to open the given url multiple times.
144
145   |data| can be either:
146     - None for a GET request
147     - str for pre-encoded data
148     - list for data to be encoded
149     - dict for data to be encoded
150
151   See HttpService.request for a full list of arguments.
152
153   Returns HttpResponse object, where the response may be read from, or None
154   if it was unable to connect.
155   """
156   urlhost, urlpath = split_server_request_url(url)
157   service = get_http_service(urlhost)
158   return service.request(urlpath, **kwargs)
159
160
161 def url_read(url, **kwargs):
162   """Attempts to open the given url multiple times and read all data from it.
163
164   Accepts same arguments as url_open function.
165
166   Returns all data read or None if it was unable to connect or read the data.
167   """
168   kwargs['stream'] = False
169   response = url_open(url, **kwargs)
170   if not response:
171     return None
172   try:
173     return response.read()
174   except TimeoutError:
175     return None
176
177
178 def url_read_json(url, **kwargs):
179   """Attempts to open the given url multiple times and read all data from it.
180
181   Accepts same arguments as url_open function.
182
183   Returns all data read or None if it was unable to connect or read the data.
184   """
185   urlhost, urlpath = split_server_request_url(url)
186   service = get_http_service(urlhost)
187   try:
188     return service.json_request(urlpath, **kwargs)
189   except TimeoutError:
190     return None
191
192
193 def url_retrieve(filepath, url, **kwargs):
194   """Downloads an URL to a file. Returns True on success."""
195   response = url_open(url, **kwargs)
196   if not response:
197     return False
198   try:
199     with open(filepath, 'wb') as f:
200       while True:
201         buf = response.read(65536)
202         if not buf:
203           return True
204         f.write(buf)
205   except (IOError, OSError, TimeoutError):
206     try:
207       os.remove(filepath)
208     except IOError:
209       pass
210     return False
211
212
213 def split_server_request_url(url):
214   """Splits the url into scheme+netloc and path+params+query+fragment."""
215   url_parts = list(urlparse.urlparse(url))
216   urlhost = '%s://%s' % (url_parts[0], url_parts[1])
217   urlpath = urlparse.urlunparse(['', ''] + url_parts[2:])
218   return urlhost, urlpath
219
220
221 def get_http_service(urlhost, allow_cached=True):
222   """Returns existing or creates new instance of HttpService that can send
223   requests to given base urlhost.
224   """
225   def new_service():
226     return HttpService(
227         urlhost,
228         engine=RequestsLibEngine(),
229         authenticator=create_authenticator(urlhost))
230
231   # Ensure consistency in url naming.
232   urlhost = str(urlhost).lower().rstrip('/')
233
234   if not allow_cached:
235     return new_service()
236   with _http_services_lock:
237     service = _http_services.get(urlhost)
238     if not service:
239       service = new_service()
240       _http_services[urlhost] = service
241     return service
242
243
244 def get_default_auth_config():
245   """Returns auth configuration used by default if configure_auth is not called.
246
247   If running in a headless mode on bots, will use 'bot' auth, otherwise
248   'oauth' with default oauth config.
249
250   Returns pair (auth method name, auth method config).
251   """
252   if tools.is_headless():
253     return 'bot', None
254   else:
255     return 'oauth', oauth.make_oauth_config()
256
257
258 def configure_auth(method, config=None):
259   """Defines what authentication methods to use.
260
261   Possible authentication methods are:
262     'bot' - use HMAC authentication based on a secret key.
263     'oauth' - use oauth-based authentication.
264     'none' - do not use authentication.
265
266   Arguments:
267     method: what method to use.
268     config: object that holds configuration for authentication method.
269         Concrete type depends on a method used (see AUTH_METHODS for expected
270         type). Passed to corresponding authenticator instance.
271   """
272   global _auth_method
273   global _auth_method_config
274   assert method in dict(AUTH_METHODS), method
275   config_type = dict(AUTH_METHODS)[method]
276   if config_type and not isinstance(config, config_type):
277     raise TypeError(
278         'Expecting \'%s\' auth config to be of type %s. Got %s instead.' %
279         (method, config_type, type(config)))
280   elif not config_type and config is not None:
281     raise TypeError('Auth method \'%s\' is not configurable.' % method)
282   with _auth_lock:
283     _auth_method = method
284     _auth_method_config = config
285
286
287 def get_auth_method():
288   """Returns authentication method used by default.
289
290   Set with 'configure_auth'. See 'configure_auth' doc string for existing
291   auth method.
292   """
293   return _auth_method
294
295
296 def create_authenticator(urlhost):
297   """Makes Authenticator instance used by HttpService to access |urlhost|."""
298   # We use signed URL for Google Storage, no need for special authentication.
299   if GS_STORAGE_HOST_URL_RE.match(urlhost):
300     return None
301
302   # Lazy initialize auth config with defaults.
303   if not _auth_method:
304     default_method, default_config = get_default_auth_config()
305     configure_auth(default_method, default_config)
306
307   # Use configuration set with 'configure_auth'.
308   with _auth_lock:
309     if _auth_method == 'bot':
310       # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
311       # any authenticator instance) for now.
312       return None
313     elif _auth_method == 'oauth':
314       return OAuthAuthenticator(urlhost, _auth_method_config)
315     elif _auth_method == 'none':
316       return None
317   raise AssertionError('Invalid auth method: %s' % _auth_method)
318
319
320 def get_case_insensitive_dict(original):
321   """Given a dict with string keys returns new CaseInsensitiveDict.
322
323   Raises ValueError if there are duplicate keys.
324   """
325   normalized = structures.CaseInsensitiveDict(original or {})
326   if len(normalized) != len(original):
327     raise ValueError('Duplicate keys in: %s' % repr(original))
328   return normalized
329
330
331 class HttpService(object):
332   """Base class for a class that provides an API to HTTP based service:
333     - Provides 'request' method.
334     - Supports automatic request retries.
335     - Thread safe.
336   """
337
338   def __init__(self, urlhost, engine, authenticator=None):
339     self.urlhost = urlhost
340     self.engine = engine
341     self.authenticator = authenticator
342
343   @staticmethod
344   def is_transient_http_error(code, retry_404, retry_50x):
345     """Returns True if given HTTP response code is a transient error."""
346     # Google Storage can return this and it should be retried.
347     if code == 408:
348       return True
349     # Retry 404 only if allowed by the caller.
350     if code == 404:
351       return retry_404
352     # All other 4** errors are fatal.
353     if code < 500:
354       return False
355     # Retry >= 500 error only if allowed by the caller.
356     return retry_50x
357
358   @staticmethod
359   def encode_request_body(body, content_type):
360     """Returns request body encoded according to its content type."""
361     # No body or it is already encoded.
362     if body is None or isinstance(body, str):
363       return body
364     # Any body should have content type set.
365     assert content_type, 'Request has body, but no content type'
366     encoder = CONTENT_ENCODERS.get(content_type)
367     assert encoder, ('Unknown content type %s' % content_type)
368     return encoder(body)
369
370   def login(self, allow_user_interaction):
371     """Runs authentication flow to refresh short lived access token.
372
373     Authentication flow may need to interact with the user (read username from
374     stdin, open local browser for OAuth2, etc.). If interaction is required and
375     |allow_user_interaction| is False, the login will silently be considered
376     failed (i.e. this function returns False).
377
378     'request' method always uses non-interactive login, so long-lived
379     authentication tokens (OAuth2 refresh token, etc) have to be set up
380     manually by developer (by calling 'auth.py login' perhaps) prior running
381     any swarming or isolate scripts.
382     """
383     # Use global lock to ensure two authentication flows never run in parallel.
384     with _auth_lock:
385       if self.authenticator:
386         return self.authenticator.login(allow_user_interaction)
387       return False
388
389   def logout(self):
390     """Purges access credentials from local cache."""
391     if self.authenticator:
392       self.authenticator.logout()
393
394   def request(
395       self,
396       urlpath,
397       data=None,
398       content_type=None,
399       max_attempts=URL_OPEN_MAX_ATTEMPTS,
400       retry_404=False,
401       retry_50x=True,
402       timeout=URL_OPEN_TIMEOUT,
403       read_timeout=URL_READ_TIMEOUT,
404       stream=True,
405       method=None,
406       headers=None):
407     """Attempts to open the given url multiple times.
408
409     |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
410
411     |data| can be either:
412       - None for a GET request
413       - str for pre-encoded data
414       - list for data to be form-encoded
415       - dict for data to be form-encoded
416
417     - Optionally retries HTTP 404 and 50x.
418     - Retries up to |max_attempts| times. If None or 0, there's no limit in the
419       number of retries.
420     - Retries up to |timeout| duration in seconds. If None or 0, there's no
421       limit in the time taken to do retries.
422     - If both |max_attempts| and |timeout| are None or 0, this functions retries
423       indefinitely.
424
425     If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
426     when performing the request. By default it's GET if |data| is None and POST
427     if |data| is not None.
428
429     If |headers| is given, it should be a dict with HTTP headers to append
430     to request. Caller is responsible for providing headers that make sense.
431
432     If |read_timeout| is not None will configure underlying socket to
433     raise TimeoutError exception whenever there's no response from the server
434     for more than |read_timeout| seconds. It can happen during any read
435     operation so once you pass non-None |read_timeout| be prepared to handle
436     these exceptions in subsequent reads from the stream.
437
438     Returns a file-like object, where the response may be read from, or None
439     if it was unable to connect. If |stream| is False will read whole response
440     into memory buffer before returning file-like object that reads from this
441     memory buffer.
442     """
443     assert urlpath and urlpath[0] == '/', urlpath
444
445     if data is not None:
446       assert method in (None, 'POST', 'PUT')
447       method = method or 'POST'
448       content_type = content_type or DEFAULT_CONTENT_TYPE
449       body = self.encode_request_body(data, content_type)
450     else:
451       assert method in (None, 'GET')
452       method = method or 'GET'
453       body = None
454       assert not content_type, 'Can\'t use content_type on GET'
455
456     # Prepare request info.
457     parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
458     resource_url = urlparse.urljoin(self.urlhost, parsed.path)
459     query_params = urlparse.parse_qsl(parsed.query)
460
461     # Prepare headers.
462     headers = get_case_insensitive_dict(headers or {})
463     if body is not None:
464       headers['Content-Length'] = len(body)
465       if content_type:
466         headers['Content-Type'] = content_type
467
468     last_error = None
469     auth_attempted = False
470
471     for attempt in retry_loop(max_attempts, timeout):
472       # Log non-first attempt.
473       if attempt.attempt:
474         logging.warning(
475             'Retrying request %s, attempt %d/%d...',
476             resource_url, attempt.attempt, max_attempts)
477
478       try:
479         # Prepare and send a new request.
480         request = HttpRequest(
481             method, resource_url, query_params, body,
482             headers, read_timeout, stream)
483         if self.authenticator:
484           self.authenticator.authorize(request)
485         response = self.engine.perform_request(request)
486         logging.debug('Request %s succeeded', request.get_full_url())
487         return response
488
489       except (ConnectionError, TimeoutError) as e:
490         last_error = e
491         logging.warning(
492             'Unable to open url %s on attempt %d.\n%s',
493             request.get_full_url(), attempt.attempt, e.format())
494         continue
495
496       except HttpError as e:
497         last_error = e
498
499         # Access denied -> authenticate.
500         if e.code in (401, 403):
501           logging.warning(
502               'Authentication is required for %s on attempt %d.\n%s',
503               request.get_full_url(), attempt.attempt, e.format())
504           # Try to authenticate only once. If it doesn't help, then server does
505           # not support authentication or user doesn't have required access.
506           if not auth_attempted:
507             auth_attempted = True
508             if self.login(allow_user_interaction=False):
509               # Success! Run request again immediately.
510               attempt.skip_sleep = True
511               continue
512           # Authentication attempt was unsuccessful.
513           logging.error(
514               'Unable to authenticate to %s (%s). Use auth.py to login: '
515               'python auth.py login --service=%s',
516               self.urlhost, e.format(), self.urlhost)
517           return None
518
519         # Hit a error that can not be retried -> stop retry loop.
520         if not self.is_transient_http_error(e.code, retry_404, retry_50x):
521           # This HttpError means we reached the server and there was a problem
522           # with the request, so don't retry.
523           logging.error(
524               'Able to connect to %s but an exception was thrown.\n%s',
525               request.get_full_url(), e.format(verbose=True))
526           return None
527
528         # Retry all other errors.
529         logging.warning(
530             'Server responded with error on %s on attempt %d.\n%s',
531             request.get_full_url(), attempt.attempt, e.format())
532         continue
533
534     logging.error(
535         'Unable to open given url, %s, after %d attempts.\n%s',
536         request.get_full_url(), max_attempts, last_error.format(verbose=True))
537     return None
538
539   def json_request(self, urlpath, data=None, **kwargs):
540     """Sends JSON request to the server and parses JSON response it get back.
541
542     Arguments:
543       urlpath: relative request path (e.g. '/auth/v1/...').
544       data: object to serialize to JSON and sent in the request.
545
546     See self.request() for more details.
547
548     Returns:
549       Deserialized JSON response on success, None on error or timeout.
550     """
551     content_type = JSON_CONTENT_TYPE if data is not None else None
552     response = self.request(
553         urlpath, content_type=content_type, data=data, stream=False, **kwargs)
554     if not response:
555       return None
556     try:
557       text = response.read()
558       if not text:
559         return None
560     except TimeoutError:
561       return None
562     try:
563       return json.loads(text)
564     except ValueError:
565       logging.error('Not a JSON response when calling %s: %s', urlpath, text)
566       return None
567
568
569 class HttpRequest(object):
570   """Request to HttpService."""
571
572   def __init__(self, method, url, params, body, headers, timeout, stream):
573     """Arguments:
574       |method| - HTTP method to use
575       |url| - relative URL to the resource, without query parameters
576       |params| - list of (key, value) pairs to put into GET parameters
577       |body| - encoded body of the request (None or str)
578       |headers| - dict with request headers
579       |timeout| - socket read timeout (None to disable)
580       |stream| - True to stream response from socket
581     """
582     self.method = method
583     self.url = url
584     self.params = params[:]
585     self.body = body
586     self.headers = headers.copy()
587     self.timeout = timeout
588     self.stream = stream
589     self._cookies = None
590
591   @property
592   def cookies(self):
593     """CookieJar object that will be used for cookies in this request."""
594     if self._cookies is None:
595       self._cookies = cookielib.CookieJar()
596     return self._cookies
597
598   def get_full_url(self):
599     """Resource URL with url-encoded GET parameters."""
600     if not self.params:
601       return self.url
602     else:
603       return '%s?%s' % (self.url, urllib.urlencode(self.params))
604
605   def make_fake_response(self, content='', headers=None):
606     """Makes new fake HttpResponse to this request, useful in tests."""
607     return HttpResponse.get_fake_response(content, self.get_full_url(), headers)
608
609
610 class HttpResponse(object):
611   """Response from HttpService."""
612
613   def __init__(self, stream, url, headers):
614     self._stream = stream
615     self._url = url
616     self._headers = get_case_insensitive_dict(headers)
617     self._read = 0
618
619   @property
620   def content_length(self):
621     """Total length to the response or None if not known in advance."""
622     length = self.get_header('Content-Length')
623     return int(length) if length is not None else None
624
625   def get_header(self, header):
626     """Returns response header (as str) or None if no such header."""
627     return self._headers.get(header)
628
629   def read(self, size=None):
630     """Reads up to |size| bytes from the stream and returns them.
631
632     If |size| is None reads all available bytes.
633
634     Raises TimeoutError on read timeout.
635     """
636     try:
637       # cStringIO has a bug: stream.read(None) is not the same as stream.read().
638       data = self._stream.read() if size is None else self._stream.read(size)
639       self._read += len(data)
640       return data
641     except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
642       logging.error('Timeout while reading from %s, read %d of %s: %s',
643           self._url, self._read, self.content_length, e)
644       raise TimeoutError(e)
645
646   @classmethod
647   def get_fake_response(cls, content, url, headers=None):
648     """Returns HttpResponse with predefined content, useful in tests."""
649     headers = dict(headers or {})
650     headers['Content-Length'] = len(content)
651     return cls(StringIO.StringIO(content), url, headers)
652
653
654 class Authenticator(object):
655   """Base class for objects that know how to authenticate into http services."""
656
657   def authorize(self, request):
658     """Add authentication information to the request."""
659
660   def login(self, allow_user_interaction):
661     """Run interactive authentication flow."""
662     raise NotImplementedError()
663
664   def logout(self):
665     """Purges access credentials from local cache."""
666
667
668 class RequestsLibEngine(object):
669   """Class that knows how to execute HttpRequests via requests library."""
670
671   # Preferred number of connections in a connection pool.
672   CONNECTION_POOL_SIZE = 64
673   # If True will not open more than CONNECTION_POOL_SIZE connections.
674   CONNECTION_POOL_BLOCK = False
675   # Maximum number of internal connection retries in a connection pool.
676   CONNECTION_RETRIES = 0
677
678   def __init__(self):
679     super(RequestsLibEngine, self).__init__()
680     self.session = requests.Session()
681     # Configure session.
682     self.session.trust_env = False
683     self.session.verify = tools.get_cacerts_bundle()
684     # Configure connection pools.
685     for protocol in ('https://', 'http://'):
686       self.session.mount(protocol, adapters.HTTPAdapter(
687           pool_connections=self.CONNECTION_POOL_SIZE,
688           pool_maxsize=self.CONNECTION_POOL_SIZE,
689           max_retries=self.CONNECTION_RETRIES,
690           pool_block=self.CONNECTION_POOL_BLOCK))
691
692   def perform_request(self, request):
693     """Sends a HttpRequest to the server and reads back the response.
694
695     Returns HttpResponse.
696
697     Raises:
698       ConnectionError - failed to establish connection to the server.
699       TimeoutError - timeout while connecting or reading response.
700       HttpError - server responded with >= 400 error code.
701     """
702     try:
703       response = self.session.request(
704           method=request.method,
705           url=request.url,
706           params=request.params,
707           data=request.body,
708           headers=request.headers,
709           cookies=request.cookies,
710           timeout=request.timeout,
711           stream=request.stream)
712       response.raise_for_status()
713       if request.stream:
714         stream = response.raw
715       else:
716         stream = StringIO.StringIO(response.content)
717       return HttpResponse(stream, request.get_full_url(), response.headers)
718     except requests.Timeout as e:
719       raise TimeoutError(e)
720     except requests.HTTPError as e:
721       raise HttpError(e.response.status_code, e)
722     except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
723       raise ConnectionError(e)
724
725
726 class OAuthAuthenticator(Authenticator):
727   """Uses OAuth Authorization header to authenticate requests."""
728
729   def __init__(self, urlhost, config):
730     super(OAuthAuthenticator, self).__init__()
731     assert isinstance(config, oauth.OAuthConfig)
732     self.urlhost = urlhost
733     self.config = config
734     self._lock = threading.Lock()
735     self._access_token_known = False
736     self._access_token = None
737
738   def authorize(self, request):
739     with self._lock:
740       if not self._access_token_known:
741         self._access_token = oauth.load_access_token(self.urlhost, self.config)
742         self._access_token_known = True
743       if self._access_token:
744         request.headers['Authorization'] = 'Bearer %s' % self._access_token
745
746   def login(self, allow_user_interaction):
747     with self._lock:
748       self._access_token = oauth.create_access_token(
749           self.urlhost, self.config, allow_user_interaction)
750       self._access_token_known = True
751       return self._access_token is not None
752
753   def logout(self):
754     with self._lock:
755       self._access_token = None
756       self._access_token_known = True
757       oauth.purge_access_token(self.urlhost, self.config)
758
759
760 class RetryAttempt(object):
761   """Contains information about current retry attempt.
762
763   Yielded from retry_loop.
764   """
765
766   def __init__(self, attempt, remaining):
767     """Information about current attempt in retry loop:
768       |attempt| - zero based index of attempt.
769       |remaining| - how much time is left before retry loop finishes retries.
770     """
771     self.attempt = attempt
772     self.remaining = remaining
773     self.skip_sleep = False
774
775
776 def calculate_sleep_before_retry(attempt, max_duration):
777   """How long to sleep before retrying an attempt in retry_loop."""
778   # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
779   # survive.
780   MAX_SLEEP = 10.
781   # random.random() returns [0.0, 1.0). Starts with relatively short waiting
782   # time by starting with 1.5/2+1.5^-1 median offset.
783   duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
784   assert duration > 0.1
785   duration = min(MAX_SLEEP, duration)
786   if max_duration:
787     duration = min(max_duration, duration)
788   return duration
789
790
791 def sleep_before_retry(attempt, max_duration):
792   """Sleeps for some amount of time when retrying the attempt in retry_loop.
793
794   To be mocked in tests.
795   """
796   time.sleep(calculate_sleep_before_retry(attempt, max_duration))
797
798
799 def current_time():
800   """Used by retry loop to get current time.
801
802   To be mocked in tests.
803   """
804   return time.time()
805
806
807 def retry_loop(max_attempts=None, timeout=None):
808   """Yields whenever new attempt to perform some action is needed.
809
810   Yields instances of RetryAttempt class that contains information about current
811   attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
812   retry loop to run next attempt immediately.
813   """
814   start = current_time()
815   for attempt in itertools.count():
816     # Too many attempts?
817     if max_attempts and attempt == max_attempts:
818       break
819     # Retried for too long?
820     remaining = (timeout - (current_time() - start)) if timeout else None
821     if remaining is not None and remaining < 0:
822       break
823     # Kick next iteration.
824     attemp_obj = RetryAttempt(attempt, remaining)
825     yield attemp_obj
826     if attemp_obj.skip_sleep:
827       continue
828     # Only sleep if we are going to try again.
829     if max_attempts and attempt != max_attempts - 1:
830       remaining = (timeout - (current_time() - start)) if timeout else None
831       if remaining is not None and remaining < 0:
832         break
833       sleep_before_retry(attempt, remaining)