Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / chrome / browser / sync_file_system / drive_backend / sync_engine.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/threading/sequenced_worker_pool.h"
11 #include "base/values.h"
12 #include "chrome/browser/drive/drive_api_service.h"
13 #include "chrome/browser/drive/drive_notification_manager.h"
14 #include "chrome/browser/drive/drive_notification_manager_factory.h"
15 #include "chrome/browser/drive/drive_service_interface.h"
16 #include "chrome/browser/drive/drive_uploader.h"
17 #include "chrome/browser/extensions/extension_service.h"
18 #include "chrome/browser/profiles/profile.h"
19 #include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
20 #include "chrome/browser/signin/signin_manager_factory.h"
21 #include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
22 #include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
23 #include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
24 #include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
25 #include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
26 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
27 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
28 #include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
29 #include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
30 #include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
31 #include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
32 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
33 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
34 #include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
35 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
36 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
37 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
38 #include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
39 #include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
40 #include "chrome/browser/sync_file_system/file_status_observer.h"
41 #include "chrome/browser/sync_file_system/logger.h"
42 #include "chrome/browser/sync_file_system/syncable_file_system_util.h"
43 #include "components/signin/core/browser/profile_oauth2_token_service.h"
44 #include "components/signin/core/browser/signin_manager.h"
45 #include "content/public/browser/browser_thread.h"
46 #include "extensions/browser/extension_system.h"
47 #include "extensions/browser/extension_system_provider.h"
48 #include "extensions/browser/extensions_browser_client.h"
49 #include "extensions/common/extension.h"
50 #include "google_apis/drive/drive_api_url_generator.h"
51 #include "google_apis/drive/gdata_wapi_url_generator.h"
52 #include "webkit/common/blob/scoped_file.h"
53 #include "webkit/common/fileapi/file_system_util.h"
54
55 namespace sync_file_system {
56
57 class RemoteChangeProcessor;
58
59 namespace drive_backend {
60
61 class SyncEngine::WorkerObserver
62     : public SyncWorker::Observer {
63  public:
64   WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
65                  base::WeakPtr<SyncEngine> sync_engine)
66       : ui_task_runner_(ui_task_runner),
67         sync_engine_(sync_engine){
68   }
69
70   virtual ~WorkerObserver() {}
71
72   virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
73     ui_task_runner_->PostTask(
74         FROM_HERE,
75         base::Bind(&SyncEngine::OnPendingFileListUpdated,
76                    sync_engine_,
77                    item_count));
78   }
79
80   virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
81                                    SyncFileStatus file_status,
82                                    SyncAction sync_action,
83                                    SyncDirection direction) OVERRIDE {
84     ui_task_runner_->PostTask(
85         FROM_HERE,
86         base::Bind(&SyncEngine::OnFileStatusChanged,
87                    sync_engine_,
88                    url, file_status, sync_action, direction));
89   }
90
91
92   virtual void UpdateServiceState(RemoteServiceState state,
93                                   const std::string& description) OVERRIDE {
94     ui_task_runner_->PostTask(
95         FROM_HERE,
96         base::Bind(&SyncEngine::UpdateServiceState,
97                    sync_engine_, state, description));
98   }
99
100  private:
101   scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
102   base::WeakPtr<SyncEngine> sync_engine_;
103
104   DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
105 };
106
107 namespace {
108
109 void EmptyStatusCallback(SyncStatusCode status) {}
110
111 }  // namespace
112
113 scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
114     content::BrowserContext* context) {
115   scoped_refptr<base::SequencedWorkerPool> worker_pool(
116       content::BrowserThread::GetBlockingPool());
117   scoped_refptr<base::SequencedTaskRunner> drive_task_runner(
118       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
119           worker_pool->GetSequenceToken(),
120           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
121
122   Profile* profile = Profile::FromBrowserContext(context);
123   ProfileOAuth2TokenService* token_service =
124       ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
125   scoped_ptr<drive::DriveServiceInterface> drive_service(
126       new drive::DriveAPIService(
127           token_service,
128           context->GetRequestContext(),
129           drive_task_runner.get(),
130           GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
131           GURL(google_apis::DriveApiUrlGenerator::
132                kBaseDownloadUrlForProduction),
133           GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
134           std::string() /* custom_user_agent */));
135   SigninManagerBase* signin_manager =
136       SigninManagerFactory::GetForProfile(profile);
137   drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
138
139   scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
140       new drive::DriveUploader(drive_service.get(), drive_task_runner.get()));
141
142   drive::DriveNotificationManager* notification_manager =
143       drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
144   ExtensionService* extension_service =
145       extensions::ExtensionSystem::Get(context)->extension_service();
146
147   scoped_refptr<base::SequencedTaskRunner> file_task_runner(
148       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
149           worker_pool->GetSequenceToken(),
150           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
151
152   // TODO(peria): Create another task runner to manage SyncWorker.
153   scoped_refptr<base::SingleThreadTaskRunner>
154       worker_task_runner = base::MessageLoopProxy::current();
155
156   scoped_ptr<drive_backend::SyncEngine> sync_engine(
157       new SyncEngine(drive_service.Pass(),
158                      drive_uploader.Pass(),
159                      worker_task_runner,
160                      notification_manager,
161                      extension_service,
162                      signin_manager));
163   sync_engine->Initialize(
164       GetSyncFileSystemDir(context->GetPath()),
165       file_task_runner.get(),
166       NULL);
167
168   return sync_engine.Pass();
169 }
170
171 void SyncEngine::AppendDependsOnFactories(
172     std::set<BrowserContextKeyedServiceFactory*>* factories) {
173   DCHECK(factories);
174   factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
175   factories->insert(SigninManagerFactory::GetInstance());
176   factories->insert(
177       extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
178 }
179
180 SyncEngine::~SyncEngine() {
181   net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
182   GetDriveService()->RemoveObserver(this);
183   if (notification_manager_)
184     notification_manager_->RemoveObserver(this);
185
186   // TODO(tzik): Destroy |sync_worker_| and |worker_observer_| on the worker.
187 }
188
189 void SyncEngine::Initialize(const base::FilePath& base_dir,
190                             base::SequencedTaskRunner* file_task_runner,
191                             leveldb::Env* env_override) {
192   // DriveServiceWrapper and DriveServiceOnWorker relay communications
193   // between DriveService and syncers in SyncWorker.
194   scoped_ptr<drive::DriveServiceInterface>
195       drive_service_on_worker(
196           new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
197                                    base::MessageLoopProxy::current(),
198                                    worker_task_runner_));
199   scoped_ptr<drive::DriveUploaderInterface>
200       drive_uploader_on_worker(
201           new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
202                                     base::MessageLoopProxy::current(),
203                                     worker_task_runner_));
204   scoped_ptr<SyncEngineContext>
205       sync_engine_context(
206           new SyncEngineContext(drive_service_on_worker.Pass(),
207                                 drive_uploader_on_worker.Pass(),
208                                 base::MessageLoopProxy::current(),
209                                 worker_task_runner_,
210                                 file_task_runner));
211
212   worker_observer_.reset(
213       new WorkerObserver(base::MessageLoopProxy::current(),
214                          weak_ptr_factory_.GetWeakPtr()));
215
216   base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
217   if (extension_service_)
218     extension_service_weak_ptr = extension_service_->AsWeakPtr();
219
220   // TODO(peria): Use PostTask on |worker_task_runner_| to call this function.
221   sync_worker_ = SyncWorker::CreateOnWorker(
222       base_dir,
223       worker_observer_.get(),
224       extension_service_weak_ptr,
225       sync_engine_context.Pass(),
226       env_override);
227
228   if (notification_manager_)
229     notification_manager_->AddObserver(this);
230   GetDriveService()->AddObserver(this);
231   net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
232 }
233
234 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
235   service_observers_.AddObserver(observer);
236 }
237
238 void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
239   file_status_observers_.AddObserver(observer);
240 }
241
242 void SyncEngine::RegisterOrigin(
243     const GURL& origin, const SyncStatusCallback& callback) {
244   worker_task_runner_->PostTask(
245       FROM_HERE,
246       base::Bind(&SyncWorker::RegisterOrigin,
247                  base::Unretained(sync_worker_.get()),
248                  origin,
249                  RelayCallbackToCurrentThread(
250                      FROM_HERE, callback)));
251 }
252
253 void SyncEngine::EnableOrigin(
254     const GURL& origin, const SyncStatusCallback& callback) {
255   worker_task_runner_->PostTask(
256       FROM_HERE,
257       base::Bind(&SyncWorker::EnableOrigin,
258                  base::Unretained(sync_worker_.get()),
259                  origin,
260                  RelayCallbackToCurrentThread(
261                      FROM_HERE, callback)));
262 }
263
264 void SyncEngine::DisableOrigin(
265     const GURL& origin, const SyncStatusCallback& callback) {
266   worker_task_runner_->PostTask(
267       FROM_HERE,
268       base::Bind(&SyncWorker::DisableOrigin,
269                  base::Unretained(sync_worker_.get()),
270                  origin,
271                  RelayCallbackToCurrentThread(
272                      FROM_HERE, callback)));
273 }
274
275 void SyncEngine::UninstallOrigin(
276     const GURL& origin,
277     UninstallFlag flag,
278     const SyncStatusCallback& callback) {
279   worker_task_runner_->PostTask(
280       FROM_HERE,
281       base::Bind(&SyncWorker::UninstallOrigin,
282                  base::Unretained(sync_worker_.get()),
283                  origin, flag,
284                  RelayCallbackToCurrentThread(
285                      FROM_HERE, callback)));
286 }
287
288 void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
289   worker_task_runner_->PostTask(
290       FROM_HERE,
291       base::Bind(&SyncWorker::ProcessRemoteChange,
292                  base::Unretained(sync_worker_.get()),
293                  RelayCallbackToCurrentThread(
294                      FROM_HERE, callback)));
295 }
296
297 void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
298   remote_change_processor_ = processor;
299   remote_change_processor_wrapper_.reset(
300       new RemoteChangeProcessorWrapper(processor));
301
302   remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
303       remote_change_processor_wrapper_->AsWeakPtr(),
304       base::MessageLoopProxy::current(), /* ui_task_runner */
305       worker_task_runner_));
306
307   worker_task_runner_->PostTask(
308       FROM_HERE,
309       base::Bind(&SyncWorker::SetRemoteChangeProcessor,
310                  base::Unretained(sync_worker_.get()),
311                  remote_change_processor_on_worker_.get()));
312 }
313
314 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
315   return this;
316 }
317
318 bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) {
319   // TODO(tzik): Implement this before we support manual conflict resolution.
320   return false;
321 }
322
323 RemoteServiceState SyncEngine::GetCurrentState() const {
324   // TODO(peria): Post task
325   return sync_worker_->GetCurrentState();
326 }
327
328 void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) {
329   // TODO(peria): Make this route asynchronous.
330   sync_worker_->GetOriginStatusMap(status_map);
331 }
332
333 scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) {
334   // TODO(peria): Make this route asynchronous.
335   return sync_worker_->DumpFiles(origin);
336 }
337
338 scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() {
339   // TODO(peria): Make this route asynchronous.
340   return sync_worker_->DumpDatabase();
341 }
342
343 void SyncEngine::SetSyncEnabled(bool enabled) {
344   worker_task_runner_->PostTask(
345       FROM_HERE,
346       base::Bind(&SyncWorker::SetSyncEnabled,
347                  base::Unretained(sync_worker_.get()),
348                  enabled));
349 }
350
351 void SyncEngine::UpdateSyncEnabled(bool enabled) {
352   const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled";
353   FOR_EACH_OBSERVER(
354       Observer, service_observers_,
355       OnRemoteServiceStateUpdated(GetCurrentState(), status_message));
356 }
357
358 SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy(
359     ConflictResolutionPolicy policy) {
360   // TODO(peria): Make this route asynchronous.
361   return sync_worker_->SetDefaultConflictResolutionPolicy(policy);
362 }
363
364 SyncStatusCode SyncEngine::SetConflictResolutionPolicy(
365     const GURL& origin,
366     ConflictResolutionPolicy policy) {
367   // TODO(peria): Make this route asynchronous.
368   return sync_worker_->SetConflictResolutionPolicy(origin, policy);
369 }
370
371 ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy()
372     const {
373   // TODO(peria): Make this route asynchronous.
374   return sync_worker_->GetDefaultConflictResolutionPolicy();
375 }
376
377 ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy(
378     const GURL& origin) const {
379   // TODO(peria): Make this route asynchronous.
380   return sync_worker_->GetConflictResolutionPolicy(origin);
381 }
382
383 void SyncEngine::GetRemoteVersions(
384     const fileapi::FileSystemURL& url,
385     const RemoteVersionsCallback& callback) {
386   // TODO(tzik): Implement this before we support manual conflict resolution.
387   callback.Run(SYNC_STATUS_FAILED, std::vector<Version>());
388 }
389
390 void SyncEngine::DownloadRemoteVersion(
391     const fileapi::FileSystemURL& url,
392     const std::string& version_id,
393     const DownloadVersionCallback& callback) {
394   // TODO(tzik): Implement this before we support manual conflict resolution.
395   callback.Run(SYNC_STATUS_FAILED, webkit_blob::ScopedFile());
396 }
397
398 void SyncEngine::PromoteDemotedChanges() {
399   MetadataDatabase* metadata_db = GetMetadataDatabase();
400   if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) {
401     metadata_db->PromoteLowerPriorityTrackersToNormal();
402     FOR_EACH_OBSERVER(
403         Observer,
404         service_observers_,
405         OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
406   }
407 }
408
409 void SyncEngine::ApplyLocalChange(
410     const FileChange& local_change,
411     const base::FilePath& local_path,
412     const SyncFileMetadata& local_metadata,
413     const fileapi::FileSystemURL& url,
414     const SyncStatusCallback& callback) {
415   worker_task_runner_->PostTask(
416       FROM_HERE,
417       base::Bind(&SyncWorker::ApplyLocalChange,
418                  base::Unretained(sync_worker_.get()),
419                  local_change,
420                  local_path,
421                  local_metadata,
422                  url,
423                  RelayCallbackToCurrentThread(
424                      FROM_HERE, callback)));
425 }
426
427 SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
428   // TODO(peria): Post task
429   return sync_worker_->GetSyncTaskManager();
430 }
431
432 void SyncEngine::OnNotificationReceived() {
433   worker_task_runner_->PostTask(
434       FROM_HERE,
435       base::Bind(&SyncWorker::OnNotificationReceived,
436                  base::Unretained(sync_worker_.get())));
437 }
438
439 void SyncEngine::OnPushNotificationEnabled(bool) {}
440
441 void SyncEngine::OnReadyToSendRequests() {
442   const std::string account_id =
443       signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
444
445   worker_task_runner_->PostTask(
446       FROM_HERE,
447       base::Bind(&SyncWorker::OnReadyToSendRequests,
448                  base::Unretained(sync_worker_.get()),
449                  account_id));
450 }
451
452 void SyncEngine::OnRefreshTokenInvalid() {
453   worker_task_runner_->PostTask(
454       FROM_HERE,
455       base::Bind(&SyncWorker::OnRefreshTokenInvalid,
456                  base::Unretained(sync_worker_.get())));
457 }
458
459 void SyncEngine::OnNetworkChanged(
460     net::NetworkChangeNotifier::ConnectionType type) {
461   worker_task_runner_->PostTask(
462       FROM_HERE,
463       base::Bind(&SyncWorker::OnNetworkChanged,
464                  base::Unretained(sync_worker_.get()),
465                  type));
466 }
467
468 drive::DriveServiceInterface* SyncEngine::GetDriveService() {
469   return drive_service_.get();
470 }
471
472 drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
473   return drive_uploader_.get();
474 }
475
476 MetadataDatabase* SyncEngine::GetMetadataDatabase() {
477   // TODO(peria): Post task
478   return sync_worker_->GetMetadataDatabase();
479 }
480
481 SyncEngine::SyncEngine(
482     scoped_ptr<drive::DriveServiceInterface> drive_service,
483     scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
484     base::SequencedTaskRunner* worker_task_runner,
485     drive::DriveNotificationManager* notification_manager,
486     ExtensionServiceInterface* extension_service,
487     SigninManagerBase* signin_manager)
488     : drive_service_(drive_service.Pass()),
489       drive_service_wrapper_(new DriveServiceWrapper(drive_service_.get())),
490       drive_uploader_(drive_uploader.Pass()),
491       drive_uploader_wrapper_(new DriveUploaderWrapper(drive_uploader_.get())),
492       notification_manager_(notification_manager),
493       extension_service_(extension_service),
494       signin_manager_(signin_manager),
495       worker_task_runner_(worker_task_runner),
496       weak_ptr_factory_(this) {}
497
498 void SyncEngine::OnPendingFileListUpdated(int item_count) {
499   FOR_EACH_OBSERVER(
500       Observer,
501       service_observers_,
502       OnRemoteChangeQueueUpdated(item_count));
503 }
504
505 void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
506                                      SyncFileStatus file_status,
507                                      SyncAction sync_action,
508                                      SyncDirection direction) {
509   FOR_EACH_OBSERVER(FileStatusObserver,
510                     file_status_observers_,
511                     OnFileStatusChanged(
512                         url, file_status, sync_action, direction));
513 }
514
515 void SyncEngine::UpdateServiceState(RemoteServiceState state,
516                                     const std::string& description) {
517   FOR_EACH_OBSERVER(
518       Observer, service_observers_,
519       OnRemoteServiceStateUpdated(state, description));
520 }
521
522 void SyncEngine::UpdateRegisteredApps() {
523   if (!extension_service_)
524     return;
525
526   MetadataDatabase* metadata_db = GetMetadataDatabase();
527   DCHECK(metadata_db);
528   std::vector<std::string> app_ids;
529   metadata_db->GetRegisteredAppIDs(&app_ids);
530
531   // Update the status of every origin using status from ExtensionService.
532   for (std::vector<std::string>::const_iterator itr = app_ids.begin();
533        itr != app_ids.end(); ++itr) {
534     const std::string& app_id = *itr;
535     GURL origin =
536         extensions::Extension::GetBaseURLFromExtensionId(app_id);
537     if (!extension_service_->GetInstalledExtension(app_id)) {
538       // Extension has been uninstalled.
539       // (At this stage we can't know if it was unpacked extension or not,
540       // so just purge the remote folder.)
541       UninstallOrigin(origin,
542                       RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE,
543                       base::Bind(&EmptyStatusCallback));
544       continue;
545     }
546     FileTracker tracker;
547     if (!metadata_db->FindAppRootTracker(app_id, &tracker)) {
548       // App will register itself on first run.
549       continue;
550     }
551     bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id);
552     bool is_app_root_tracker_enabled =
553         tracker.tracker_kind() == TRACKER_KIND_APP_ROOT;
554     if (is_app_enabled && !is_app_root_tracker_enabled)
555       EnableOrigin(origin, base::Bind(&EmptyStatusCallback));
556     else if (!is_app_enabled && is_app_root_tracker_enabled)
557       DisableOrigin(origin, base::Bind(&EmptyStatusCallback));
558   }
559 }
560
561 }  // namespace drive_backend
562 }  // namespace sync_file_system