# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging
import time
import traceback
from app_yaml_helper import AppYamlHelper
-from appengine_wrappers import (
- GetAppVersion, IsDeadlineExceededError, logservice)
+from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue
from branch_utility import BranchUtility
from compiled_file_system import CompiledFileSystem
+from custom_logger import CustomLogger
from data_source_registry import CreateDataSources
-from empty_dir_file_system import EmptyDirFileSystem
-from environment import IsDevServer
-from file_system_util import CreateURLsFromPaths
+from environment import GetAppVersion
+from gcs_file_system_provider import CloudStorageFileSystemProvider
from github_file_system_provider import GithubFileSystemProvider
from host_file_system_provider import HostFileSystemProvider
from object_store_creator import ObjectStoreCreator
-from render_servlet import RenderServlet
+from render_refresher import RenderRefresher
from server_instance import ServerInstance
from servlet import Servlet, Request, Response
-import svn_constants
+from timer import Timer
-class _SingletonRenderServletDelegate(RenderServlet.Delegate):
- def __init__(self, server_instance):
- self._server_instance = server_instance
- def CreateServerInstance(self):
- return self._server_instance
+_log = CustomLogger('cron')
-class _CronLogger(object):
- '''Wraps the logging.* methods to prefix them with 'cron' and flush
- immediately. The flushing is important because often these cron runs time
- out and we lose the logs.
- '''
- def info(self, msg, *args): self._log(logging.info, msg, args)
- def warning(self, msg, *args): self._log(logging.warning, msg, args)
- def error(self, msg, *args): self._log(logging.error, msg, args)
-
- def _log(self, logfn, msg, args):
- try:
- logfn('cron: %s' % msg, *args)
- finally:
- logservice.flush()
-
-_cronlog = _CronLogger()
-
-def _RequestEachItem(title, items, request_callback):
- '''Runs a task |request_callback| named |title| for each item in |items|.
- |request_callback| must take an item and return a servlet response.
- Returns true if every item was successfully run, false if any return a
- non-200 response or raise an exception.
- '''
- _cronlog.info('%s: starting', title)
- success_count, failure_count = 0, 0
- start_time = time.time()
- try:
- for i, item in enumerate(items):
- def error_message(detail):
- return '%s: error rendering %s (%s of %s): %s' % (
- title, item, i + 1, len(items), detail)
- try:
- response = request_callback(item)
- if response.status == 200:
- success_count += 1
- else:
- _cronlog.error(error_message('response status %s' % response.status))
- failure_count += 1
- except Exception as e:
- _cronlog.error(error_message(traceback.format_exc()))
- failure_count += 1
- if IsDeadlineExceededError(e): raise
- finally:
- elapsed_seconds = time.time() - start_time
- _cronlog.info('%s: rendered %s of %s with %s failures in %s seconds',
- title, success_count, len(items), failure_count, elapsed_seconds);
- return success_count == len(items)
class CronServlet(Servlet):
'''Servlet which runs a cron job.
def CreateHostFileSystemProvider(self,
object_store_creator,
- max_trunk_revision=None):
+ pinned_commit=None):
return HostFileSystemProvider(object_store_creator,
- max_trunk_revision=max_trunk_revision)
+ pinned_commit=pinned_commit)
def CreateGithubFileSystemProvider(self, object_store_creator):
return GithubFileSystemProvider(object_store_creator)
+ def CreateGCSFileSystemProvider(self, object_store_creator):
+ return CloudStorageFileSystemProvider(object_store_creator)
+
def GetAppVersion(self):
return GetAppVersion()
def Get(self):
- # Crons often time out, and if they do we need to make sure to flush the
+ # Refreshes may time out, and if they do we need to make sure to flush the
# logs before the process gets killed (Python gives us a couple of
# seconds).
#
# So, manually flush logs at the end of the cron run. However, sometimes
- # even that isn't enough, which is why in this file we use _cronlog and
+ # even that isn't enough, which is why in this file we use _log and
# make it flush the log every time its used.
logservice.AUTOFLUSH_ENABLED = False
try:
return self._GetImpl()
except BaseException:
- _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
+ _log.error('Caught top-level exception! %s', traceback.format_exc())
finally:
logservice.flush()
def _GetImpl(self):
# Cron strategy:
#
- # Find all public template files and static files, and render them. Most of
- # the time these won't have changed since the last cron run, so it's a
- # little wasteful, but hopefully rendering is really fast (if it isn't we
- # have a problem).
- _cronlog.info('starting')
-
- # This is returned every time RenderServlet wants to create a new
- # ServerInstance.
+ # Collect all DataSources, the PlatformBundle, the ContentProviders, and
+  # any other statically rendered contents (e.g. examples content),
+ # and spin up taskqueue tasks which will refresh any cached data relevant
+ # to these assets.
#
- # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
- # everything. Need retry logic at the fetcher level.
- server_instance = self._GetSafeServerInstance()
- trunk_fs = server_instance.host_file_system_provider.GetTrunk()
-
- def render(path):
- request = Request(path, self._request.host, self._request.headers)
- delegate = _SingletonRenderServletDelegate(server_instance)
- return RenderServlet(request, delegate).Get()
-
- def request_files_in_dir(path, prefix=''):
- '''Requests every file found under |path| in this host file system, with
- a request prefix of |prefix|.
- '''
- files = [name for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
- return _RequestEachItem(path, files, render)
-
- results = []
-
- try:
- # Rendering the public templates will also pull in all of the private
- # templates.
- results.append(request_files_in_dir(svn_constants.PUBLIC_TEMPLATE_PATH))
-
- # Rendering the public templates will have pulled in the .js and
- # manifest.json files (for listing examples on the API reference pages),
- # but there are still images, CSS, etc.
- results.append(request_files_in_dir(svn_constants.STATIC_PATH,
- prefix='static/'))
+ # TODO(rockot/kalman): At the moment examples are not actually refreshed
+ # because they're too slow.
- # Samples are too expensive to run on the dev server, where there is no
- # parallel fetch.
- if not IsDevServer():
- # Fetch each individual sample file.
- results.append(request_files_in_dir(svn_constants.EXAMPLES_PATH,
- prefix='extensions/examples/'))
+ _log.info('starting')
- # Fetch the zip file of each example (contains all the individual
- # files).
- example_zips = []
- for root, _, files in trunk_fs.Walk(svn_constants.EXAMPLES_PATH):
- example_zips.extend(
- root + '.zip' for name in files if name == 'manifest.json')
- results.append(_RequestEachItem(
- 'example zips',
- example_zips,
- lambda path: render('extensions/examples/' + path)))
+ server_instance = self._GetSafeServerInstance()
+ master_fs = server_instance.host_file_system_provider.GetMaster()
+ master_commit = master_fs.GetCommitID().Get()
- def run_cron(data_source):
- title = data_source.__class__.__name__
- _cronlog.info('%s: starting' % title)
- start_time = time.time()
- try:
- data_source.Cron()
- except Exception as e:
- _cronlog.error('%s: error %s' % (title, traceback.format_exc()))
- results.append(False)
- if IsDeadlineExceededError(e): raise
- finally:
- _cronlog.info(
- '%s: took %s seconds' % (title, time.time() - start_time))
+ # This is the guy that would be responsible for refreshing the cache of
+ # examples. Here for posterity, hopefully it will be added to the targets
+ # below someday.
+ render_refresher = RenderRefresher(server_instance, self._request)
- for data_source in CreateDataSources(server_instance).values():
- run_cron(data_source)
+ # Get the default taskqueue
+ queue = taskqueue.Queue()
- run_cron(server_instance.content_providers)
+ # GAE documentation specifies that it's bad to add tasks to a queue
+ # within one second of purging. We wait 2 seconds, because we like
+ # to go the extra mile.
+ queue.purge()
+ time.sleep(2)
+ success = True
+ try:
+ data_sources = CreateDataSources(server_instance)
+ targets = (data_sources.items() +
+ [('content_providers', server_instance.content_providers),
+ ('platform_bundle', server_instance.platform_bundle)])
+ title = 'initializing %s parallel targets' % len(targets)
+ _log.info(title)
+ timer = Timer()
+ for name, target in targets:
+ refresh_paths = target.GetRefreshPaths()
+ for path in refresh_paths:
+ queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
+ params={'commit': master_commit}))
+ _log.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
except:
- results.append(False)
# This should never actually happen (each cron step does its own
# conservative error checking), so re-raise no matter what it is.
- _cronlog.error('uncaught error: %s' % traceback.format_exc())
+ _log.error('uncaught error: %s' % traceback.format_exc())
+ success = False
raise
finally:
- success = all(results)
- _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
+ _log.info('finished (%s)', 'success' if success else 'FAILED')
return (Response.Ok('Success') if success else
Response.InternalError('Failure'))
def _GetSafeServerInstance(self):
- '''Returns a ServerInstance with a host file system at a safe revision,
- meaning the last revision that the current running version of the server
+ '''Returns a ServerInstance with a host file system at a safe commit,
+ meaning the last commit that the current running version of the server
existed.
'''
delegate = self._delegate
- # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
+ # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
# HEAD. These cron jobs take a while and run very frequently such that
# there is usually one running at any given time, and eventually a file
# that we're dealing with will change underneath it, putting the server in
# an undefined state.
server_instance_near_head = self._CreateServerInstance(
- self._GetMostRecentRevision())
+ self._GetMostRecentCommit())
app_yaml_handler = AppYamlHelper(
- svn_constants.APP_YAML_PATH,
server_instance_near_head.object_store_creator,
server_instance_near_head.host_file_system_provider)
safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
delegate.GetAppVersion()) - 1
- _cronlog.info('app version %s is out of date, safe is %s',
+ _log.info('app version %s is out of date, safe is %s',
delegate.GetAppVersion(), safe_revision)
return self._CreateServerInstance(safe_revision)
- def _GetMostRecentRevision(self):
- '''Gets the revision of the most recent patch submitted to the host file
- system. This is similar to HEAD but it's a concrete revision so won't
+ def _GetMostRecentCommit(self):
+ '''Gets the commit of the most recent patch submitted to the host file
+ system. This is similar to HEAD but it's a concrete commit so won't
change as the cron runs.
'''
head_fs = (
- self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
- return head_fs.Stat('/').version
+ self._CreateServerInstance(None).host_file_system_provider.GetMaster())
+ return head_fs.GetCommitID().Get()
- def _CreateServerInstance(self, revision):
- '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
+ def _CreateServerInstance(self, commit):
+ '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
NOTE: If passed None it's likely that during the cron run patches will be
submitted at HEAD, which may change data underneath the cron run.
'''
object_store_creator = ObjectStoreCreator(start_empty=True)
branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
- object_store_creator, max_trunk_revision=revision)
+ object_store_creator, pinned_commit=commit)
github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
object_store_creator)
+ gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(
+ object_store_creator)
return ServerInstance(object_store_creator,
CompiledFileSystem.Factory(object_store_creator),
branch_utility,
host_file_system_provider,
- github_file_system_provider)
+ github_file_system_provider,
+ gcs_file_system_provider)