# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging
-import posixpath
+import time
import traceback
from app_yaml_helper import AppYamlHelper
-from appengine_wrappers import IsDeadlineExceededError, logservice
+from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue
from branch_utility import BranchUtility
from compiled_file_system import CompiledFileSystem
+from custom_logger import CustomLogger
from data_source_registry import CreateDataSources
-from environment import GetAppVersion, IsDevServer
-from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS
-from file_system_util import CreateURLsFromPaths
-from future import Future
+from environment import GetAppVersion
from gcs_file_system_provider import CloudStorageFileSystemProvider
from github_file_system_provider import GithubFileSystemProvider
from host_file_system_provider import HostFileSystemProvider
from object_store_creator import ObjectStoreCreator
-from render_servlet import RenderServlet
+from render_refresher import RenderRefresher
from server_instance import ServerInstance
from servlet import Servlet, Request, Response
-from special_paths import SITE_VERIFICATION_FILE
-from timer import Timer, TimerClosure
+from timer import Timer
-class _SingletonRenderServletDelegate(RenderServlet.Delegate):
- def __init__(self, server_instance):
- self._server_instance = server_instance
+_log = CustomLogger('cron')
- def CreateServerInstance(self):
- return self._server_instance
-
-class _CronLogger(object):
- '''Wraps the logging.* methods to prefix them with 'cron' and flush
- immediately. The flushing is important because often these cron runs time
- out and we lose the logs.
- '''
- def info(self, msg, *args): self._log(logging.info, msg, args)
- def warning(self, msg, *args): self._log(logging.warning, msg, args)
- def error(self, msg, *args): self._log(logging.error, msg, args)
-
- def _log(self, logfn, msg, args):
- try:
- logfn('cron: %s' % msg, *args)
- finally:
- logservice.flush()
-
-_cronlog = _CronLogger()
-
-def _RequestEachItem(title, items, request_callback):
- '''Runs a task |request_callback| named |title| for each item in |items|.
- |request_callback| must take an item and return a servlet response.
- Returns true if every item was successfully run, false if any return a
- non-200 response or raise an exception.
- '''
- _cronlog.info('%s: starting', title)
- success_count, failure_count = 0, 0
- timer = Timer()
- try:
- for i, item in enumerate(items):
- def error_message(detail):
- return '%s: error rendering %s (%s of %s): %s' % (
- title, item, i + 1, len(items), detail)
- try:
- response = request_callback(item)
- if response.status == 200:
- success_count += 1
- else:
- _cronlog.error(error_message('response status %s' % response.status))
- failure_count += 1
- except Exception as e:
- _cronlog.error(error_message(traceback.format_exc()))
- failure_count += 1
- if IsDeadlineExceededError(e): raise
- finally:
- _cronlog.info('%s: rendered %s of %s with %s failures in %s',
- title, success_count, len(items), failure_count,
- timer.Stop().FormatElapsed())
- return success_count == len(items)
class CronServlet(Servlet):
'''Servlet which runs a cron job.
def CreateHostFileSystemProvider(self,
object_store_creator,
- max_trunk_revision=None):
+ pinned_commit=None):
return HostFileSystemProvider(object_store_creator,
- max_trunk_revision=max_trunk_revision)
+ pinned_commit=pinned_commit)
def CreateGithubFileSystemProvider(self, object_store_creator):
return GithubFileSystemProvider(object_store_creator)
return GetAppVersion()
def Get(self):
- # Crons often time out, and if they do we need to make sure to flush the
+ # Refreshes may time out, and if they do we need to make sure to flush the
# logs before the process gets killed (Python gives us a couple of
# seconds).
#
# So, manually flush logs at the end of the cron run. However, sometimes
- # even that isn't enough, which is why in this file we use _cronlog and
+ # even that isn't enough, which is why in this file we use _log and
# make it flush the log every time its used.
logservice.AUTOFLUSH_ENABLED = False
try:
return self._GetImpl()
except BaseException:
- _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
+ _log.error('Caught top-level exception! %s', traceback.format_exc())
finally:
logservice.flush()
def _GetImpl(self):
# Cron strategy:
#
- # Find all public template files and static files, and render them. Most of
- # the time these won't have changed since the last cron run, so it's a
- # little wasteful, but hopefully rendering is really fast (if it isn't we
- # have a problem).
- _cronlog.info('starting')
-
- # This is returned every time RenderServlet wants to create a new
- # ServerInstance.
+ # Collect all DataSources, the PlatformBundle, the ContentProviders, and
+ # any other statically renderered contents (e.g. examples content),
+ # and spin up taskqueue tasks which will refresh any cached data relevant
+ # to these assets.
#
- # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
- # everything. Need retry logic at the fetcher level.
- server_instance = self._GetSafeServerInstance()
- trunk_fs = server_instance.host_file_system_provider.GetTrunk()
+ # TODO(rockot/kalman): At the moment examples are not actually refreshed
+ # because they're too slow.
- def render(path):
- request = Request(path, self._request.host, self._request.headers)
- delegate = _SingletonRenderServletDelegate(server_instance)
- return RenderServlet(request, delegate).Get()
+ _log.info('starting')
- def request_files_in_dir(path, prefix='', strip_ext=None):
- '''Requests every file found under |path| in this host file system, with
- a request prefix of |prefix|. |strip_ext| is an optional list of file
- extensions that should be stripped from paths before requesting.
- '''
- def maybe_strip_ext(name):
- if name == SITE_VERIFICATION_FILE or not strip_ext:
- return name
- base, ext = posixpath.splitext(name)
- return base if ext in strip_ext else name
- files = [maybe_strip_ext(name)
- for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
- return _RequestEachItem(path, files, render)
-
- results = []
+ server_instance = self._GetSafeServerInstance()
+ master_fs = server_instance.host_file_system_provider.GetMaster()
+ master_commit = master_fs.GetCommitID().Get()
- try:
- # Start running the hand-written Cron methods first; they can be run in
- # parallel. They are resolved at the end.
- def run_cron_for_future(target):
- title = target.__class__.__name__
- future, init_timer = TimerClosure(target.Cron)
- assert isinstance(future, Future), (
- '%s.Cron() did not return a Future' % title)
- def resolve():
- resolve_timer = Timer()
- try:
- future.Get()
- except Exception as e:
- _cronlog.error('%s: error %s' % (title, traceback.format_exc()))
- results.append(False)
- if IsDeadlineExceededError(e): raise
- finally:
- resolve_timer.Stop()
- _cronlog.info('%s took %s: %s to initialize and %s to resolve' %
- (title,
- init_timer.With(resolve_timer).FormatElapsed(),
- init_timer.FormatElapsed(),
- resolve_timer.FormatElapsed()))
- return Future(callback=resolve)
+ # This is the guy that would be responsible for refreshing the cache of
+ # examples. Here for posterity, hopefully it will be added to the targets
+ # below someday.
+ render_refresher = RenderRefresher(server_instance, self._request)
- targets = (CreateDataSources(server_instance).values() +
- [server_instance.content_providers,
- server_instance.platform_bundle])
- title = 'initializing %s parallel Cron targets' % len(targets)
- _cronlog.info(title)
- timer = Timer()
- try:
- cron_futures = [run_cron_for_future(target) for target in targets]
- finally:
- _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
+ # Get the default taskqueue
+ queue = taskqueue.Queue()
- # Samples are too expensive to run on the dev server, where there is no
- # parallel fetch.
- #
- # XXX(kalman): Currently samples are *always* too expensive to fetch, so
- # disabling them for now. It won't break anything so long as we're still
- # not enforcing that everything gets cached for normal instances.
- if False: # should be "not IsDevServer()":
- # Fetch each individual sample file.
- results.append(request_files_in_dir(EXAMPLES,
- prefix='extensions/examples'))
+ # GAE documentation specifies that it's bad to add tasks to a queue
+ # within one second of purging. We wait 2 seconds, because we like
+ # to go the extra mile.
+ queue.purge()
+ time.sleep(2)
- # Resolve the hand-written Cron method futures.
- title = 'resolving %s parallel Cron targets' % len(targets)
- _cronlog.info(title)
+ success = True
+ try:
+ data_sources = CreateDataSources(server_instance)
+ targets = (data_sources.items() +
+ [('content_providers', server_instance.content_providers),
+ ('platform_bundle', server_instance.platform_bundle)])
+ title = 'initializing %s parallel targets' % len(targets)
+ _log.info(title)
timer = Timer()
- try:
- for future in cron_futures:
- future.Get()
- finally:
- _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
-
+ for name, target in targets:
+ refresh_paths = target.GetRefreshPaths()
+ for path in refresh_paths:
+ queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
+ params={'commit': master_commit}))
+ _log.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
except:
- results.append(False)
# This should never actually happen (each cron step does its own
# conservative error checking), so re-raise no matter what it is.
- _cronlog.error('uncaught error: %s' % traceback.format_exc())
+ _log.error('uncaught error: %s' % traceback.format_exc())
+ success = False
raise
finally:
- success = all(results)
- _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
+ _log.info('finished (%s)', 'success' if success else 'FAILED')
return (Response.Ok('Success') if success else
Response.InternalError('Failure'))
def _GetSafeServerInstance(self):
- '''Returns a ServerInstance with a host file system at a safe revision,
- meaning the last revision that the current running version of the server
+ '''Returns a ServerInstance with a host file system at a safe commit,
+ meaning the last commit that the current running version of the server
existed.
'''
delegate = self._delegate
- # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
+ # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
# HEAD. These cron jobs take a while and run very frequently such that
# there is usually one running at any given time, and eventually a file
# that we're dealing with will change underneath it, putting the server in
# an undefined state.
server_instance_near_head = self._CreateServerInstance(
- self._GetMostRecentRevision())
+ self._GetMostRecentCommit())
app_yaml_handler = AppYamlHelper(
server_instance_near_head.object_store_creator,
safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
delegate.GetAppVersion()) - 1
- _cronlog.info('app version %s is out of date, safe is %s',
+ _log.info('app version %s is out of date, safe is %s',
delegate.GetAppVersion(), safe_revision)
return self._CreateServerInstance(safe_revision)
- def _GetMostRecentRevision(self):
- '''Gets the revision of the most recent patch submitted to the host file
- system. This is similar to HEAD but it's a concrete revision so won't
+ def _GetMostRecentCommit(self):
+ '''Gets the commit of the most recent patch submitted to the host file
+ system. This is similar to HEAD but it's a concrete commit so won't
change as the cron runs.
'''
head_fs = (
- self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
- return head_fs.Stat('').version
+ self._CreateServerInstance(None).host_file_system_provider.GetMaster())
+ return head_fs.GetCommitID().Get()
- def _CreateServerInstance(self, revision):
- '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
+ def _CreateServerInstance(self, commit):
+ '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
NOTE: If passed None it's likely that during the cron run patches will be
submitted at HEAD, which may change data underneath the cron run.
'''
object_store_creator = ObjectStoreCreator(start_empty=True)
branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
- object_store_creator, max_trunk_revision=revision)
+ object_store_creator, pinned_commit=commit)
github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
object_store_creator)
gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(