#include "chrome/browser/chromeos/drive/drive.pb.h"
#include "chrome/browser/chromeos/drive/file_cache.h"
#include "chrome/browser/chromeos/drive/file_system/download_operation.h"
-#include "chrome/browser/chromeos/drive/file_system/operation_observer.h"
+#include "chrome/browser/chromeos/drive/file_system/operation_delegate.h"
#include "chrome/browser/chromeos/drive/file_system_util.h"
#include "chrome/browser/chromeos/drive/job_scheduler.h"
#include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
DCHECK(!it->HasError());
}
-// Runs the task and returns a dummy cancel closure.
-base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
- task.Run();
- return base::Closure();
+// Gets the parent entry of the entry specified by the ID.
+FileError GetParentResourceEntry(ResourceMetadata* metadata,
+ const std::string& local_id,
+ ResourceEntry* parent) {
+ ResourceEntry entry;
+ FileError error = metadata->GetResourceEntryById(local_id, &entry);
+ if (error != FILE_ERROR_OK)
+ return error;
+ return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
}
} // namespace
-SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
+SyncClient::SyncTask::SyncTask()
+ : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
SyncClient::SyncTask::~SyncTask() {}
SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
- file_system::OperationObserver* observer,
+ file_system::OperationDelegate* delegate,
JobScheduler* scheduler,
ResourceMetadata* metadata,
FileCache* cache,
LoaderController* loader_controller,
const base::FilePath& temporary_file_directory)
: blocking_task_runner_(blocking_task_runner),
- operation_observer_(observer),
+ operation_delegate_(delegate),
metadata_(metadata),
cache_(cache),
download_operation_(new file_system::DownloadOperation(
blocking_task_runner,
- observer,
+ delegate,
scheduler,
metadata,
cache,
temporary_file_directory)),
entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
- observer,
+ delegate,
scheduler,
metadata,
cache,
SyncTask* task = &it->second;
switch (task->state) {
+ case SUSPENDED:
case PENDING:
- tasks_.erase(it);
+ OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
break;
case RUNNING:
if (!task->cancel_closure.is_null())
AddUpdateTaskInternal(context, local_id, delay_);
}
-void SyncClient::AddFetchTaskInternal(const std::string& local_id,
- const base::TimeDelta& delay) {
+bool SyncClient:: WaitForUpdateTaskToComplete(
+ const std::string& local_id,
+ const FileOperationCallback& callback) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
- SyncTask task;
- task.task = base::Bind(
- &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
- base::Unretained(download_operation_.get()),
+ SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id));
+ if (it == tasks_.end())
+ return false;
+
+ SyncTask* task = &it->second;
+ task->waiting_callbacks.push_back(callback);
+ return true;
+}
+
+base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
+ const ClientContext& context) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+ return download_operation_->EnsureFileDownloadedByLocalId(
local_id,
- ClientContext(BACKGROUND),
+ context,
GetFileContentInitializedCallback(),
google_apis::GetContentCallback(),
base::Bind(&SyncClient::OnFetchFileComplete,
weak_ptr_factory_.GetWeakPtr(),
local_id));
+}
+
+void SyncClient::AddFetchTaskInternal(const std::string& local_id,
+ const base::TimeDelta& delay) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
+ SyncTask task;
+ task.state = PENDING;
+ task.context = ClientContext(BACKGROUND);
+ task.task = base::Bind(&SyncClient::PerformFetchTask,
+ base::Unretained(this),
+ local_id);
AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
}
+base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
+ const ClientContext& context) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+ entry_update_performer_->UpdateEntry(
+ local_id,
+ context,
+ base::Bind(&SyncClient::OnTaskComplete,
+ weak_ptr_factory_.GetWeakPtr(),
+ UPDATE,
+ local_id));
+ return base::Closure();
+}
+
void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
const std::string& local_id,
const base::TimeDelta& delay) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
SyncTask task;
- task.task = base::Bind(
- &RunTaskAndReturnDummyCancelClosure,
- base::Bind(&EntryUpdatePerformer::UpdateEntry,
- base::Unretained(entry_update_performer_.get()),
- local_id,
- context,
- base::Bind(&SyncClient::OnUpdateComplete,
- weak_ptr_factory_.GetWeakPtr(),
- local_id)));
+ task.state = PENDING;
+ task.context = context;
+ task.task = base::Bind(&SyncClient::PerformUpdateTask,
+ base::Unretained(this),
+ local_id);
AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
}
SyncTasks::iterator it = tasks_.find(key);
if (it != tasks_.end()) {
switch (it->second.state) {
+ case SUSPENDED:
+ // Activate the task.
+ it->second.state = PENDING;
+ break;
case PENDING:
// The same task will run, do nothing.
- break;
+ return;
case RUNNING:
// Something has changed since the task started. Schedule rerun.
it->second.should_run_again = true;
- break;
+ return;
}
- return;
+ } else {
+ tasks_[key] = task;
}
-
DCHECK_EQ(PENDING, task.state);
- tasks_[key] = task;
-
base::MessageLoopProxy::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
}
void SyncClient::StartTask(const SyncTasks::key_type& key) {
+ ResourceEntry* parent = new ResourceEntry;
+ base::PostTaskAndReplyWithResult(
+ blocking_task_runner_.get(),
+ FROM_HERE,
+ base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
+ base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
+ weak_ptr_factory_.GetWeakPtr(),
+ key,
+ base::Owned(parent)));
+}
+
+void SyncClient::StartTaskAfterGetParentResourceEntry(
+ const SyncTasks::key_type& key,
+ const ResourceEntry* parent,
+ FileError error) {
+ const SyncType type = key.first;
+ const std::string& local_id = key.second;
SyncTasks::iterator it = tasks_.find(key);
if (it == tasks_.end())
return;
SyncTask* task = &it->second;
switch (task->state) {
+ case SUSPENDED:
case PENDING:
- task->state = RUNNING;
- task->cancel_closure = task->task.Run();
break;
case RUNNING: // Do nothing.
- break;
+ return;
}
+
+ if (error != FILE_ERROR_OK) {
+ OnTaskComplete(type, local_id, error);
+ return;
+ }
+
+ if (type == UPDATE &&
+ parent->resource_id().empty() &&
+ parent->local_id() != util::kDriveTrashDirLocalId) {
+ // Parent entry needs to be synced to get a resource ID.
+ // Suspend the task and register it as a dependent task of the parent.
+ const SyncTasks::key_type key_parent(type, parent->local_id());
+ SyncTasks::iterator it_parent = tasks_.find(key_parent);
+ if (it_parent == tasks_.end()) {
+ OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
+ LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
+ << local_id << ", parent_id = " << parent->local_id();
+ return;
+ }
+ task->state = SUSPENDED;
+ it_parent->second.dependent_tasks.push_back(key);
+ return;
+ }
+
+ // Run the task.
+ task->state = RUNNING;
+ task->cancel_closure = task->task.Run(task->context);
}
void SyncClient::OnGetLocalIdsOfBacklog(
AddFetchTask((*local_ids)[i]);
}
-bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
+void SyncClient::OnTaskComplete(SyncType type,
+ const std::string& local_id,
+ FileError error) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
const SyncTasks::key_type key(type, local_id);
SyncTasks::iterator it = tasks_.find(key);
DCHECK(it != tasks_.end());
+ base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
+
+ switch (error) {
+ case FILE_ERROR_OK:
+ DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
+ break;
+ case FILE_ERROR_ABORT:
+ // Ignore it because this is caused by user's cancel operations.
+ break;
+ case FILE_ERROR_NO_CONNECTION:
+ // Run the task again so that we'll retry once the connection is back.
+ it->second.should_run_again = true;
+ it->second.context = ClientContext(BACKGROUND);
+ break;
+ case FILE_ERROR_SERVICE_UNAVAILABLE:
+ // Run the task again so that we'll retry once the service is back.
+ it->second.should_run_again = true;
+ it->second.context = ClientContext(BACKGROUND);
+ retry_delay = long_delay_;
+ operation_delegate_->OnDriveSyncError(
+ file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
+ break;
+ default:
+ operation_delegate_->OnDriveSyncError(
+ file_system::DRIVE_SYNC_ERROR_MISC, local_id);
+ LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
+ << ": " << FileErrorToString(error);
+ }
+
+ for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) {
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error));
+ }
+ it->second.waiting_callbacks.clear();
+
if (it->second.should_run_again) {
DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
+ it->second.state = PENDING;
it->second.should_run_again = false;
- it->second.task.Run();
- return false;
+ base::MessageLoopProxy::current()->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
+ retry_delay);
+ } else {
+ for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
+ StartTask(it->second.dependent_tasks[i]);
+ tasks_.erase(it);
}
-
- tasks_.erase(it);
- return true;
}
void SyncClient::OnFetchFileComplete(const std::string& local_id,
const base::FilePath& local_path,
scoped_ptr<ResourceEntry> entry) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
- if (!OnTaskComplete(FETCH, local_id))
- return;
-
- if (error == FILE_ERROR_OK) {
- DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
- } else {
- switch (error) {
- case FILE_ERROR_ABORT:
- // If user cancels download, unpin the file so that we do not sync the
- // file again.
- base::PostTaskAndReplyWithResult(
- blocking_task_runner_,
- FROM_HERE,
- base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
- base::Bind(&util::EmptyFileOperationCallback));
- break;
- case FILE_ERROR_NO_CONNECTION:
- // Add the task again so that we'll retry once the connection is back.
- AddFetchTaskInternal(local_id, delay_);
- break;
- case FILE_ERROR_SERVICE_UNAVAILABLE:
- // Add the task again so that we'll retry once the service is back.
- AddFetchTaskInternal(local_id, long_delay_);
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
- local_id);
- break;
- default:
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_MISC,
- local_id);
- LOG(WARNING) << "Failed to fetch " << local_id
- << ": " << FileErrorToString(error);
- }
- }
-}
-
-void SyncClient::OnUpdateComplete(const std::string& local_id,
- FileError error) {
- DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
- if (!OnTaskComplete(UPDATE, local_id))
- return;
-
- if (error == FILE_ERROR_OK) {
- DVLOG(1) << "Updated " << local_id;
-
- // Add update tasks for child entries which may be waiting for the parent to
- // be updated.
- ResourceEntryVector* entries = new ResourceEntryVector;
+ OnTaskComplete(FETCH, local_id, error);
+ if (error == FILE_ERROR_ABORT) {
+ // If user cancels download, unpin the file so that we do not sync the file
+ // again.
base::PostTaskAndReplyWithResult(
- blocking_task_runner_.get(),
+ blocking_task_runner_,
FROM_HERE,
- base::Bind(&ResourceMetadata::ReadDirectoryById,
- base::Unretained(metadata_), local_id, entries),
- base::Bind(&SyncClient::AddChildUpdateTasks,
- weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
- } else {
- switch (error) {
- case FILE_ERROR_ABORT:
- // Ignore it because this is caused by user's cancel operations.
- break;
- case FILE_ERROR_NO_CONNECTION:
- // Add the task again so that we'll retry once the connection is back.
- AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
- base::TimeDelta::FromSeconds(0));
- break;
- case FILE_ERROR_SERVICE_UNAVAILABLE:
- // Add the task again so that we'll retry once the service is back.
- AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
- local_id);
- break;
- default:
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_MISC,
- local_id);
- LOG(WARNING) << "Failed to update " << local_id << ": "
- << FileErrorToString(error);
- }
- }
-}
-
-void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
- FileError error) {
- if (error != FILE_ERROR_OK)
- return;
-
- for (size_t i = 0; i < entries->size(); ++i) {
- const ResourceEntry& entry = (*entries)[i];
- if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
- AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
- base::TimeDelta::FromSeconds(0));
- }
+ base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
+ base::Bind(&util::EmptyFileOperationCallback));
}
}