432ea35317f30641c4c466523329aa8c09675ff7
[platform/framework/web/crosswalk.git] / src / chrome / browser / chromeos / drive / sync_client.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/drive/sync_client.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/operation_delegate.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
18 #include "content/public/browser/browser_thread.h"
19 #include "google_apis/drive/task_util.h"
20
21 using content::BrowserThread;
22
23 namespace drive {
24 namespace internal {
25
26 namespace {
27
28 // The delay constant is used to delay processing a sync task. We should not
29 // process SyncTasks immediately for the following reasons:
30 //
31 // 1) For fetching, the user may accidentally click on "Make available
32 //    offline" checkbox on a file, and immediately cancel it in a second.
33 //    It's a waste to fetch the file in this case.
34 //
35 // 2) For uploading, file writing via HTML5 file system API is performed in
36 //    two steps: 1) truncate a file to 0 bytes, 2) write contents. We
37 //    shouldn't start uploading right after the step 1). Besides, the user
38 //    may edit the same file repeatedly in a short period of time.
39 //
40 // TODO(satorux): We should find a way to handle the upload case more nicely,
41 // and shorten the delay. crbug.com/134774
42 const int kDelaySeconds = 1;
43
44 // The delay constant is used to delay retrying a sync task on server errors.
45 const int kLongDelaySeconds = 600;
46
47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
48 // fetched (not present locally), to |to_update| if the file needs update.
49 void CollectBacklog(ResourceMetadata* metadata,
50                     std::vector<std::string>* to_fetch,
51                     std::vector<std::string>* to_update) {
52   DCHECK(to_fetch);
53   DCHECK(to_update);
54
55   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
56   for (; !it->IsAtEnd(); it->Advance()) {
57     const std::string& local_id = it->GetID();
58     const ResourceEntry& entry = it->GetValue();
59     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
60       to_update->push_back(local_id);
61       continue;
62     }
63
64     bool should_update = false;
65     switch (entry.metadata_edit_state()) {
66       case ResourceEntry::CLEAN:
67         break;
68       case ResourceEntry::SYNCING:
69       case ResourceEntry::DIRTY:
70         should_update = true;
71         break;
72     }
73
74     if (entry.file_specific_info().cache_state().is_pinned() &&
75         !entry.file_specific_info().cache_state().is_present())
76       to_fetch->push_back(local_id);
77
78     if (entry.file_specific_info().cache_state().is_dirty())
79       should_update = true;
80
81     if (should_update)
82       to_update->push_back(local_id);
83   }
84   DCHECK(!it->HasError());
85 }
86
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89                               FileCache* cache,
90                               std::vector<std::string>* local_ids) {
91   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
92   for (; !it->IsAtEnd(); it->Advance()) {
93     const ResourceEntry& entry = it->GetValue();
94     const FileCacheEntry& cache_state =
95         entry.file_specific_info().cache_state();
96     const std::string& local_id = it->GetID();
97     if (!cache_state.is_pinned() || !cache_state.is_present())
98       continue;
99
100     // If MD5s don't match, it indicates the local cache file is stale, unless
101     // the file is dirty (the MD5 is "local"). We should never re-fetch the
102     // file when we have a locally modified version.
103     if (entry.file_specific_info().md5() == cache_state.md5() ||
104         cache_state.is_dirty())
105       continue;
106
107     FileError error = cache->Remove(local_id);
108     if (error != FILE_ERROR_OK) {
109       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
110       continue;
111     }
112
113     error = cache->Pin(local_id);
114     if (error != FILE_ERROR_OK) {
115       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116       continue;
117     }
118
119     local_ids->push_back(local_id);
120   }
121   DCHECK(!it->HasError());
122 }
123
124 // Gets the parent entry of the entry specified by the ID.
125 FileError GetParentResourceEntry(ResourceMetadata* metadata,
126                                  const std::string& local_id,
127                                  ResourceEntry* parent) {
128   ResourceEntry entry;
129   FileError error = metadata->GetResourceEntryById(local_id, &entry);
130   if (error != FILE_ERROR_OK)
131     return error;
132   return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
133 }
134
135 }  // namespace
136
137 SyncClient::SyncTask::SyncTask()
138     : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
139 SyncClient::SyncTask::~SyncTask() {}
140
141 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
142                        file_system::OperationDelegate* delegate,
143                        JobScheduler* scheduler,
144                        ResourceMetadata* metadata,
145                        FileCache* cache,
146                        LoaderController* loader_controller,
147                        const base::FilePath& temporary_file_directory)
148     : blocking_task_runner_(blocking_task_runner),
149       operation_delegate_(delegate),
150       metadata_(metadata),
151       cache_(cache),
152       download_operation_(new file_system::DownloadOperation(
153           blocking_task_runner,
154           delegate,
155           scheduler,
156           metadata,
157           cache,
158           temporary_file_directory)),
159       entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
160                                                        delegate,
161                                                        scheduler,
162                                                        metadata,
163                                                        cache,
164                                                        loader_controller)),
165       delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
166       long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
167       weak_ptr_factory_(this) {
168   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
169 }
170
171 SyncClient::~SyncClient() {
172   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
173 }
174
175 void SyncClient::StartProcessingBacklog() {
176   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
177
178   std::vector<std::string>* to_fetch = new std::vector<std::string>;
179   std::vector<std::string>* to_update = new std::vector<std::string>;
180   blocking_task_runner_->PostTaskAndReply(
181       FROM_HERE,
182       base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
183       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
184                  weak_ptr_factory_.GetWeakPtr(),
185                  base::Owned(to_fetch),
186                  base::Owned(to_update)));
187 }
188
189 void SyncClient::StartCheckingExistingPinnedFiles() {
190   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
191
192   std::vector<std::string>* local_ids = new std::vector<std::string>;
193   blocking_task_runner_->PostTaskAndReply(
194       FROM_HERE,
195       base::Bind(&CheckExistingPinnedFiles,
196                  metadata_,
197                  cache_,
198                  local_ids),
199       base::Bind(&SyncClient::AddFetchTasks,
200                  weak_ptr_factory_.GetWeakPtr(),
201                  base::Owned(local_ids)));
202 }
203
204 void SyncClient::AddFetchTask(const std::string& local_id) {
205   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
206   AddFetchTaskInternal(local_id, delay_);
207 }
208
209 void SyncClient::RemoveFetchTask(const std::string& local_id) {
210   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
211
212   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
213   if (it == tasks_.end())
214     return;
215
216   SyncTask* task = &it->second;
217   switch (task->state) {
218     case SUSPENDED:
219     case PENDING:
220       OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
221       break;
222     case RUNNING:
223       if (!task->cancel_closure.is_null())
224         task->cancel_closure.Run();
225       break;
226   }
227 }
228
229 void SyncClient::AddUpdateTask(const ClientContext& context,
230                                const std::string& local_id) {
231   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
232   AddUpdateTaskInternal(context, local_id, delay_);
233 }
234
235 bool SyncClient:: WaitForUpdateTaskToComplete(
236     const std::string& local_id,
237     const FileOperationCallback& callback) {
238   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239
240   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id));
241   if (it == tasks_.end())
242     return false;
243
244   SyncTask* task = &it->second;
245   task->waiting_callbacks.push_back(callback);
246   return true;
247 }
248
249 base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
250                                            const ClientContext& context) {
251   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
252   return download_operation_->EnsureFileDownloadedByLocalId(
253       local_id,
254       context,
255       GetFileContentInitializedCallback(),
256       google_apis::GetContentCallback(),
257       base::Bind(&SyncClient::OnFetchFileComplete,
258                  weak_ptr_factory_.GetWeakPtr(),
259                  local_id));
260 }
261
262 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
263                                       const base::TimeDelta& delay) {
264   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
265
266   SyncTask task;
267   task.state = PENDING;
268   task.context = ClientContext(BACKGROUND);
269   task.task = base::Bind(&SyncClient::PerformFetchTask,
270                          base::Unretained(this),
271                          local_id);
272   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
273 }
274
275 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
276                                             const ClientContext& context) {
277   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
278   entry_update_performer_->UpdateEntry(
279       local_id,
280       context,
281       base::Bind(&SyncClient::OnTaskComplete,
282                  weak_ptr_factory_.GetWeakPtr(),
283                  UPDATE,
284                  local_id));
285   return base::Closure();
286 }
287
288 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
289                                        const std::string& local_id,
290                                        const base::TimeDelta& delay) {
291   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
292
293   SyncTask task;
294   task.state = PENDING;
295   task.context = context;
296   task.task = base::Bind(&SyncClient::PerformUpdateTask,
297                          base::Unretained(this),
298                          local_id);
299   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
300 }
301
302 void SyncClient::AddTask(const SyncTasks::key_type& key,
303                          const SyncTask& task,
304                          const base::TimeDelta& delay) {
305   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
306
307   SyncTasks::iterator it = tasks_.find(key);
308   if (it != tasks_.end()) {
309     switch (it->second.state) {
310       case SUSPENDED:
311         // Activate the task.
312         it->second.state = PENDING;
313         break;
314       case PENDING:
315         // The same task will run, do nothing.
316         return;
317       case RUNNING:
318         // Something has changed since the task started. Schedule rerun.
319         it->second.should_run_again = true;
320         return;
321     }
322   } else {
323     tasks_[key] = task;
324   }
325   DCHECK_EQ(PENDING, task.state);
326   base::MessageLoopProxy::current()->PostDelayedTask(
327       FROM_HERE,
328       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
329       delay);
330 }
331
332 void SyncClient::StartTask(const SyncTasks::key_type& key) {
333   ResourceEntry* parent = new ResourceEntry;
334   base::PostTaskAndReplyWithResult(
335       blocking_task_runner_.get(),
336       FROM_HERE,
337       base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
338       base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
339                  weak_ptr_factory_.GetWeakPtr(),
340                  key,
341                  base::Owned(parent)));
342 }
343
344 void SyncClient::StartTaskAfterGetParentResourceEntry(
345     const SyncTasks::key_type& key,
346     const ResourceEntry* parent,
347     FileError error) {
348   const SyncType type = key.first;
349   const std::string& local_id = key.second;
350   SyncTasks::iterator it = tasks_.find(key);
351   if (it == tasks_.end())
352     return;
353
354   SyncTask* task = &it->second;
355   switch (task->state) {
356     case SUSPENDED:
357     case PENDING:
358       break;
359     case RUNNING:  // Do nothing.
360       return;
361   }
362
363   if (error != FILE_ERROR_OK) {
364     OnTaskComplete(type, local_id, error);
365     return;
366   }
367
368   if (type == UPDATE &&
369       parent->resource_id().empty() &&
370       parent->local_id() != util::kDriveTrashDirLocalId) {
371     // Parent entry needs to be synced to get a resource ID.
372     // Suspend the task and register it as a dependent task of the parent.
373     const SyncTasks::key_type key_parent(type, parent->local_id());
374     SyncTasks::iterator it_parent = tasks_.find(key_parent);
375     if (it_parent == tasks_.end()) {
376       OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
377       LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
378                    << local_id << ", parent_id = " << parent->local_id();
379       return;
380     }
381     task->state = SUSPENDED;
382     it_parent->second.dependent_tasks.push_back(key);
383     return;
384   }
385
386   // Run the task.
387   task->state = RUNNING;
388   task->cancel_closure = task->task.Run(task->context);
389 }
390
391 void SyncClient::OnGetLocalIdsOfBacklog(
392     const std::vector<std::string>* to_fetch,
393     const std::vector<std::string>* to_update) {
394   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
395
396   // Give priority to upload tasks over fetch tasks, so that dirty files are
397   // uploaded as soon as possible.
398   for (size_t i = 0; i < to_update->size(); ++i) {
399     const std::string& local_id = (*to_update)[i];
400     DVLOG(1) << "Queuing to update: " << local_id;
401     AddUpdateTask(ClientContext(BACKGROUND), local_id);
402   }
403
404   for (size_t i = 0; i < to_fetch->size(); ++i) {
405     const std::string& local_id = (*to_fetch)[i];
406     DVLOG(1) << "Queuing to fetch: " << local_id;
407     AddFetchTaskInternal(local_id, delay_);
408   }
409 }
410
411 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
412   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
413
414   for (size_t i = 0; i < local_ids->size(); ++i)
415     AddFetchTask((*local_ids)[i]);
416 }
417
418 void SyncClient::OnTaskComplete(SyncType type,
419                                 const std::string& local_id,
420                                 FileError error) {
421   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
422
423   const SyncTasks::key_type key(type, local_id);
424   SyncTasks::iterator it = tasks_.find(key);
425   DCHECK(it != tasks_.end());
426
427   base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
428
429   switch (error) {
430     case FILE_ERROR_OK:
431       DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
432       break;
433     case FILE_ERROR_ABORT:
434       // Ignore it because this is caused by user's cancel operations.
435       break;
436     case FILE_ERROR_NO_CONNECTION:
437       // Run the task again so that we'll retry once the connection is back.
438       it->second.should_run_again = true;
439       it->second.context = ClientContext(BACKGROUND);
440       break;
441     case FILE_ERROR_SERVICE_UNAVAILABLE:
442       // Run the task again so that we'll retry once the service is back.
443       it->second.should_run_again = true;
444       it->second.context = ClientContext(BACKGROUND);
445       retry_delay = long_delay_;
446       operation_delegate_->OnDriveSyncError(
447           file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
448       break;
449     default:
450       operation_delegate_->OnDriveSyncError(
451           file_system::DRIVE_SYNC_ERROR_MISC, local_id);
452       LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
453                    << ": " << FileErrorToString(error);
454   }
455
456   for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) {
457     base::MessageLoopProxy::current()->PostTask(
458         FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error));
459   }
460   it->second.waiting_callbacks.clear();
461
462   if (it->second.should_run_again) {
463     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
464     it->second.state = PENDING;
465     it->second.should_run_again = false;
466     base::MessageLoopProxy::current()->PostDelayedTask(
467         FROM_HERE,
468         base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
469         retry_delay);
470   } else {
471     for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
472       StartTask(it->second.dependent_tasks[i]);
473     tasks_.erase(it);
474   }
475 }
476
477 void SyncClient::OnFetchFileComplete(const std::string& local_id,
478                                      FileError error,
479                                      const base::FilePath& local_path,
480                                      scoped_ptr<ResourceEntry> entry) {
481   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
482   OnTaskComplete(FETCH, local_id, error);
483   if (error == FILE_ERROR_ABORT) {
484     // If user cancels download, unpin the file so that we do not sync the file
485     // again.
486     base::PostTaskAndReplyWithResult(
487         blocking_task_runner_.get(),
488         FROM_HERE,
489         base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
490         base::Bind(&util::EmptyFileOperationCallback));
491   }
492 }
493
494 }  // namespace internal
495 }  // namespace drive