#include "base/bind.h"
#include "base/location.h"
#include "base/memory/scoped_ptr.h"
+#include "base/sequenced_task_runner.h"
+#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
#include "chrome/browser/sync_file_system/sync_file_metadata.h"
-using fileapi::FileSystemURL;
+using storage::FileSystemURL;
namespace sync_file_system {
namespace drive_backend {
namespace {
-class SyncTaskAdapter : public SyncTask {
+class SyncTaskAdapter : public ExclusiveTask {
public:
explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
- virtual ~SyncTaskAdapter() {}
+ ~SyncTaskAdapter() override {}
- virtual void Run(scoped_ptr<SyncTaskToken> token) OVERRIDE {
- task_.Run(SyncTaskToken::WrapToCallback(token.Pass()));
+ void RunExclusive(const SyncStatusCallback& callback) override {
+ task_.Run(callback);
}
private:
SyncTaskManager::SyncTaskManager(
base::WeakPtr<Client> client,
- size_t maximum_background_task)
+ size_t maximum_background_task,
+ const scoped_refptr<base::SequencedTaskRunner>& task_runner)
: client_(client),
maximum_background_task_(maximum_background_task),
pending_task_seq_(0),
- task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) {
+ task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
+ task_runner_(task_runner),
+ weak_ptr_factory_(this) {
}
SyncTaskManager::~SyncTaskManager() {
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
client_.reset();
token_.reset();
}
void SyncTaskManager::Initialize(SyncStatusCode status) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
DCHECK(!token_);
- NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
- status);
+ NotifyTaskDone(
+ SyncTaskToken::CreateForForegroundTask(
+ weak_ptr_factory_.GetWeakPtr(), task_runner_.get()),
+ status);
}
void SyncTaskManager::ScheduleTask(
const Task& task,
Priority priority,
const SyncStatusCallback& callback) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
ScheduleSyncTask(from_here,
scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
priority,
scoped_ptr<SyncTask> task,
Priority priority,
const SyncStatusCallback& callback) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
if (!token) {
PushPendingTask(
- base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
+ base::Bind(&SyncTaskManager::ScheduleSyncTask,
+ weak_ptr_factory_.GetWeakPtr(), from_here,
base::Passed(&task), priority, callback),
priority);
return;
const tracked_objects::Location& from_here,
const Task& task,
const SyncStatusCallback& callback) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
return ScheduleSyncTaskIfIdle(
from_here,
scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
const tracked_objects::Location& from_here,
scoped_ptr<SyncTask> task,
const SyncStatusCallback& callback) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
if (!token)
return false;
DCHECK(token);
SyncTaskManager* manager = token->manager();
+ if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
+ DCHECK(!manager);
+ SyncStatusCallback callback = token->callback();
+ token->clear_callback();
+ callback.Run(status);
+ return;
+ }
+
if (manager)
manager->NotifyTaskDoneBody(token.Pass(), status);
}
// static
-void SyncTaskManager::MoveTaskToBackground(
- scoped_ptr<SyncTaskToken> token,
- scoped_ptr<BlockingFactor> blocking_factor,
+void SyncTaskManager::UpdateTaskBlocker(
+ scoped_ptr<SyncTaskToken> current_task_token,
+ scoped_ptr<TaskBlocker> task_blocker,
const Continuation& continuation) {
- DCHECK(token);
+ DCHECK(current_task_token);
+
+ SyncTaskManager* manager = current_task_token->manager();
+ if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
+ DCHECK(!manager);
+ continuation.Run(current_task_token.Pass());
+ return;
+ }
- SyncTaskManager* manager = token->manager();
if (!manager)
return;
- manager->MoveTaskToBackgroundBody(token.Pass(), blocking_factor.Pass(),
- continuation);
+
+ scoped_ptr<SyncTaskToken> foreground_task_token;
+ scoped_ptr<SyncTaskToken> background_task_token;
+ scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
+ if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
+ foreground_task_token = current_task_token.Pass();
+ else
+ background_task_token = current_task_token.Pass();
+
+ manager->UpdateTaskBlockerBody(foreground_task_token.Pass(),
+ background_task_token.Pass(),
+ task_log.Pass(),
+ task_blocker.Pass(),
+ continuation);
}
bool SyncTaskManager::IsRunningTask(int64 token_id) const {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
// If the client is gone, all task should be aborted.
if (!client_)
return false;
if (token_id == SyncTaskToken::kForegroundTaskTokenID)
return true;
- return ContainsKey(running_background_task_, token_id);
+ return ContainsKey(running_background_tasks_, token_id);
+}
+
+void SyncTaskManager::DetachFromSequence() {
+ sequence_checker_.DetachFromSequence();
}
void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
SyncStatusCode status) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
DCHECK(token);
DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
<< " (" << SyncStatusCodeToString(status) << ")"
- << " " << token_->location().ToString();
+ << " " << token->location().ToString();
- if (token->blocking_factor()) {
- dependency_manager_.Erase(*token->blocking_factor());
- token->clear_blocking_factor();
+ if (token->task_blocker()) {
+ dependency_manager_.Erase(token->task_blocker());
+ token->clear_task_blocker();
+ }
+
+ if (client_) {
+ if (token->has_task_log()) {
+ token->FinalizeTaskLog(SyncStatusCodeToString(status));
+ client_->RecordTaskLog(token->PassTaskLog());
+ }
}
scoped_ptr<SyncTask> task;
token->clear_callback();
if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
token_ = token.Pass();
- task = running_task_.Pass();
+ task = running_foreground_task_.Pass();
} else {
- task = running_background_task_.take_and_erase(token->token_id());
+ task = running_background_tasks_.take_and_erase(token->token_id());
}
+ // Acquire the token to prevent a new task to jump into the queue.
+ token = token_.Pass();
+
bool task_used_network = false;
if (task)
task_used_network = task->used_network();
if (!callback.is_null())
callback.Run(status);
- StartNextTask();
+ // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
+ // making the call-chaing longer.
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
+ weak_ptr_factory_.GetWeakPtr(), base::Passed(&token)));
}
-void SyncTaskManager::MoveTaskToBackgroundBody(
- scoped_ptr<SyncTaskToken> token,
- scoped_ptr<BlockingFactor> blocking_factor,
+void SyncTaskManager::UpdateTaskBlockerBody(
+ scoped_ptr<SyncTaskToken> foreground_task_token,
+ scoped_ptr<SyncTaskToken> background_task_token,
+ scoped_ptr<TaskLogger::TaskLog> task_log,
+ scoped_ptr<TaskBlocker> task_blocker,
const Continuation& continuation) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
+ // Run the task directly if the parallelization is disabled.
if (!maximum_background_task_) {
- continuation.Run(token.Pass());
+ DCHECK(foreground_task_token);
+ DCHECK(!background_task_token);
+ foreground_task_token->SetTaskLog(task_log.Pass());
+ continuation.Run(foreground_task_token.Pass());
return;
}
- if (running_background_task_.size() >= maximum_background_task_ ||
- !dependency_manager_.Insert(*blocking_factor)) {
- DCHECK(!running_background_task_.empty());
+ // Clear existing |task_blocker| from |dependency_manager_| before
+ // getting |foreground_task_token|, so that we can avoid dead lock.
+ if (background_task_token && background_task_token->task_blocker()) {
+ dependency_manager_.Erase(background_task_token->task_blocker());
+ background_task_token->clear_task_blocker();
+ }
- // Wait for NotifyTaskDone to release a |blocking_factor|.
+ // Try to get |foreground_task_token|. If it's not available, wait for
+ // current foreground task to finish.
+ if (!foreground_task_token) {
+ DCHECK(background_task_token);
+ foreground_task_token = GetToken(background_task_token->location(),
+ SyncStatusCallback());
+ if (!foreground_task_token) {
+ PushPendingTask(
+ base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(&foreground_task_token),
+ base::Passed(&background_task_token),
+ base::Passed(&task_log),
+ base::Passed(&task_blocker),
+ continuation),
+ PRIORITY_HIGH);
+ MaybeStartNextForegroundTask(nullptr);
+ return;
+ }
+ }
+
+ // Check if the task can run as a background task now.
+ // If there are too many task running or any other task blocks current
+ // task, wait for any other task to finish.
+ bool task_number_limit_exceeded =
+ !background_task_token &&
+ running_background_tasks_.size() >= maximum_background_task_;
+ if (task_number_limit_exceeded ||
+ !dependency_manager_.Insert(task_blocker.get())) {
+ DCHECK(!running_background_tasks_.empty());
+ DCHECK(pending_backgrounding_task_.is_null());
+
+ // Wait for NotifyTaskDone to release a |task_blocker|.
pending_backgrounding_task_ =
- base::Bind(&SyncTaskManager::MoveTaskToBackground,
- base::Passed(&token), base::Passed(&blocking_factor),
+ base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(&foreground_task_token),
+ base::Passed(&background_task_token),
+ base::Passed(&task_log),
+ base::Passed(&task_blocker),
continuation);
return;
}
- tracked_objects::Location from_here = token->location();
- SyncStatusCallback callback = token->callback();
- token->clear_callback();
-
- scoped_ptr<SyncTaskToken> background_task_token =
- SyncTaskToken::CreateForBackgroundTask(
- AsWeakPtr(), task_token_seq_++, blocking_factor.Pass());
- background_task_token->UpdateTask(from_here, callback);
-
- NotifyTaskBackgrounded(token.Pass(), *background_task_token);
- continuation.Run(background_task_token.Pass());
-}
+ if (background_task_token) {
+ background_task_token->set_task_blocker(task_blocker.Pass());
+ } else {
+ tracked_objects::Location from_here = foreground_task_token->location();
+ SyncStatusCallback callback = foreground_task_token->callback();
+ foreground_task_token->clear_callback();
+
+ background_task_token =
+ SyncTaskToken::CreateForBackgroundTask(weak_ptr_factory_.GetWeakPtr(),
+ task_runner_.get(),
+ task_token_seq_++,
+ task_blocker.Pass());
+ background_task_token->UpdateTask(from_here, callback);
+ running_background_tasks_.set(background_task_token->token_id(),
+ running_foreground_task_.Pass());
+ }
-void SyncTaskManager::NotifyTaskBackgrounded(
- scoped_ptr<SyncTaskToken> foreground_task_token,
- const SyncTaskToken& background_task_token) {
token_ = foreground_task_token.Pass();
- running_background_task_.set(background_task_token.token_id(),
- running_task_.Pass());
-
- StartNextTask();
+ MaybeStartNextForegroundTask(nullptr);
+ background_task_token->SetTaskLog(task_log.Pass());
+ continuation.Run(background_task_token.Pass());
}
scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
const tracked_objects::Location& from_here,
const SyncStatusCallback& callback) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
if (!token_)
- return scoped_ptr<SyncTaskToken>();
+ return nullptr;
token_->UpdateTask(from_here, callback);
return token_.Pass();
}
void SyncTaskManager::PushPendingTask(
const base::Closure& closure, Priority priority) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
}
void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
scoped_ptr<SyncTask> task) {
- DCHECK(!running_task_);
- running_task_ = task.Pass();
- running_task_->Run(token.Pass());
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+ DCHECK(!running_foreground_task_);
+
+ running_foreground_task_ = task.Pass();
+ running_foreground_task_->RunPreflight(token.Pass());
}
-void SyncTaskManager::StartNextTask() {
+void SyncTaskManager::MaybeStartNextForegroundTask(
+ scoped_ptr<SyncTaskToken> token) {
+ DCHECK(sequence_checker_.CalledOnValidSequencedThread());
+
+ if (token) {
+ DCHECK(!token_);
+ token_ = token.Pass();
+ }
+
if (!pending_backgrounding_task_.is_null()) {
base::Closure closure = pending_backgrounding_task_;
pending_backgrounding_task_.Reset();
return;
}
+ if (!token_)
+ return;
+
if (!pending_tasks_.empty()) {
base::Closure closure = pending_tasks_.top().task;
pending_tasks_.pop();