1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
10 #include "base/threading/sequenced_worker_pool.h"
11 #include "base/values.h"
12 #include "chrome/browser/drive/drive_api_service.h"
13 #include "chrome/browser/drive/drive_notification_manager.h"
14 #include "chrome/browser/drive/drive_notification_manager_factory.h"
15 #include "chrome/browser/drive/drive_service_interface.h"
16 #include "chrome/browser/drive/drive_uploader.h"
17 #include "chrome/browser/extensions/extension_service.h"
18 #include "chrome/browser/profiles/profile.h"
19 #include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
20 #include "chrome/browser/signin/signin_manager_factory.h"
21 #include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
22 #include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
23 #include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
24 #include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
25 #include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
26 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
27 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
28 #include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
29 #include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
30 #include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
31 #include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
32 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
33 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
34 #include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
35 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
36 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
37 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
38 #include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
39 #include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
40 #include "chrome/browser/sync_file_system/file_status_observer.h"
41 #include "chrome/browser/sync_file_system/logger.h"
42 #include "chrome/browser/sync_file_system/syncable_file_system_util.h"
43 #include "components/signin/core/browser/profile_oauth2_token_service.h"
44 #include "components/signin/core/browser/signin_manager.h"
45 #include "content/public/browser/browser_thread.h"
46 #include "extensions/browser/extension_system.h"
47 #include "extensions/browser/extension_system_provider.h"
48 #include "extensions/browser/extensions_browser_client.h"
49 #include "extensions/common/extension.h"
50 #include "google_apis/drive/drive_api_url_generator.h"
51 #include "google_apis/drive/gdata_wapi_url_generator.h"
52 #include "webkit/common/blob/scoped_file.h"
53 #include "webkit/common/fileapi/file_system_util.h"
55 namespace sync_file_system {
57 class RemoteChangeProcessor;
59 namespace drive_backend {
61 class SyncEngine::WorkerObserver
62 : public SyncWorker::Observer {
64 WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
65 base::WeakPtr<SyncEngine> sync_engine)
66 : ui_task_runner_(ui_task_runner),
67 sync_engine_(sync_engine){
70 virtual ~WorkerObserver() {}
72 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
73 ui_task_runner_->PostTask(
75 base::Bind(&SyncEngine::OnPendingFileListUpdated,
80 virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
81 SyncFileStatus file_status,
82 SyncAction sync_action,
83 SyncDirection direction) OVERRIDE {
84 ui_task_runner_->PostTask(
86 base::Bind(&SyncEngine::OnFileStatusChanged,
88 url, file_status, sync_action, direction));
92 virtual void UpdateServiceState(RemoteServiceState state,
93 const std::string& description) OVERRIDE {
94 ui_task_runner_->PostTask(
96 base::Bind(&SyncEngine::UpdateServiceState,
97 sync_engine_, state, description));
101 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
102 base::WeakPtr<SyncEngine> sync_engine_;
104 DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
109 void EmptyStatusCallback(SyncStatusCode status) {}
113 scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
114 content::BrowserContext* context) {
115 scoped_refptr<base::SequencedWorkerPool> worker_pool(
116 content::BrowserThread::GetBlockingPool());
117 scoped_refptr<base::SequencedTaskRunner> drive_task_runner(
118 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
119 worker_pool->GetSequenceToken(),
120 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
122 Profile* profile = Profile::FromBrowserContext(context);
123 ProfileOAuth2TokenService* token_service =
124 ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
125 scoped_ptr<drive::DriveServiceInterface> drive_service(
126 new drive::DriveAPIService(
128 context->GetRequestContext(),
129 drive_task_runner.get(),
130 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
131 GURL(google_apis::DriveApiUrlGenerator::
132 kBaseDownloadUrlForProduction),
133 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
134 std::string() /* custom_user_agent */));
135 SigninManagerBase* signin_manager =
136 SigninManagerFactory::GetForProfile(profile);
137 drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
139 scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
140 new drive::DriveUploader(drive_service.get(), drive_task_runner.get()));
142 drive::DriveNotificationManager* notification_manager =
143 drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
144 ExtensionService* extension_service =
145 extensions::ExtensionSystem::Get(context)->extension_service();
147 scoped_refptr<base::SequencedTaskRunner> file_task_runner(
148 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
149 worker_pool->GetSequenceToken(),
150 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
152 // TODO(peria): Create another task runner to manage SyncWorker.
153 scoped_refptr<base::SingleThreadTaskRunner>
154 worker_task_runner = base::MessageLoopProxy::current();
156 scoped_ptr<drive_backend::SyncEngine> sync_engine(
157 new SyncEngine(drive_service.Pass(),
158 drive_uploader.Pass(),
160 notification_manager,
163 sync_engine->Initialize(
164 GetSyncFileSystemDir(context->GetPath()),
165 file_task_runner.get(),
168 return sync_engine.Pass();
171 void SyncEngine::AppendDependsOnFactories(
172 std::set<BrowserContextKeyedServiceFactory*>* factories) {
174 factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
175 factories->insert(SigninManagerFactory::GetInstance());
177 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
180 SyncEngine::~SyncEngine() {
181 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
182 GetDriveService()->RemoveObserver(this);
183 if (notification_manager_)
184 notification_manager_->RemoveObserver(this);
186 // TODO(tzik): Destroy |sync_worker_| and |worker_observer_| on the worker.
189 void SyncEngine::Initialize(const base::FilePath& base_dir,
190 base::SequencedTaskRunner* file_task_runner,
191 leveldb::Env* env_override) {
192 // DriveServiceWrapper and DriveServiceOnWorker relay communications
193 // between DriveService and syncers in SyncWorker.
194 scoped_ptr<drive::DriveServiceInterface>
195 drive_service_on_worker(
196 new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
197 base::MessageLoopProxy::current(),
198 worker_task_runner_));
199 scoped_ptr<drive::DriveUploaderInterface>
200 drive_uploader_on_worker(
201 new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
202 base::MessageLoopProxy::current(),
203 worker_task_runner_));
204 scoped_ptr<SyncEngineContext>
206 new SyncEngineContext(drive_service_on_worker.Pass(),
207 drive_uploader_on_worker.Pass(),
208 base::MessageLoopProxy::current(),
212 worker_observer_.reset(
213 new WorkerObserver(base::MessageLoopProxy::current(),
214 weak_ptr_factory_.GetWeakPtr()));
216 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
217 if (extension_service_)
218 extension_service_weak_ptr = extension_service_->AsWeakPtr();
220 // TODO(peria): Use PostTask on |worker_task_runner_| to call this function.
221 sync_worker_ = SyncWorker::CreateOnWorker(
223 worker_observer_.get(),
224 extension_service_weak_ptr,
225 sync_engine_context.Pass(),
228 if (notification_manager_)
229 notification_manager_->AddObserver(this);
230 GetDriveService()->AddObserver(this);
231 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
234 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
235 service_observers_.AddObserver(observer);
238 void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
239 file_status_observers_.AddObserver(observer);
242 void SyncEngine::RegisterOrigin(
243 const GURL& origin, const SyncStatusCallback& callback) {
244 worker_task_runner_->PostTask(
246 base::Bind(&SyncWorker::RegisterOrigin,
247 base::Unretained(sync_worker_.get()),
249 RelayCallbackToCurrentThread(
250 FROM_HERE, callback)));
253 void SyncEngine::EnableOrigin(
254 const GURL& origin, const SyncStatusCallback& callback) {
255 worker_task_runner_->PostTask(
257 base::Bind(&SyncWorker::EnableOrigin,
258 base::Unretained(sync_worker_.get()),
260 RelayCallbackToCurrentThread(
261 FROM_HERE, callback)));
264 void SyncEngine::DisableOrigin(
265 const GURL& origin, const SyncStatusCallback& callback) {
266 worker_task_runner_->PostTask(
268 base::Bind(&SyncWorker::DisableOrigin,
269 base::Unretained(sync_worker_.get()),
271 RelayCallbackToCurrentThread(
272 FROM_HERE, callback)));
275 void SyncEngine::UninstallOrigin(
278 const SyncStatusCallback& callback) {
279 worker_task_runner_->PostTask(
281 base::Bind(&SyncWorker::UninstallOrigin,
282 base::Unretained(sync_worker_.get()),
284 RelayCallbackToCurrentThread(
285 FROM_HERE, callback)));
288 void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
289 worker_task_runner_->PostTask(
291 base::Bind(&SyncWorker::ProcessRemoteChange,
292 base::Unretained(sync_worker_.get()),
293 RelayCallbackToCurrentThread(
294 FROM_HERE, callback)));
297 void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
298 remote_change_processor_ = processor;
299 remote_change_processor_wrapper_.reset(
300 new RemoteChangeProcessorWrapper(processor));
302 remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
303 remote_change_processor_wrapper_->AsWeakPtr(),
304 base::MessageLoopProxy::current(), /* ui_task_runner */
305 worker_task_runner_));
307 worker_task_runner_->PostTask(
309 base::Bind(&SyncWorker::SetRemoteChangeProcessor,
310 base::Unretained(sync_worker_.get()),
311 remote_change_processor_on_worker_.get()));
314 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
318 bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) {
319 // TODO(tzik): Implement this before we support manual conflict resolution.
323 RemoteServiceState SyncEngine::GetCurrentState() const {
324 // TODO(peria): Post task
325 return sync_worker_->GetCurrentState();
328 void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) {
329 // TODO(peria): Make this route asynchronous.
330 sync_worker_->GetOriginStatusMap(status_map);
333 scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) {
334 // TODO(peria): Make this route asynchronous.
335 return sync_worker_->DumpFiles(origin);
338 scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() {
339 // TODO(peria): Make this route asynchronous.
340 return sync_worker_->DumpDatabase();
343 void SyncEngine::SetSyncEnabled(bool enabled) {
344 worker_task_runner_->PostTask(
346 base::Bind(&SyncWorker::SetSyncEnabled,
347 base::Unretained(sync_worker_.get()),
351 void SyncEngine::UpdateSyncEnabled(bool enabled) {
352 const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled";
354 Observer, service_observers_,
355 OnRemoteServiceStateUpdated(GetCurrentState(), status_message));
358 SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy(
359 ConflictResolutionPolicy policy) {
360 // TODO(peria): Make this route asynchronous.
361 return sync_worker_->SetDefaultConflictResolutionPolicy(policy);
364 SyncStatusCode SyncEngine::SetConflictResolutionPolicy(
366 ConflictResolutionPolicy policy) {
367 // TODO(peria): Make this route asynchronous.
368 return sync_worker_->SetConflictResolutionPolicy(origin, policy);
371 ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy()
373 // TODO(peria): Make this route asynchronous.
374 return sync_worker_->GetDefaultConflictResolutionPolicy();
377 ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy(
378 const GURL& origin) const {
379 // TODO(peria): Make this route asynchronous.
380 return sync_worker_->GetConflictResolutionPolicy(origin);
383 void SyncEngine::GetRemoteVersions(
384 const fileapi::FileSystemURL& url,
385 const RemoteVersionsCallback& callback) {
386 // TODO(tzik): Implement this before we support manual conflict resolution.
387 callback.Run(SYNC_STATUS_FAILED, std::vector<Version>());
390 void SyncEngine::DownloadRemoteVersion(
391 const fileapi::FileSystemURL& url,
392 const std::string& version_id,
393 const DownloadVersionCallback& callback) {
394 // TODO(tzik): Implement this before we support manual conflict resolution.
395 callback.Run(SYNC_STATUS_FAILED, webkit_blob::ScopedFile());
398 void SyncEngine::PromoteDemotedChanges() {
399 MetadataDatabase* metadata_db = GetMetadataDatabase();
400 if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) {
401 metadata_db->PromoteLowerPriorityTrackersToNormal();
405 OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
409 void SyncEngine::ApplyLocalChange(
410 const FileChange& local_change,
411 const base::FilePath& local_path,
412 const SyncFileMetadata& local_metadata,
413 const fileapi::FileSystemURL& url,
414 const SyncStatusCallback& callback) {
415 worker_task_runner_->PostTask(
417 base::Bind(&SyncWorker::ApplyLocalChange,
418 base::Unretained(sync_worker_.get()),
423 RelayCallbackToCurrentThread(
424 FROM_HERE, callback)));
427 SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
428 // TODO(peria): Post task
429 return sync_worker_->GetSyncTaskManager();
432 void SyncEngine::OnNotificationReceived() {
433 worker_task_runner_->PostTask(
435 base::Bind(&SyncWorker::OnNotificationReceived,
436 base::Unretained(sync_worker_.get())));
439 void SyncEngine::OnPushNotificationEnabled(bool) {}
441 void SyncEngine::OnReadyToSendRequests() {
442 const std::string account_id =
443 signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
445 worker_task_runner_->PostTask(
447 base::Bind(&SyncWorker::OnReadyToSendRequests,
448 base::Unretained(sync_worker_.get()),
452 void SyncEngine::OnRefreshTokenInvalid() {
453 worker_task_runner_->PostTask(
455 base::Bind(&SyncWorker::OnRefreshTokenInvalid,
456 base::Unretained(sync_worker_.get())));
459 void SyncEngine::OnNetworkChanged(
460 net::NetworkChangeNotifier::ConnectionType type) {
461 worker_task_runner_->PostTask(
463 base::Bind(&SyncWorker::OnNetworkChanged,
464 base::Unretained(sync_worker_.get()),
468 drive::DriveServiceInterface* SyncEngine::GetDriveService() {
469 return drive_service_.get();
472 drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
473 return drive_uploader_.get();
476 MetadataDatabase* SyncEngine::GetMetadataDatabase() {
477 // TODO(peria): Post task
478 return sync_worker_->GetMetadataDatabase();
481 SyncEngine::SyncEngine(
482 scoped_ptr<drive::DriveServiceInterface> drive_service,
483 scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
484 base::SequencedTaskRunner* worker_task_runner,
485 drive::DriveNotificationManager* notification_manager,
486 ExtensionServiceInterface* extension_service,
487 SigninManagerBase* signin_manager)
488 : drive_service_(drive_service.Pass()),
489 drive_service_wrapper_(new DriveServiceWrapper(drive_service_.get())),
490 drive_uploader_(drive_uploader.Pass()),
491 drive_uploader_wrapper_(new DriveUploaderWrapper(drive_uploader_.get())),
492 notification_manager_(notification_manager),
493 extension_service_(extension_service),
494 signin_manager_(signin_manager),
495 worker_task_runner_(worker_task_runner),
496 weak_ptr_factory_(this) {}
498 void SyncEngine::OnPendingFileListUpdated(int item_count) {
502 OnRemoteChangeQueueUpdated(item_count));
505 void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
506 SyncFileStatus file_status,
507 SyncAction sync_action,
508 SyncDirection direction) {
509 FOR_EACH_OBSERVER(FileStatusObserver,
510 file_status_observers_,
512 url, file_status, sync_action, direction));
515 void SyncEngine::UpdateServiceState(RemoteServiceState state,
516 const std::string& description) {
518 Observer, service_observers_,
519 OnRemoteServiceStateUpdated(state, description));
522 void SyncEngine::UpdateRegisteredApps() {
523 if (!extension_service_)
526 MetadataDatabase* metadata_db = GetMetadataDatabase();
528 std::vector<std::string> app_ids;
529 metadata_db->GetRegisteredAppIDs(&app_ids);
531 // Update the status of every origin using status from ExtensionService.
532 for (std::vector<std::string>::const_iterator itr = app_ids.begin();
533 itr != app_ids.end(); ++itr) {
534 const std::string& app_id = *itr;
536 extensions::Extension::GetBaseURLFromExtensionId(app_id);
537 if (!extension_service_->GetInstalledExtension(app_id)) {
538 // Extension has been uninstalled.
539 // (At this stage we can't know if it was unpacked extension or not,
540 // so just purge the remote folder.)
541 UninstallOrigin(origin,
542 RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE,
543 base::Bind(&EmptyStatusCallback));
547 if (!metadata_db->FindAppRootTracker(app_id, &tracker)) {
548 // App will register itself on first run.
551 bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id);
552 bool is_app_root_tracker_enabled =
553 tracker.tracker_kind() == TRACKER_KIND_APP_ROOT;
554 if (is_app_enabled && !is_app_root_tracker_enabled)
555 EnableOrigin(origin, base::Bind(&EmptyStatusCallback));
556 else if (!is_app_enabled && is_app_root_tracker_enabled)
557 DisableOrigin(origin, base::Bind(&EmptyStatusCallback));
561 } // namespace drive_backend
562 } // namespace sync_file_system