1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/sync_client.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/operation_observer.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
17 #include "content/public/browser/browser_thread.h"
18 #include "google_apis/drive/task_util.h"
20 using content::BrowserThread;
27 // The delay constant is used to delay processing a sync task. We should not
28 // process SyncTasks immediately for the following reasons:
30 // 1) For fetching, the user may accidentally click on "Make available
31 // offline" checkbox on a file, and immediately cancel it in a second.
32 // It's a waste to fetch the file in this case.
34 // 2) For uploading, file writing via HTML5 file system API is performed in
35 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We
36 // shouldn't start uploading right after the step 1). Besides, the user
37 // may edit the same file repeatedly in a short period of time.
39 // TODO(satorux): We should find a way to handle the upload case more nicely,
40 // and shorten the delay. crbug.com/134774
41 const int kDelaySeconds = 1;
43 // The delay constant is used to delay retrying a sync task on server errors.
44 const int kLongDelaySeconds = 600;
46 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
47 // fetched (not present locally), to |to_update| if the file needs update.
48 void CollectBacklog(ResourceMetadata* metadata,
49 std::vector<std::string>* to_fetch,
50 std::vector<std::string>* to_update) {
54 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
55 for (; !it->IsAtEnd(); it->Advance()) {
56 const std::string& local_id = it->GetID();
57 const ResourceEntry& entry = it->GetValue();
58 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
59 to_update->push_back(local_id);
63 bool should_update = false;
64 switch (entry.metadata_edit_state()) {
65 case ResourceEntry::CLEAN:
67 case ResourceEntry::SYNCING:
68 case ResourceEntry::DIRTY:
73 FileCacheEntry cache_entry;
74 if (it->GetCacheEntry(&cache_entry)) {
75 if (cache_entry.is_pinned() && !cache_entry.is_present())
76 to_fetch->push_back(local_id);
78 if (cache_entry.is_dirty())
82 to_update->push_back(local_id);
84 DCHECK(!it->HasError());
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
90 std::vector<std::string>* local_ids) {
91 scoped_ptr<FileCache::Iterator> it = cache->GetIterator();
92 for (; !it->IsAtEnd(); it->Advance()) {
93 const FileCacheEntry& cache_entry = it->GetValue();
94 const std::string& local_id = it->GetID();
95 if (!cache_entry.is_pinned() || !cache_entry.is_present())
99 FileError error = metadata->GetResourceEntryById(local_id, &entry);
100 if (error != FILE_ERROR_OK) {
101 LOG(WARNING) << "Entry not found: " << local_id;
105 // If MD5s don't match, it indicates the local cache file is stale, unless
106 // the file is dirty (the MD5 is "local"). We should never re-fetch the
107 // file when we have a locally modified version.
108 if (entry.file_specific_info().md5() == cache_entry.md5() ||
109 cache_entry.is_dirty())
112 error = cache->Remove(local_id);
113 if (error != FILE_ERROR_OK) {
114 LOG(WARNING) << "Failed to remove cache entry: " << local_id;
118 error = cache->Pin(local_id);
119 if (error != FILE_ERROR_OK) {
120 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
124 local_ids->push_back(local_id);
126 DCHECK(!it->HasError());
131 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
132 SyncClient::SyncTask::~SyncTask() {}
134 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
135 file_system::OperationObserver* observer,
136 JobScheduler* scheduler,
137 ResourceMetadata* metadata,
139 LoaderController* loader_controller,
140 const base::FilePath& temporary_file_directory)
141 : blocking_task_runner_(blocking_task_runner),
142 operation_observer_(observer),
145 download_operation_(new file_system::DownloadOperation(
146 blocking_task_runner,
151 temporary_file_directory)),
152 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
158 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
159 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
160 weak_ptr_factory_(this) {
161 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
164 SyncClient::~SyncClient() {
165 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
168 void SyncClient::StartProcessingBacklog() {
169 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171 std::vector<std::string>* to_fetch = new std::vector<std::string>;
172 std::vector<std::string>* to_update = new std::vector<std::string>;
173 blocking_task_runner_->PostTaskAndReply(
175 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
176 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
177 weak_ptr_factory_.GetWeakPtr(),
178 base::Owned(to_fetch),
179 base::Owned(to_update)));
182 void SyncClient::StartCheckingExistingPinnedFiles() {
183 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
185 std::vector<std::string>* local_ids = new std::vector<std::string>;
186 blocking_task_runner_->PostTaskAndReply(
188 base::Bind(&CheckExistingPinnedFiles,
192 base::Bind(&SyncClient::AddFetchTasks,
193 weak_ptr_factory_.GetWeakPtr(),
194 base::Owned(local_ids)));
197 void SyncClient::AddFetchTask(const std::string& local_id) {
198 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
199 AddFetchTaskInternal(local_id, delay_);
202 void SyncClient::RemoveFetchTask(const std::string& local_id) {
203 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
205 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
206 if (it == tasks_.end())
209 SyncTask* task = &it->second;
210 switch (task->state) {
215 if (!task->cancel_closure.is_null())
216 task->cancel_closure.Run();
221 void SyncClient::AddUpdateTask(const ClientContext& context,
222 const std::string& local_id) {
223 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
224 AddUpdateTaskInternal(context, local_id, delay_);
227 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
228 const base::TimeDelta& delay) {
229 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
232 task.task = base::Bind(
233 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
234 base::Unretained(download_operation_.get()),
236 ClientContext(BACKGROUND),
237 base::Bind(&SyncClient::OnGetFileContentInitialized,
238 weak_ptr_factory_.GetWeakPtr()),
239 google_apis::GetContentCallback(),
240 base::Bind(&SyncClient::OnFetchFileComplete,
241 weak_ptr_factory_.GetWeakPtr(),
243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
247 const std::string& local_id,
248 const base::TimeDelta& delay) {
250 task.task = base::Bind(
251 &EntryUpdatePerformer::UpdateEntry,
252 base::Unretained(entry_update_performer_.get()),
255 base::Bind(&SyncClient::OnUpdateComplete,
256 weak_ptr_factory_.GetWeakPtr(),
258 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
261 void SyncClient::AddTask(const SyncTasks::key_type& key,
262 const SyncTask& task,
263 const base::TimeDelta& delay) {
264 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
266 SyncTasks::iterator it = tasks_.find(key);
267 if (it != tasks_.end()) {
268 switch (it->second.state) {
270 // The same task will run, do nothing.
273 // Something has changed since the task started. Schedule rerun.
274 it->second.should_run_again = true;
280 DCHECK_EQ(PENDING, task.state);
283 base::MessageLoopProxy::current()->PostDelayedTask(
285 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
289 void SyncClient::StartTask(const SyncTasks::key_type& key) {
290 SyncTasks::iterator it = tasks_.find(key);
291 if (it == tasks_.end())
294 SyncTask* task = &it->second;
295 switch (task->state) {
297 task->state = RUNNING;
300 case RUNNING: // Do nothing.
305 void SyncClient::OnGetLocalIdsOfBacklog(
306 const std::vector<std::string>* to_fetch,
307 const std::vector<std::string>* to_update) {
308 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
310 // Give priority to upload tasks over fetch tasks, so that dirty files are
311 // uploaded as soon as possible.
312 for (size_t i = 0; i < to_update->size(); ++i) {
313 const std::string& local_id = (*to_update)[i];
314 DVLOG(1) << "Queuing to update: " << local_id;
315 AddUpdateTask(ClientContext(BACKGROUND), local_id);
318 for (size_t i = 0; i < to_fetch->size(); ++i) {
319 const std::string& local_id = (*to_fetch)[i];
320 DVLOG(1) << "Queuing to fetch: " << local_id;
321 AddFetchTaskInternal(local_id, delay_);
325 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
326 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 for (size_t i = 0; i < local_ids->size(); ++i)
329 AddFetchTask((*local_ids)[i]);
332 void SyncClient::OnGetFileContentInitialized(
334 scoped_ptr<ResourceEntry> entry,
335 const base::FilePath& local_cache_file_path,
336 const base::Closure& cancel_download_closure) {
337 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
339 if (error != FILE_ERROR_OK)
342 const SyncTasks::key_type key(FETCH, entry->local_id());
343 SyncTasks::iterator it = tasks_.find(key);
344 DCHECK(it != tasks_.end());
346 it->second.cancel_closure = cancel_download_closure;
349 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
350 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
352 const SyncTasks::key_type key(type, local_id);
353 SyncTasks::iterator it = tasks_.find(key);
354 DCHECK(it != tasks_.end());
356 if (it->second.should_run_again) {
357 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
358 it->second.should_run_again = false;
359 it->second.task.Run();
367 void SyncClient::OnFetchFileComplete(const std::string& local_id,
369 const base::FilePath& local_path,
370 scoped_ptr<ResourceEntry> entry) {
371 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
373 if (!OnTaskComplete(FETCH, local_id))
376 if (error == FILE_ERROR_OK) {
377 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
380 case FILE_ERROR_ABORT:
381 // If user cancels download, unpin the file so that we do not sync the
383 base::PostTaskAndReplyWithResult(
384 blocking_task_runner_,
386 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
387 base::Bind(&util::EmptyFileOperationCallback));
389 case FILE_ERROR_NO_CONNECTION:
390 // Add the task again so that we'll retry once the connection is back.
391 AddFetchTaskInternal(local_id, delay_);
393 case FILE_ERROR_SERVICE_UNAVAILABLE:
394 // Add the task again so that we'll retry once the service is back.
395 AddFetchTaskInternal(local_id, long_delay_);
396 operation_observer_->OnDriveSyncError(
397 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
401 operation_observer_->OnDriveSyncError(
402 file_system::DRIVE_SYNC_ERROR_MISC,
404 LOG(WARNING) << "Failed to fetch " << local_id
405 << ": " << FileErrorToString(error);
410 void SyncClient::OnUpdateComplete(const std::string& local_id,
412 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
414 if (!OnTaskComplete(UPDATE, local_id))
417 if (error == FILE_ERROR_OK) {
418 DVLOG(1) << "Updated " << local_id;
420 // Add update tasks for child entries which may be waiting for the parent to
422 ResourceEntryVector* entries = new ResourceEntryVector;
423 base::PostTaskAndReplyWithResult(
424 blocking_task_runner_.get(),
426 base::Bind(&ResourceMetadata::ReadDirectoryById,
427 base::Unretained(metadata_), local_id, entries),
428 base::Bind(&SyncClient::AddChildUpdateTasks,
429 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
432 case FILE_ERROR_ABORT:
433 // Ignore it because this is caused by user's cancel operations.
435 case FILE_ERROR_NO_CONNECTION:
436 // Add the task again so that we'll retry once the connection is back.
437 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
438 base::TimeDelta::FromSeconds(0));
440 case FILE_ERROR_SERVICE_UNAVAILABLE:
441 // Add the task again so that we'll retry once the service is back.
442 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
443 operation_observer_->OnDriveSyncError(
444 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
448 operation_observer_->OnDriveSyncError(
449 file_system::DRIVE_SYNC_ERROR_MISC,
451 LOG(WARNING) << "Failed to update " << local_id << ": "
452 << FileErrorToString(error);
457 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
459 if (error != FILE_ERROR_OK)
462 for (size_t i = 0; i < entries->size(); ++i) {
463 const ResourceEntry& entry = (*entries)[i];
464 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
465 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
466 base::TimeDelta::FromSeconds(0));
471 } // namespace internal