# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import os
-
-from app_yaml_helper import AppYamlHelper
-
-def GetAppVersion():
- if 'CURRENT_VERSION_ID' in os.environ:
- # The version ID looks like 2-0-25.36712548, we only want the 2-0-25.
- return os.environ['CURRENT_VERSION_ID'].split('.', 1)[0]
- # Not running on appengine, get it from the app.yaml file ourselves.
- app_yaml_path = os.path.join(os.path.split(__file__)[0], 'app.yaml')
- with open(app_yaml_path, 'r') as app_yaml:
- return AppYamlHelper.ExtractVersion(app_yaml.read())
-
def IsDeadlineExceededError(error):
'''A general way of determining whether |error| is a DeadlineExceededError,
since there are 3 different types thrown by AppEngine and we might as well
handle them all the same way. For more info see:
https://developers.google.com/appengine/articles/deadlineexceedederrors
'''
- return error.__class__.__name__ == 'DeadlineExceededError'
+ return type(error).__name__ == 'DeadlineExceededError'
+
+
+def IsDownloadError(error):
+ return type(error).__name__ == 'DownloadError'
+
# This will attempt to import the actual App Engine modules, and if it fails,
# they will be replaced with fake modules. This is useful during testing.
try:
+ import google.appengine.api.app_identity as app_identity
import google.appengine.api.files as files
import google.appengine.api.logservice as logservice
import google.appengine.api.memcache as memcache
+ import google.appengine.api.taskqueue as taskqueue
import google.appengine.api.urlfetch as urlfetch
import google.appengine.ext.blobstore as blobstore
from google.appengine.ext.blobstore.blobstore import BlobReferenceProperty
import google.appengine.ext.db as db
- import google.appengine.ext.webapp as webapp
+ import webapp2
except ImportError:
import re
from StringIO import StringIO
for k, v in FAKE_URL_FETCHER_CONFIGURATION.iteritems():
if k.match(key):
return v
- return None
+ raise ValueError('No configuration found for %s' % key)
class _RPC(object):
def __init__(self, result=None):
def wait(self):
pass
+ class FakeAppIdentity(object):
+ """A fake app_identity module that returns no access tokens."""
+ def get_access_token(self, scope):
+ return (None, None)
+ app_identity = FakeAppIdentity()
+
class FakeUrlFetch(object):
"""A fake urlfetch module that uses the current
|FAKE_URL_FETCHER_CONFIGURATION| to map urls to fake fetchers.
class _Response(object):
def __init__(self, content):
self.content = content
- self.headers = { 'content-type': 'none' }
+ self.headers = {'Content-Type': 'none'}
self.status_code = 200
def fetch(self, url, **kwargs):
+ url = url.split('?', 1)[0]
response = self._Response(_GetConfiguration(url).fetch(url))
if response.content is None:
response.status_code = 404
return response
- def create_rpc(self):
+ def create_rpc(self, **kwargs):
return _RPC()
def make_fetch_call(self, rpc, url, **kwargs):
_BLOBS = {}
class FakeBlobstore(object):
+ class BlobNotFoundError(Exception):
+ pass
+
class BlobReader(object):
def __init__(self, blob_key):
self._data = _BLOBS[blob_key].getvalue()
class Client(object):
def set_multi_async(self, mapping, namespace='', time=0):
- for k, v in mapping.iteritems():
- memcache.set(k, v, namespace=namespace, time=time)
+ return _RPC(result=dict(
+ (k, memcache.set(k, v, namespace=namespace, time=time))
+ for k, v in mapping.iteritems()))
def get_multi_async(self, keys, namespace='', time=0):
return _RPC(result=dict(
memcache = InMemoryMemcache()
- class webapp(object):
+ class webapp2(object):
class RequestHandler(object):
- """A fake webapp.RequestHandler class for Handler to extend.
+ """A fake webapp2.RequestHandler class for Handler to extend.
"""
def __init__(self, request, response):
self.request = request
class BlobReferenceProperty(object):
pass
+
+ # Executes any queued tasks synchronously as they are queued.
+ _task_runner = None
+
+ def SetTaskRunnerForTest(task_runner):
+ global _task_runner
+ _task_runner = task_runner
+
+ class SynchronousTaskQueue(object):
+ class Task(object):
+ def __init__(self, url=None, params={}):
+ self.url_ = url
+ self.params_ = params
+
+ def GetUrl(self):
+ return self.url_
+
+ def GetCommit(self):
+ return self.params_.get('commit')
+
+ class Queue(object):
+ def __init__(self, name='default'):
+ pass
+
+ def add(self, task):
+ global _task_runner
+ if _task_runner:
+ _task_runner(task.GetUrl(), task.GetCommit())
+ return _RPC()
+
+ def purge(self):
+ return _RPC()
+
+ taskqueue = SynchronousTaskQueue()