Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / utils / net.py
1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
4
5 """Classes and functions for generic network communication over HTTP."""
6
7 import cookielib
8 import cStringIO as StringIO
9 import httplib
10 import itertools
11 import json
12 import logging
13 import math
14 import os
15 import random
16 import re
17 import socket
18 import ssl
19 import threading
20 import time
21 import urllib
22 import urlparse
23
24 from third_party import requests
25 from third_party.requests import adapters
26 from third_party.requests import structures
27
28 from utils import oauth
29 from utils import tools
30
31
32 # TODO(vadimsh): Remove this once we don't have to support python 2.6 anymore.
33 def monkey_patch_httplib():
34   """Patch httplib.HTTPConnection to have '_tunnel_host' attribute.
35
36   'requests' library (>= v2) accesses 'HTTPConnection._tunnel_host' attribute
37   added only in python 2.6.3. This function patches HTTPConnection to have it
38   on python 2.6.2 as well.
39   """
40   conn = httplib.HTTPConnection('example.com')
41   if not hasattr(conn, '_tunnel_host'):
42     httplib.HTTPConnection._tunnel_host = None
43 monkey_patch_httplib()
44
45
46 # Default maximum number of attempts to trying opening a url before aborting.
47 URL_OPEN_MAX_ATTEMPTS = 30
48
49 # Default timeout when retrying.
50 URL_OPEN_TIMEOUT = 6*60.
51
52 # Default timeout when reading from open HTTP connection.
53 URL_READ_TIMEOUT = 60
54
55 # Content type for url encoded POST body.
56 URL_ENCODED_FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded'
57 # Content type for JSON body.
58 JSON_CONTENT_TYPE = 'application/json; charset=UTF-8'
59 # Default content type for POST body.
60 DEFAULT_CONTENT_TYPE = URL_ENCODED_FORM_CONTENT_TYPE
61
62 # Content type -> function that encodes a request body.
63 CONTENT_ENCODERS = {
64   URL_ENCODED_FORM_CONTENT_TYPE:
65     urllib.urlencode,
66   JSON_CONTENT_TYPE:
67     lambda x: json.dumps(x, sort_keys=True, separators=(',', ':')),
68 }
69
70
71 # Google Storage URL regular expression.
72 GS_STORAGE_HOST_URL_RE = re.compile(r'https://.*\.storage\.googleapis\.com')
73
74 # All possible authentication methods with corresponding types of method config
75 # object (or None if method is not configurable). See configure_auth.
76 # Order is important: it's visible in commands --help output.
77 AUTH_METHODS = [
78   ('oauth', oauth.OAuthConfig),
79   ('bot', None),
80   ('none', None),
81 ]
82
83 # Global (for now) map: server URL (http://example.com) -> HttpService instance.
84 # Used by get_http_service to cache HttpService instances.
85 _http_services = {}
86 _http_services_lock = threading.Lock()
87
88 # This lock ensures that user won't be confused with multiple concurrent
89 # login prompts.
90 _auth_lock = threading.Lock()
91
92 # Set in 'configure_auth'. If configure_auth is not called before first request,
93 # will be set to defaults generated by 'get_default_auth_config'.
94 _auth_method = None
95 _auth_method_config = None
96
97
98 class NetError(IOError):
99   """Generic network related error."""
100
101   def __init__(self, inner_exc=None):
102     super(NetError, self).__init__(str(inner_exc or self.__doc__))
103     self.inner_exc = inner_exc
104
105   def format(self, verbose=False):
106     """Human readable description with detailed information about the error."""
107     out = [str(self.inner_exc)]
108     if verbose:
109       headers = None
110       body = None
111       if isinstance(self.inner_exc, requests.HTTPError):
112         headers = self.inner_exc.response.headers.items()
113         body = self.inner_exc.response.content
114       if headers or body:
115         out.append('----------')
116         if headers:
117           for header, value in headers:
118             if not header.startswith('x-'):
119               out.append('%s: %s' % (header.capitalize(), value))
120           out.append('')
121         out.append(body or '<empty body>')
122         out.append('----------')
123     return '\n'.join(out)
124
125
126 class TimeoutError(NetError):
127   """Timeout while reading HTTP response."""
128
129
130 class ConnectionError(NetError):
131   """Failed to connect to the server."""
132
133
134 class HttpError(NetError):
135   """Server returned HTTP error code."""
136
137   def __init__(self, code, inner_exc=None):
138     super(HttpError, self).__init__(inner_exc)
139     self.code = code
140
141
142 def url_open(url, **kwargs):  # pylint: disable=W0621
143   """Attempts to open the given url multiple times.
144
145   |data| can be either:
146     - None for a GET request
147     - str for pre-encoded data
148     - list for data to be encoded
149     - dict for data to be encoded
150
151   See HttpService.request for a full list of arguments.
152
153   Returns HttpResponse object, where the response may be read from, or None
154   if it was unable to connect.
155   """
156   urlhost, urlpath = split_server_request_url(url)
157   service = get_http_service(urlhost)
158   return service.request(urlpath, **kwargs)
159
160
161 def url_read(url, **kwargs):
162   """Attempts to open the given url multiple times and read all data from it.
163
164   Accepts same arguments as url_open function.
165
166   Returns all data read or None if it was unable to connect or read the data.
167   """
168   kwargs['stream'] = False
169   response = url_open(url, **kwargs)
170   if not response:
171     return None
172   try:
173     return response.read()
174   except TimeoutError:
175     return None
176
177
178 def url_read_json(url, **kwargs):
179   """Attempts to open the given url multiple times and read all data from it.
180
181   Accepts same arguments as url_open function.
182
183   Returns all data read or None if it was unable to connect or read the data.
184   """
185   urlhost, urlpath = split_server_request_url(url)
186   service = get_http_service(urlhost)
187   try:
188     return service.json_request(urlpath, **kwargs)
189   except TimeoutError:
190     return None
191
192
193 def url_retrieve(filepath, url, **kwargs):
194   """Downloads an URL to a file. Returns True on success."""
195   response = url_open(url, **kwargs)
196   if not response:
197     return False
198   try:
199     with open(filepath, 'wb') as f:
200       while True:
201         buf = response.read(65536)
202         if not buf:
203           return True
204         f.write(buf)
205   except (IOError, OSError, TimeoutError):
206     try:
207       os.remove(filepath)
208     except IOError:
209       pass
210     return False
211
212
213 def split_server_request_url(url):
214   """Splits the url into scheme+netloc and path+params+query+fragment."""
215   url_parts = list(urlparse.urlparse(url))
216   urlhost = '%s://%s' % (url_parts[0], url_parts[1])
217   urlpath = urlparse.urlunparse(['', ''] + url_parts[2:])
218   return urlhost, urlpath
219
220
221 def get_http_service(urlhost, allow_cached=True):
222   """Returns existing or creates new instance of HttpService that can send
223   requests to given base urlhost.
224   """
225   def new_service():
226     return HttpService(
227         urlhost,
228         engine=RequestsLibEngine(),
229         authenticator=create_authenticator(urlhost))
230
231   # Ensure consistency in url naming.
232   urlhost = str(urlhost).lower().rstrip('/')
233
234   if not allow_cached:
235     return new_service()
236   with _http_services_lock:
237     service = _http_services.get(urlhost)
238     if not service:
239       service = new_service()
240       _http_services[urlhost] = service
241     return service
242
243
244 def get_default_auth_config():
245   """Returns auth configuration used by default if configure_auth is not called.
246
247   If running in a headless mode on bots, will use 'bot' auth, otherwise
248   'oauth' with default oauth config.
249
250   Returns pair (auth method name, auth method config).
251   """
252   if tools.is_headless():
253     return 'bot', None
254   else:
255     return 'oauth', oauth.make_oauth_config()
256
257
258 def configure_auth(method, config=None):
259   """Defines what authentication methods to use.
260
261   Possible authentication methods are:
262     'bot' - use HMAC authentication based on a secret key.
263     'oauth' - use oauth-based authentication.
264     'none' - do not use authentication.
265
266   Arguments:
267     method: what method to use.
268     config: object that holds configuration for authentication method.
269         Concrete type depends on a method used (see AUTH_METHODS for expected
270         type). Passed to corresponding authenticator instance.
271   """
272   global _auth_method
273   global _auth_method_config
274   assert method in dict(AUTH_METHODS), method
275   config_type = dict(AUTH_METHODS)[method]
276   if config_type and not isinstance(config, config_type):
277     raise TypeError(
278         'Expecting \'%s\' auth config to be of type %s. Got %s instead.' %
279         (method, config_type, type(config)))
280   elif not config_type and config is not None:
281     raise TypeError('Auth method \'%s\' is not configurable.' % method)
282   with _auth_lock:
283     _auth_method = method
284     _auth_method_config = config
285
286
287 def get_auth_method():
288   """Returns authentication method used by default.
289
290   Set with 'configure_auth'. See 'configure_auth' doc string for existing
291   auth method.
292   """
293   return _auth_method
294
295
296 def create_authenticator(urlhost):
297   """Makes Authenticator instance used by HttpService to access |urlhost|."""
298   # We use signed URL for Google Storage, no need for special authentication.
299   if GS_STORAGE_HOST_URL_RE.match(urlhost):
300     return None
301
302   # Lazy initialize auth config with defaults.
303   if not _auth_method:
304     default_method, default_config = get_default_auth_config()
305     configure_auth(default_method, default_config)
306
307   # Use configuration set with 'configure_auth'.
308   with _auth_lock:
309     if _auth_method == 'bot':
310       # TODO(vadimsh): Implement it. Use IP whitelist (that doesn't require
311       # any authenticator instance) for now.
312       return None
313     elif _auth_method == 'oauth':
314       return OAuthAuthenticator(urlhost, _auth_method_config)
315     elif _auth_method == 'none':
316       return None
317   raise AssertionError('Invalid auth method: %s' % _auth_method)
318
319
320 def get_case_insensitive_dict(original):
321   """Given a dict with string keys returns new CaseInsensitiveDict.
322
323   Raises ValueError if there are duplicate keys.
324   """
325   normalized = structures.CaseInsensitiveDict(original or {})
326   if len(normalized) != len(original):
327     raise ValueError('Duplicate keys in: %s' % repr(original))
328   return normalized
329
330
331 class HttpService(object):
332   """Base class for a class that provides an API to HTTP based service:
333     - Provides 'request' method.
334     - Supports automatic request retries.
335     - Thread safe.
336   """
337
338   def __init__(self, urlhost, engine, authenticator=None):
339     self.urlhost = urlhost
340     self.engine = engine
341     self.authenticator = authenticator
342
343   @staticmethod
344   def is_transient_http_error(code, retry_404, retry_50x):
345     """Returns True if given HTTP response code is a transient error."""
346     # Google Storage can return this and it should be retried.
347     if code == 408:
348       return True
349     # Retry 404 only if allowed by the caller.
350     if code == 404:
351       return retry_404
352     # All other 4** errors are fatal.
353     if code < 500:
354       return False
355     # Retry >= 500 error only if allowed by the caller.
356     return retry_50x
357
358   @staticmethod
359   def encode_request_body(body, content_type):
360     """Returns request body encoded according to its content type."""
361     # No body or it is already encoded.
362     if body is None or isinstance(body, str):
363       return body
364     # Any body should have content type set.
365     assert content_type, 'Request has body, but no content type'
366     encoder = CONTENT_ENCODERS.get(content_type)
367     assert encoder, ('Unknown content type %s' % content_type)
368     return encoder(body)
369
370   def login(self, allow_user_interaction):
371     """Runs authentication flow to refresh short lived access token.
372
373     Authentication flow may need to interact with the user (read username from
374     stdin, open local browser for OAuth2, etc.). If interaction is required and
375     |allow_user_interaction| is False, the login will silently be considered
376     failed (i.e. this function returns False).
377
378     'request' method always uses non-interactive login, so long-lived
379     authentication tokens (OAuth2 refresh token, etc) have to be set up
380     manually by developer (by calling 'auth.py login' perhaps) prior running
381     any swarming or isolate scripts.
382     """
383     # Use global lock to ensure two authentication flows never run in parallel.
384     with _auth_lock:
385       if self.authenticator:
386         return self.authenticator.login(allow_user_interaction)
387       return False
388
389   def logout(self):
390     """Purges access credentials from local cache."""
391     if self.authenticator:
392       self.authenticator.logout()
393
394   def request(
395       self,
396       urlpath,
397       data=None,
398       content_type=None,
399       max_attempts=URL_OPEN_MAX_ATTEMPTS,
400       retry_404=False,
401       retry_50x=True,
402       timeout=URL_OPEN_TIMEOUT,
403       read_timeout=URL_READ_TIMEOUT,
404       stream=True,
405       method=None,
406       headers=None):
407     """Attempts to open the given url multiple times.
408
409     |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
410
411     |data| can be either:
412       - None for a GET request
413       - str for pre-encoded data
414       - list for data to be form-encoded
415       - dict for data to be form-encoded
416
417     - Optionally retries HTTP 404 and 50x.
418     - Retries up to |max_attempts| times. If None or 0, there's no limit in the
419       number of retries.
420     - Retries up to |timeout| duration in seconds. If None or 0, there's no
421       limit in the time taken to do retries.
422     - If both |max_attempts| and |timeout| are None or 0, this functions retries
423       indefinitely.
424
425     If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
426     when performing the request. By default it's GET if |data| is None and POST
427     if |data| is not None.
428
429     If |headers| is given, it should be a dict with HTTP headers to append
430     to request. Caller is responsible for providing headers that make sense.
431
432     If |read_timeout| is not None will configure underlying socket to
433     raise TimeoutError exception whenever there's no response from the server
434     for more than |read_timeout| seconds. It can happen during any read
435     operation so once you pass non-None |read_timeout| be prepared to handle
436     these exceptions in subsequent reads from the stream.
437
438     Returns a file-like object, where the response may be read from, or None
439     if it was unable to connect. If |stream| is False will read whole response
440     into memory buffer before returning file-like object that reads from this
441     memory buffer.
442     """
443     assert urlpath and urlpath[0] == '/', urlpath
444
445     if data is not None:
446       assert method in (None, 'POST', 'PUT')
447       method = method or 'POST'
448       content_type = content_type or DEFAULT_CONTENT_TYPE
449       body = self.encode_request_body(data, content_type)
450     else:
451       assert method in (None, 'GET')
452       method = method or 'GET'
453       body = None
454       assert not content_type, 'Can\'t use content_type on GET'
455
456     # Prepare request info.
457     parsed = urlparse.urlparse('/' + urlpath.lstrip('/'))
458     resource_url = urlparse.urljoin(self.urlhost, parsed.path)
459     query_params = urlparse.parse_qsl(parsed.query)
460
461     # Prepare headers.
462     headers = get_case_insensitive_dict(headers or {})
463     if body is not None:
464       headers['Content-Length'] = len(body)
465       if content_type:
466         headers['Content-Type'] = content_type
467
468     last_error = None
469     auth_attempted = False
470
471     for attempt in retry_loop(max_attempts, timeout):
472       # Log non-first attempt.
473       if attempt.attempt:
474         logging.warning(
475             'Retrying request %s, attempt %d/%d...',
476             resource_url, attempt.attempt, max_attempts)
477
478       try:
479         # Prepare and send a new request.
480         request = HttpRequest(
481             method, resource_url, query_params, body,
482             headers, read_timeout, stream)
483         if self.authenticator:
484           self.authenticator.authorize(request)
485         response = self.engine.perform_request(request)
486         logging.debug('Request %s succeeded', request.get_full_url())
487         return response
488
489       except (ConnectionError, TimeoutError) as e:
490         last_error = e
491         logging.warning(
492             'Unable to open url %s on attempt %d.\n%s',
493             request.get_full_url(), attempt.attempt, e.format())
494         continue
495
496       except HttpError as e:
497         last_error = e
498
499         # Access denied -> authenticate.
500         if e.code in (401, 403):
501           logging.warning(
502               'Authentication is required for %s on attempt %d.\n%s',
503               request.get_full_url(), attempt.attempt, e.format())
504           # Try to authenticate only once. If it doesn't help, then server does
505           # not support authentication or user doesn't have required access.
506           if not auth_attempted:
507             auth_attempted = True
508             if self.login(allow_user_interaction=False):
509               # Success! Run request again immediately.
510               attempt.skip_sleep = True
511               continue
512           # Authentication attempt was unsuccessful.
513           logging.error(
514               'Unable to authenticate to %s (%s). Use auth.py to login: '
515               'python auth.py login --service=%s',
516               self.urlhost, e.format(), self.urlhost)
517           return None
518
519         # Hit a error that can not be retried -> stop retry loop.
520         if not self.is_transient_http_error(e.code, retry_404, retry_50x):
521           # This HttpError means we reached the server and there was a problem
522           # with the request, so don't retry.
523           logging.error(
524               'Able to connect to %s but an exception was thrown.\n%s',
525               request.get_full_url(), e.format(verbose=True))
526           return None
527
528         # Retry all other errors.
529         logging.warning(
530             'Server responded with error on %s on attempt %d.\n%s',
531             request.get_full_url(), attempt.attempt, e.format())
532         continue
533
534     logging.error(
535         'Unable to open given url, %s, after %d attempts.\n%s',
536         request.get_full_url(), max_attempts, last_error.format(verbose=True))
537     return None
538
539   def json_request(
540       self,
541       urlpath,
542       method=None,
543       data=None,
544       max_attempts=URL_OPEN_MAX_ATTEMPTS,
545       timeout=URL_OPEN_TIMEOUT,
546       headers=None):
547     """Sends JSON request to the server and parses JSON response it get back.
548
549     Arguments:
550       method: HTTP method to use ('GET', 'POST', ...).
551       urlpath: relative request path (e.g. '/auth/v1/...').
552       data: object to serialize to JSON and sent in the request.
553       max_attempts: how many times to retry 50x errors.
554       timeout: how long to wait for a response (including all retries).
555       headers: dict with additional request headers.
556
557     If |method| is given it can be 'GET', 'POST' or 'PUT' and it will be used
558     when performing the request. By default it's GET if |data| is None and POST
559     if |data| is not None.
560
561     Returns:
562       Deserialized JSON response on success, None on error or timeout.
563     """
564     response = self.request(
565         urlpath,
566         content_type=JSON_CONTENT_TYPE if data is not None else None,
567         data=data,
568         headers=headers,
569         max_attempts=max_attempts,
570         retry_404=False,
571         retry_50x=True,
572         stream=False,
573         method=method,
574         timeout=timeout)
575     if not response:
576       return None
577     try:
578       text = response.read()
579       if not text:
580         return None
581     except TimeoutError:
582       return None
583     try:
584       return json.loads(text)
585     except ValueError:
586       logging.error('Not a JSON response when calling %s: %s', urlpath, text)
587       return None
588
589
590 class HttpRequest(object):
591   """Request to HttpService."""
592
593   def __init__(self, method, url, params, body, headers, timeout, stream):
594     """Arguments:
595       |method| - HTTP method to use
596       |url| - relative URL to the resource, without query parameters
597       |params| - list of (key, value) pairs to put into GET parameters
598       |body| - encoded body of the request (None or str)
599       |headers| - dict with request headers
600       |timeout| - socket read timeout (None to disable)
601       |stream| - True to stream response from socket
602     """
603     self.method = method
604     self.url = url
605     self.params = params[:]
606     self.body = body
607     self.headers = headers.copy()
608     self.timeout = timeout
609     self.stream = stream
610     self._cookies = None
611
612   @property
613   def cookies(self):
614     """CookieJar object that will be used for cookies in this request."""
615     if self._cookies is None:
616       self._cookies = cookielib.CookieJar()
617     return self._cookies
618
619   def get_full_url(self):
620     """Resource URL with url-encoded GET parameters."""
621     if not self.params:
622       return self.url
623     else:
624       return '%s?%s' % (self.url, urllib.urlencode(self.params))
625
626   def make_fake_response(self, content='', headers=None):
627     """Makes new fake HttpResponse to this request, useful in tests."""
628     return HttpResponse.get_fake_response(content, self.get_full_url(), headers)
629
630
631 class HttpResponse(object):
632   """Response from HttpService."""
633
634   def __init__(self, stream, url, headers):
635     self._stream = stream
636     self._url = url
637     self._headers = get_case_insensitive_dict(headers)
638     self._read = 0
639
640   @property
641   def content_length(self):
642     """Total length to the response or None if not known in advance."""
643     length = self.get_header('Content-Length')
644     return int(length) if length is not None else None
645
646   def get_header(self, header):
647     """Returns response header (as str) or None if no such header."""
648     return self._headers.get(header)
649
650   def read(self, size=None):
651     """Reads up to |size| bytes from the stream and returns them.
652
653     If |size| is None reads all available bytes.
654
655     Raises TimeoutError on read timeout.
656     """
657     try:
658       # cStringIO has a bug: stream.read(None) is not the same as stream.read().
659       data = self._stream.read() if size is None else self._stream.read(size)
660       self._read += len(data)
661       return data
662     except (socket.timeout, ssl.SSLError, requests.Timeout) as e:
663       logging.error('Timeout while reading from %s, read %d of %s: %s',
664           self._url, self._read, self.content_length, e)
665       raise TimeoutError(e)
666
667   @classmethod
668   def get_fake_response(cls, content, url, headers=None):
669     """Returns HttpResponse with predefined content, useful in tests."""
670     headers = dict(headers or {})
671     headers['Content-Length'] = len(content)
672     return cls(StringIO.StringIO(content), url, headers)
673
674
675 class Authenticator(object):
676   """Base class for objects that know how to authenticate into http services."""
677
678   def authorize(self, request):
679     """Add authentication information to the request."""
680
681   def login(self, allow_user_interaction):
682     """Run interactive authentication flow."""
683     raise NotImplementedError()
684
685   def logout(self):
686     """Purges access credentials from local cache."""
687
688
689 class RequestsLibEngine(object):
690   """Class that knows how to execute HttpRequests via requests library."""
691
692   # Preferred number of connections in a connection pool.
693   CONNECTION_POOL_SIZE = 64
694   # If True will not open more than CONNECTION_POOL_SIZE connections.
695   CONNECTION_POOL_BLOCK = False
696   # Maximum number of internal connection retries in a connection pool.
697   CONNECTION_RETRIES = 0
698
699   def __init__(self):
700     super(RequestsLibEngine, self).__init__()
701     self.session = requests.Session()
702     # Configure session.
703     self.session.trust_env = False
704     self.session.verify = tools.get_cacerts_bundle()
705     # Configure connection pools.
706     for protocol in ('https://', 'http://'):
707       self.session.mount(protocol, adapters.HTTPAdapter(
708           pool_connections=self.CONNECTION_POOL_SIZE,
709           pool_maxsize=self.CONNECTION_POOL_SIZE,
710           max_retries=self.CONNECTION_RETRIES,
711           pool_block=self.CONNECTION_POOL_BLOCK))
712
713   def perform_request(self, request):
714     """Sends a HttpRequest to the server and reads back the response.
715
716     Returns HttpResponse.
717
718     Raises:
719       ConnectionError - failed to establish connection to the server.
720       TimeoutError - timeout while connecting or reading response.
721       HttpError - server responded with >= 400 error code.
722     """
723     try:
724       response = self.session.request(
725           method=request.method,
726           url=request.url,
727           params=request.params,
728           data=request.body,
729           headers=request.headers,
730           cookies=request.cookies,
731           timeout=request.timeout,
732           stream=request.stream)
733       response.raise_for_status()
734       if request.stream:
735         stream = response.raw
736       else:
737         stream = StringIO.StringIO(response.content)
738       return HttpResponse(stream, request.get_full_url(), response.headers)
739     except requests.Timeout as e:
740       raise TimeoutError(e)
741     except requests.HTTPError as e:
742       raise HttpError(e.response.status_code, e)
743     except (requests.ConnectionError, socket.timeout, ssl.SSLError) as e:
744       raise ConnectionError(e)
745
746
747 class OAuthAuthenticator(Authenticator):
748   """Uses OAuth Authorization header to authenticate requests."""
749
750   def __init__(self, urlhost, config):
751     super(OAuthAuthenticator, self).__init__()
752     assert isinstance(config, oauth.OAuthConfig)
753     self.urlhost = urlhost
754     self.config = config
755     self._lock = threading.Lock()
756     self._access_token_known = False
757     self._access_token = None
758
759   def authorize(self, request):
760     with self._lock:
761       if not self._access_token_known:
762         self._access_token = oauth.load_access_token(self.urlhost, self.config)
763         self._access_token_known = True
764       if self._access_token:
765         request.headers['Authorization'] = 'Bearer %s' % self._access_token
766
767   def login(self, allow_user_interaction):
768     with self._lock:
769       self._access_token = oauth.create_access_token(
770           self.urlhost, self.config, allow_user_interaction)
771       self._access_token_known = True
772       return self._access_token is not None
773
774   def logout(self):
775     with self._lock:
776       self._access_token = None
777       self._access_token_known = True
778       oauth.purge_access_token(self.urlhost, self.config)
779
780
781 class RetryAttempt(object):
782   """Contains information about current retry attempt.
783
784   Yielded from retry_loop.
785   """
786
787   def __init__(self, attempt, remaining):
788     """Information about current attempt in retry loop:
789       |attempt| - zero based index of attempt.
790       |remaining| - how much time is left before retry loop finishes retries.
791     """
792     self.attempt = attempt
793     self.remaining = remaining
794     self.skip_sleep = False
795
796
797 def calculate_sleep_before_retry(attempt, max_duration):
798   """How long to sleep before retrying an attempt in retry_loop."""
799   # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
800   # survive.
801   MAX_SLEEP = 10.
802   # random.random() returns [0.0, 1.0). Starts with relatively short waiting
803   # time by starting with 1.5/2+1.5^-1 median offset.
804   duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
805   assert duration > 0.1
806   duration = min(MAX_SLEEP, duration)
807   if max_duration:
808     duration = min(max_duration, duration)
809   return duration
810
811
812 def sleep_before_retry(attempt, max_duration):
813   """Sleeps for some amount of time when retrying the attempt in retry_loop.
814
815   To be mocked in tests.
816   """
817   time.sleep(calculate_sleep_before_retry(attempt, max_duration))
818
819
820 def current_time():
821   """Used by retry loop to get current time.
822
823   To be mocked in tests.
824   """
825   return time.time()
826
827
828 def retry_loop(max_attempts=None, timeout=None):
829   """Yields whenever new attempt to perform some action is needed.
830
831   Yields instances of RetryAttempt class that contains information about current
832   attempt. Setting |skip_sleep| attribute of RetryAttempt to True will cause
833   retry loop to run next attempt immediately.
834   """
835   start = current_time()
836   for attempt in itertools.count():
837     # Too many attempts?
838     if max_attempts and attempt == max_attempts:
839       break
840     # Retried for too long?
841     remaining = (timeout - (current_time() - start)) if timeout else None
842     if remaining is not None and remaining < 0:
843       break
844     # Kick next iteration.
845     attemp_obj = RetryAttempt(attempt, remaining)
846     yield attemp_obj
847     if attemp_obj.skip_sleep:
848       continue
849     # Only sleep if we are going to try again.
850     if max_attempts and attempt != max_attempts - 1:
851       remaining = (timeout - (current_time() - start)) if timeout else None
852       if remaining is not None and remaining < 0:
853         break
854       sleep_before_retry(attempt, remaining)