#include "base/format_macros.h"
#include "chrome/browser/sync_file_system/logger.h"
-#include "chrome/browser/sync_file_system/sync_file_system_service.h"
namespace sync_file_system {
+const int64 SyncProcessRunner::kSyncDelayInMilliseconds =
+ 1 * base::Time::kMillisecondsPerSecond; // 1 sec
+const int64 SyncProcessRunner::kSyncDelayWithSyncError =
+ 3 * base::Time::kMillisecondsPerSecond; // 3 sec
+const int64 SyncProcessRunner::kSyncDelayFastInMilliseconds = 100; // 100 ms
+const int SyncProcessRunner::kPendingChangeThresholdForFastSync = 10;
+const int64 SyncProcessRunner::kSyncDelaySlowInMilliseconds =
+ 30 * base::Time::kMillisecondsPerSecond; // 30 sec
+const int64 SyncProcessRunner::kSyncDelayMaxInMilliseconds =
+ 30 * 60 * base::Time::kMillisecondsPerSecond; // 30 min
+
namespace {
-// Default delay when more changes are available.
-const int64 kSyncDelayInMilliseconds = 1 * base::Time::kMillisecondsPerSecond;
+class BaseTimerHelper : public SyncProcessRunner::TimerHelper {
+ public:
+ BaseTimerHelper() {}
+
+ virtual bool IsRunning() OVERRIDE {
+ return timer_.IsRunning();
+ }
+
+ virtual void Start(const tracked_objects::Location& from_here,
+ const base::TimeDelta& delay,
+ const base::Closure& closure) OVERRIDE {
+ timer_.Start(from_here, delay, closure);
+ }
-// Default delay when the previous change has had an error (but remote service
-// is running).
-const int64 kSyncDelayWithSyncError = 3 * base::Time::kMillisecondsPerSecond;
+ virtual base::TimeTicks Now() const OVERRIDE {
+ return base::TimeTicks::Now();
+ }
-// Default delay when there're more than 10 pending changes.
-const int64 kSyncDelayFastInMilliseconds = 100;
-const int kPendingChangeThresholdForFastSync = 10;
+ virtual ~BaseTimerHelper() {}
-// Default delay when remote service is temporarily unavailable.
-const int64 kSyncDelaySlowInMilliseconds =
- 30 * base::Time::kMillisecondsPerSecond; // Start with 30 sec + exp backoff
+ private:
+ base::OneShotTimer<SyncProcessRunner> timer_;
-// Default delay when there're no changes.
-const int64 kSyncDelayMaxInMilliseconds =
- 30 * 60 * base::Time::kMillisecondsPerSecond; // 30 min
+ DISALLOW_COPY_AND_ASSIGN(BaseTimerHelper);
+};
bool WasSuccessfulSync(SyncStatusCode status) {
return status == SYNC_STATUS_OK ||
SyncProcessRunner::SyncProcessRunner(
const std::string& name,
- SyncFileSystemService* sync_service)
+ Client* client,
+ scoped_ptr<TimerHelper> timer_helper,
+ size_t max_parallel_task)
: name_(name),
- sync_service_(sync_service),
- current_delay_(0),
- last_delay_(0),
+ client_(client),
+ max_parallel_task_(max_parallel_task),
+ running_tasks_(0),
+ timer_helper_(timer_helper.Pass()),
+ service_state_(SYNC_SERVICE_RUNNING),
pending_changes_(0),
- running_(false),
- factory_(this) {}
+ factory_(this) {
+ DCHECK_LE(1u, max_parallel_task_);
+ if (!timer_helper_)
+ timer_helper_.reset(new BaseTimerHelper);
+}
SyncProcessRunner::~SyncProcessRunner() {}
void SyncProcessRunner::Schedule() {
- int64 delay = kSyncDelayInMilliseconds;
if (pending_changes_ == 0) {
ScheduleInternal(kSyncDelayMaxInMilliseconds);
return;
}
- switch (GetServiceState()) {
+
+ SyncServiceState last_service_state = service_state_;
+ service_state_ = GetServiceState();
+
+ switch (service_state_) {
case SYNC_SERVICE_RUNNING:
+ ResetThrottling();
if (pending_changes_ > kPendingChangeThresholdForFastSync)
- delay = kSyncDelayFastInMilliseconds;
+ ScheduleInternal(kSyncDelayFastInMilliseconds);
else
- delay = kSyncDelayInMilliseconds;
- break;
+ ScheduleInternal(kSyncDelayInMilliseconds);
+ return;
case SYNC_SERVICE_TEMPORARY_UNAVAILABLE:
- delay = kSyncDelaySlowInMilliseconds;
- if (last_delay_ >= kSyncDelaySlowInMilliseconds)
- delay = last_delay_ * 2;
- if (delay >= kSyncDelayMaxInMilliseconds)
- delay = kSyncDelayMaxInMilliseconds;
- break;
+ if (last_service_state != service_state_)
+ ThrottleSync(kSyncDelaySlowInMilliseconds);
+ ScheduleInternal(kSyncDelaySlowInMilliseconds);
+ return;
case SYNC_SERVICE_AUTHENTICATION_REQUIRED:
case SYNC_SERVICE_DISABLED:
- delay = kSyncDelayMaxInMilliseconds;
- break;
+ if (last_service_state != service_state_)
+ ThrottleSync(kSyncDelaySlowInMilliseconds);
+ ScheduleInternal(kSyncDelayMaxInMilliseconds);
+ return;
}
- ScheduleInternal(delay);
+
+ NOTREACHED();
+ ScheduleInternal(kSyncDelayMaxInMilliseconds);
}
-void SyncProcessRunner::ScheduleIfNotRunning() {
- if (!timer_.IsRunning())
- Schedule();
+void SyncProcessRunner::ThrottleSync(int64 base_delay) {
+ base::TimeTicks now = timer_helper_->Now();
+ base::TimeDelta elapsed = std::min(now, throttle_until_) - throttle_from_;
+ DCHECK(base::TimeDelta() <= elapsed);
+
+ throttle_from_ = now;
+ // Extend throttling duration by twice the elapsed time.
+ // That is, if the backoff repeats in a short period, the throttling period
+ // doesn't grow exponentially. If the backoff happens on the end of
+ // throttling period, it causes another throttling period that is twice as
+ // long as previous.
+ base::TimeDelta base_delay_delta =
+ base::TimeDelta::FromMilliseconds(base_delay);
+ const base::TimeDelta max_delay =
+ base::TimeDelta::FromMilliseconds(kSyncDelayMaxInMilliseconds);
+ throttle_until_ =
+ std::min(now + max_delay,
+ std::max(now + base_delay_delta, throttle_until_ + 2 * elapsed));
+}
+
+void SyncProcessRunner::ResetOldThrottling() {
+ if (throttle_until_ < base::TimeTicks::Now())
+ ResetThrottling();
+}
+
+void SyncProcessRunner::ResetThrottling() {
+ throttle_from_ = base::TimeTicks();
+ throttle_until_ = base::TimeTicks();
}
void SyncProcessRunner::OnChangesUpdated(
pending_changes_ = pending_changes;
if (old_pending_changes != pending_changes) {
if (pending_changes == 0)
- sync_service()->OnSyncIdle();
+ client_->OnSyncIdle();
util::Log(logging::LOG_VERBOSE, FROM_HERE,
"[%s] pending_changes updated: %" PRId64,
name_.c_str(), pending_changes);
Schedule();
}
+SyncFileSystemService* SyncProcessRunner::GetSyncService() {
+ return client_->GetSyncService();
+}
+
SyncServiceState SyncProcessRunner::GetServiceState() {
- return sync_service()->GetSyncServiceState();
+ return client_->GetSyncServiceState();
}
-void SyncProcessRunner::Finished(SyncStatusCode status) {
- DCHECK(running_);
- running_ = false;
+void SyncProcessRunner::Finished(const base::TimeTicks& start_time,
+ SyncStatusCode status) {
+ DCHECK_LT(0u, running_tasks_);
+ DCHECK_LE(running_tasks_, max_parallel_task_);
+ --running_tasks_;
util::Log(logging::LOG_VERBOSE, FROM_HERE,
- "[%s] * Finished (elapsed: %" PRId64 " sec)",
- name_.c_str(),
- (base::Time::Now() - last_scheduled_).InSeconds());
+ "[%s] * Finished (elapsed: %" PRId64 " ms)", name_.c_str(),
+ (timer_helper_->Now() - start_time).InMilliseconds());
+
if (status == SYNC_STATUS_NO_CHANGE_TO_SYNC ||
- status == SYNC_STATUS_FILE_BUSY)
+ status == SYNC_STATUS_FILE_BUSY) {
ScheduleInternal(kSyncDelayMaxInMilliseconds);
- else if (!WasSuccessfulSync(status) &&
- GetServiceState() == SYNC_SERVICE_RUNNING)
- ScheduleInternal(kSyncDelayWithSyncError);
+ return;
+ }
+
+ if (WasSuccessfulSync(status))
+ ResetOldThrottling();
else
- Schedule();
+ ThrottleSync(kSyncDelayWithSyncError);
+
+ Schedule();
}
void SyncProcessRunner::Run() {
- if (running_)
+ if (running_tasks_ >= max_parallel_task_)
return;
- running_ = true;
- last_scheduled_ = base::Time::Now();
- last_delay_ = current_delay_;
+ ++running_tasks_;
+ base::TimeTicks now = timer_helper_->Now();
+ last_run_ = now;
util::Log(logging::LOG_VERBOSE, FROM_HERE,
"[%s] * Started", name_.c_str());
- StartSync(
- base::Bind(&SyncProcessRunner::Finished, factory_.GetWeakPtr()));
+ StartSync(base::Bind(&SyncProcessRunner::Finished, factory_.GetWeakPtr(),
+ now));
+ if (running_tasks_ < max_parallel_task_)
+ Schedule();
}
void SyncProcessRunner::ScheduleInternal(int64 delay) {
- base::TimeDelta time_to_next = base::TimeDelta::FromMilliseconds(delay);
-
- if (timer_.IsRunning()) {
- if (current_delay_ == delay)
- return;
-
- base::TimeDelta elapsed = base::Time::Now() - last_scheduled_;
- if (elapsed < time_to_next) {
- time_to_next = time_to_next - elapsed;
- } else {
- time_to_next = base::TimeDelta::FromMilliseconds(
- kSyncDelayFastInMilliseconds);
+ base::TimeTicks now = timer_helper_->Now();
+ base::TimeTicks next_scheduled;
+
+ if (timer_helper_->IsRunning()) {
+ next_scheduled = last_run_ + base::TimeDelta::FromMilliseconds(delay);
+ if (next_scheduled < now) {
+ next_scheduled =
+ now + base::TimeDelta::FromMilliseconds(kSyncDelayFastInMilliseconds);
}
- timer_.Stop();
+ } else {
+ next_scheduled = now + base::TimeDelta::FromMilliseconds(delay);
}
- if (current_delay_ != delay) {
- util::Log(logging::LOG_VERBOSE, FROM_HERE,
- "[%s] Scheduling task in %" PRId64 " secs",
- name_.c_str(), time_to_next.InSeconds());
- }
- current_delay_ = delay;
+ if (next_scheduled < throttle_until_)
+ next_scheduled = throttle_until_;
+
+ if (timer_helper_->IsRunning() && last_scheduled_ == next_scheduled)
+ return;
+
+ util::Log(logging::LOG_VERBOSE, FROM_HERE,
+ "[%s] Scheduling task in %" PRId64 " ms",
+ name_.c_str(), (next_scheduled - now).InMilliseconds());
+
+ last_scheduled_ = next_scheduled;
- timer_.Start(FROM_HERE, time_to_next, this, &SyncProcessRunner::Run);
+ timer_helper_->Start(
+ FROM_HERE, next_scheduled - now,
+ base::Bind(&SyncProcessRunner::Run, base::Unretained(this)));
}
} // namespace sync_file_system