Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / chrome / browser / sync_file_system / drive_backend / sync_engine.cc
index 6b5f6ff..9b50473 100644 (file)
 #include "chrome/browser/extensions/extension_service.h"
 #include "chrome/browser/profiles/profile.h"
 #include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
-#include "chrome/browser/signin/signin_manager.h"
 #include "chrome/browser/signin/signin_manager_factory.h"
+#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
 #include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
 #include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
+#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
+#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
+#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
+#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
 #include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
 #include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
 #include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
 #include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
+#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
+#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
 #include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
+#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
 #include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
 #include "chrome/browser/sync_file_system/file_status_observer.h"
 #include "chrome/browser/sync_file_system/logger.h"
 #include "chrome/browser/sync_file_system/syncable_file_system_util.h"
 #include "components/signin/core/browser/profile_oauth2_token_service.h"
+#include "components/signin/core/browser/signin_manager.h"
 #include "content/public/browser/browser_thread.h"
 #include "extensions/browser/extension_system.h"
 #include "extensions/browser/extension_system_provider.h"
@@ -50,6 +58,52 @@ class RemoteChangeProcessor;
 
 namespace drive_backend {
 
+class SyncEngine::WorkerObserver
+    : public SyncWorker::Observer {
+ public:
+  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
+                 base::WeakPtr<SyncEngine> sync_engine)
+      : ui_task_runner_(ui_task_runner),
+        sync_engine_(sync_engine){
+  }
+
+  virtual ~WorkerObserver() {}
+
+  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
+    ui_task_runner_->PostTask(
+        FROM_HERE,
+        base::Bind(&SyncEngine::OnPendingFileListUpdated,
+                   sync_engine_,
+                   item_count));
+  }
+
+  virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
+                                   SyncFileStatus file_status,
+                                   SyncAction sync_action,
+                                   SyncDirection direction) OVERRIDE {
+    ui_task_runner_->PostTask(
+        FROM_HERE,
+        base::Bind(&SyncEngine::OnFileStatusChanged,
+                   sync_engine_,
+                   url, file_status, sync_action, direction));
+  }
+
+
+  virtual void UpdateServiceState(RemoteServiceState state,
+                                  const std::string& description) OVERRIDE {
+    ui_task_runner_->PostTask(
+        FROM_HERE,
+        base::Bind(&SyncEngine::UpdateServiceState,
+                   sync_engine_, state, description));
+  }
+
+ private:
+  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
+  base::WeakPtr<SyncEngine> sync_engine_;
+
+  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
+};
+
 namespace {
 
 void EmptyStatusCallback(SyncStatusCode status) {}
@@ -58,13 +112,6 @@ void EmptyStatusCallback(SyncStatusCode status) {}
 
 scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
     content::BrowserContext* context) {
-  GURL base_drive_url(
-      google_apis::DriveApiUrlGenerator::kBaseUrlForProduction);
-  GURL base_download_url(
-      google_apis::DriveApiUrlGenerator::kBaseDownloadUrlForProduction);
-  GURL wapi_base_url(
-      google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction);
-
   scoped_refptr<base::SequencedWorkerPool> worker_pool(
       content::BrowserThread::GetBlockingPool());
   scoped_refptr<base::SequencedTaskRunner> drive_task_runner(
@@ -75,15 +122,18 @@ scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
   Profile* profile = Profile::FromBrowserContext(context);
   ProfileOAuth2TokenService* token_service =
       ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
-  SigninManagerBase* signin_manager =
-      SigninManagerFactory::GetForProfile(profile);
   scoped_ptr<drive::DriveServiceInterface> drive_service(
       new drive::DriveAPIService(
           token_service,
           context->GetRequestContext(),
           drive_task_runner.get(),
-          base_drive_url, base_download_url, wapi_base_url,
+          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
+          GURL(google_apis::DriveApiUrlGenerator::
+               kBaseDownloadUrlForProduction),
+          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
           std::string() /* custom_user_agent */));
+  SigninManagerBase* signin_manager =
+      SigninManagerFactory::GetForProfile(profile);
   drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
 
   scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
@@ -92,24 +142,28 @@ scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
   drive::DriveNotificationManager* notification_manager =
       drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
   ExtensionService* extension_service =
-      extensions::ExtensionSystem::Get(
-          context)->extension_service();
+      extensions::ExtensionSystem::Get(context)->extension_service();
 
-  scoped_refptr<base::SequencedTaskRunner> task_runner(
+  scoped_refptr<base::SequencedTaskRunner> file_task_runner(
       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
           worker_pool->GetSequenceToken(),
           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
 
+  // TODO(peria): Create another task runner to manage SyncWorker.
+  scoped_refptr<base::SingleThreadTaskRunner>
+      worker_task_runner = base::MessageLoopProxy::current();
+
   scoped_ptr<drive_backend::SyncEngine> sync_engine(
-      new SyncEngine(GetSyncFileSystemDir(context->GetPath()),
-                     task_runner.get(),
-                     drive_service.Pass(),
+      new SyncEngine(drive_service.Pass(),
                      drive_uploader.Pass(),
+                     worker_task_runner,
                      notification_manager,
                      extension_service,
-                     signin_manager,
-                     NULL));
-  sync_engine->Initialize();
+                     signin_manager));
+  sync_engine->Initialize(
+      GetSyncFileSystemDir(context->GetPath()),
+      file_task_runner.get(),
+      NULL);
 
   return sync_engine.Pass();
 }
@@ -125,29 +179,56 @@ void SyncEngine::AppendDependsOnFactories(
 
 SyncEngine::~SyncEngine() {
   net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
-  context_->GetDriveService()->RemoveObserver(this);
+  GetDriveService()->RemoveObserver(this);
   if (notification_manager_)
     notification_manager_->RemoveObserver(this);
-}
-
-void SyncEngine::Initialize() {
-  DCHECK(!task_manager_);
-  task_manager_.reset(new SyncTaskManager(
-      weak_ptr_factory_.GetWeakPtr(),
-      0 /* maximum_background_task */));
-  task_manager_->Initialize(SYNC_STATUS_OK);
 
-  PostInitializeTask();
+  // TODO(tzik): Destroy |sync_worker_| and |worker_observer_| on the worker.
+}
+
+void SyncEngine::Initialize(const base::FilePath& base_dir,
+                            base::SequencedTaskRunner* file_task_runner,
+                            leveldb::Env* env_override) {
+  // DriveServiceWrapper and DriveServiceOnWorker relay communications
+  // between DriveService and syncers in SyncWorker.
+  scoped_ptr<drive::DriveServiceInterface>
+      drive_service_on_worker(
+          new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
+                                   base::MessageLoopProxy::current(),
+                                   worker_task_runner_));
+  scoped_ptr<drive::DriveUploaderInterface>
+      drive_uploader_on_worker(
+          new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
+                                    base::MessageLoopProxy::current(),
+                                    worker_task_runner_));
+  scoped_ptr<SyncEngineContext>
+      sync_engine_context(
+          new SyncEngineContext(drive_service_on_worker.Pass(),
+                                drive_uploader_on_worker.Pass(),
+                                base::MessageLoopProxy::current(),
+                                worker_task_runner_,
+                                file_task_runner));
+
+  worker_observer_.reset(
+      new WorkerObserver(base::MessageLoopProxy::current(),
+                         weak_ptr_factory_.GetWeakPtr()));
+
+  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
+  if (extension_service_)
+    extension_service_weak_ptr = extension_service_->AsWeakPtr();
+
+  // TODO(peria): Use PostTask on |worker_task_runner_| to call this function.
+  sync_worker_ = SyncWorker::CreateOnWorker(
+      base_dir,
+      worker_observer_.get(),
+      extension_service_weak_ptr,
+      sync_engine_context.Pass(),
+      env_override);
 
   if (notification_manager_)
     notification_manager_->AddObserver(this);
-  context_->GetDriveService()->AddObserver(this);
+  GetDriveService()->AddObserver(this);
   net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
-
-  net::NetworkChangeNotifier::ConnectionType type =
-      net::NetworkChangeNotifier::GetConnectionType();
-  network_available_ =
-      type != net::NetworkChangeNotifier::CONNECTION_NONE;
 }
 
 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
@@ -159,77 +240,75 @@ void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
 }
 
 void SyncEngine::RegisterOrigin(
-    const GURL& origin,
-    const SyncStatusCallback& callback) {
-  if (!context_->GetMetadataDatabase() &&
-      context_->GetDriveService()->HasRefreshToken())
-    PostInitializeTask();
-
-  scoped_ptr<RegisterAppTask> task(
-      new RegisterAppTask(context_.get(), origin.host()));
-  if (task->CanFinishImmediately()) {
-    callback.Run(SYNC_STATUS_OK);
-    return;
-  }
-
-  task_manager_->ScheduleSyncTask(
+    const GURL& origin, const SyncStatusCallback& callback) {
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      task.PassAs<SyncTask>(),
-      SyncTaskManager::PRIORITY_HIGH,
-      callback);
+      base::Bind(&SyncWorker::RegisterOrigin,
+                 base::Unretained(sync_worker_.get()),
+                 origin,
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
 void SyncEngine::EnableOrigin(
-    const GURL& origin,
-    const SyncStatusCallback& callback) {
-  task_manager_->ScheduleTask(
+    const GURL& origin, const SyncStatusCallback& callback) {
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      base::Bind(&SyncEngine::DoEnableApp,
-                 weak_ptr_factory_.GetWeakPtr(),
-                 origin.host()),
-      SyncTaskManager::PRIORITY_HIGH,
-      callback);
+      base::Bind(&SyncWorker::EnableOrigin,
+                 base::Unretained(sync_worker_.get()),
+                 origin,
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
 void SyncEngine::DisableOrigin(
-    const GURL& origin,
-    const SyncStatusCallback& callback) {
-  task_manager_->ScheduleTask(
+    const GURL& origin, const SyncStatusCallback& callback) {
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      base::Bind(&SyncEngine::DoDisableApp,
-                 weak_ptr_factory_.GetWeakPtr(),
-                 origin.host()),
-      SyncTaskManager::PRIORITY_HIGH,
-      callback);
+      base::Bind(&SyncWorker::DisableOrigin,
+                 base::Unretained(sync_worker_.get()),
+                 origin,
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
 void SyncEngine::UninstallOrigin(
     const GURL& origin,
     UninstallFlag flag,
     const SyncStatusCallback& callback) {
-  task_manager_->ScheduleSyncTask(
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      scoped_ptr<SyncTask>(
-          new UninstallAppTask(context_.get(), origin.host(), flag)),
-      SyncTaskManager::PRIORITY_HIGH,
-      callback);
+      base::Bind(&SyncWorker::UninstallOrigin,
+                 base::Unretained(sync_worker_.get()),
+                 origin, flag,
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
-void SyncEngine::ProcessRemoteChange(
-    const SyncFileCallback& callback) {
-  RemoteToLocalSyncer* syncer = new RemoteToLocalSyncer(context_.get());
-  task_manager_->ScheduleSyncTask(
+void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      scoped_ptr<SyncTask>(syncer),
-      SyncTaskManager::PRIORITY_MED,
-      base::Bind(&SyncEngine::DidProcessRemoteChange,
-                 weak_ptr_factory_.GetWeakPtr(),
-                 syncer, callback));
+      base::Bind(&SyncWorker::ProcessRemoteChange,
+                 base::Unretained(sync_worker_.get()),
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
-void SyncEngine::SetRemoteChangeProcessor(
-    RemoteChangeProcessor* processor) {
-  context_->SetRemoteChangeProcessor(processor);
+void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
+  remote_change_processor_ = processor;
+  remote_change_processor_wrapper_.reset(
+      new RemoteChangeProcessorWrapper(processor));
+
+  remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
+      remote_change_processor_wrapper_->AsWeakPtr(),
+      base::MessageLoopProxy::current(), /* ui_task_runner */
+      worker_task_runner_));
+
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::SetRemoteChangeProcessor,
+                 base::Unretained(sync_worker_.get()),
+                 remote_change_processor_on_worker_.get()));
 }
 
 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
@@ -242,51 +321,34 @@ bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) {
 }
 
 RemoteServiceState SyncEngine::GetCurrentState() const {
-  if (!sync_enabled_)
-    return REMOTE_SERVICE_DISABLED;
-  return service_state_;
+  // TODO(peria): Post task
+  return sync_worker_->GetCurrentState();
 }
 
 void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) {
-  DCHECK(status_map);
-  if (!extension_service_ || !context_->GetMetadataDatabase())
-    return;
-
-  std::vector<std::string> app_ids;
-  context_->GetMetadataDatabase()->GetRegisteredAppIDs(&app_ids);
-
-  for (std::vector<std::string>::const_iterator itr = app_ids.begin();
-       itr != app_ids.end(); ++itr) {
-    const std::string& app_id = *itr;
-    GURL origin =
-        extensions::Extension::GetBaseURLFromExtensionId(app_id);
-    (*status_map)[origin] =
-        context_->GetMetadataDatabase()->IsAppEnabled(app_id) ?
-        "Enabled" : "Disabled";
-  }
+  // TODO(peria): Make this route asynchronous.
+  sync_worker_->GetOriginStatusMap(status_map);
 }
 
 scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) {
-  if (!context_->GetMetadataDatabase())
-    return scoped_ptr<base::ListValue>();
-  return context_->GetMetadataDatabase()->DumpFiles(origin.host());
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->DumpFiles(origin);
 }
 
 scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() {
-  if (!context_->GetMetadataDatabase())
-    return scoped_ptr<base::ListValue>();
-  return context_->GetMetadataDatabase()->DumpDatabase();
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->DumpDatabase();
 }
 
 void SyncEngine::SetSyncEnabled(bool enabled) {
-  if (sync_enabled_ == enabled)
-    return;
-
-  RemoteServiceState old_state = GetCurrentState();
-  sync_enabled_ = enabled;
-  if (old_state == GetCurrentState())
-    return;
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::SetSyncEnabled,
+                 base::Unretained(sync_worker_.get()),
+                 enabled));
+}
 
+void SyncEngine::UpdateSyncEnabled(bool enabled) {
   const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled";
   FOR_EACH_OBSERVER(
       Observer, service_observers_,
@@ -295,27 +357,27 @@ void SyncEngine::SetSyncEnabled(bool enabled) {
 
 SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy(
     ConflictResolutionPolicy policy) {
-  default_conflict_resolution_policy_ = policy;
-  return SYNC_STATUS_OK;
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->SetDefaultConflictResolutionPolicy(policy);
 }
 
 SyncStatusCode SyncEngine::SetConflictResolutionPolicy(
     const GURL& origin,
     ConflictResolutionPolicy policy) {
-  NOTIMPLEMENTED();
-  default_conflict_resolution_policy_ = policy;
-  return SYNC_STATUS_OK;
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->SetConflictResolutionPolicy(origin, policy);
 }
 
 ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy()
     const {
-  return default_conflict_resolution_policy_;
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->GetDefaultConflictResolutionPolicy();
 }
 
 ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy(
     const GURL& origin) const {
-  NOTIMPLEMENTED();
-  return default_conflict_resolution_policy_;
+  // TODO(peria): Make this route asynchronous.
+  return sync_worker_->GetConflictResolutionPolicy(origin);
 }
 
 void SyncEngine::GetRemoteVersions(
@@ -334,14 +396,13 @@ void SyncEngine::DownloadRemoteVersion(
 }
 
 void SyncEngine::PromoteDemotedChanges() {
-  if (context_->GetMetadataDatabase() &&
-      context_->GetMetadataDatabase()->HasLowPriorityDirtyTracker()) {
-    context_->GetMetadataDatabase()->PromoteLowerPriorityTrackersToNormal();
+  MetadataDatabase* metadata_db = GetMetadataDatabase();
+  if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) {
+    metadata_db->PromoteLowerPriorityTrackersToNormal();
     FOR_EACH_OBSERVER(
         Observer,
         service_observers_,
-        OnRemoteChangeQueueUpdated(
-            context_->GetMetadataDatabase()->CountDirtyTracker()));
+        OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
   }
 }
 
@@ -351,392 +412,121 @@ void SyncEngine::ApplyLocalChange(
     const SyncFileMetadata& local_metadata,
     const fileapi::FileSystemURL& url,
     const SyncStatusCallback& callback) {
-  LocalToRemoteSyncer* syncer = new LocalToRemoteSyncer(
-      context_.get(), local_metadata, local_change, local_path, url);
-  task_manager_->ScheduleSyncTask(
+  worker_task_runner_->PostTask(
       FROM_HERE,
-      scoped_ptr<SyncTask>(syncer),
-      SyncTaskManager::PRIORITY_MED,
-      base::Bind(&SyncEngine::DidApplyLocalChange,
-                 weak_ptr_factory_.GetWeakPtr(),
-                 syncer, callback));
+      base::Bind(&SyncWorker::ApplyLocalChange,
+                 base::Unretained(sync_worker_.get()),
+                 local_change,
+                 local_path,
+                 local_metadata,
+                 url,
+                 RelayCallbackToCurrentThread(
+                     FROM_HERE, callback)));
 }
 
-void SyncEngine::MaybeScheduleNextTask() {
-  if (GetCurrentState() == REMOTE_SERVICE_DISABLED)
-    return;
-
-  // TODO(tzik): Notify observer of OnRemoteChangeQueueUpdated.
-  // TODO(tzik): Add an interface to get the number of dirty trackers to
-  // MetadataDatabase.
-
-  MaybeStartFetchChanges();
-}
-
-void SyncEngine::NotifyLastOperationStatus(
-    SyncStatusCode sync_status,
-    bool used_network) {
-  UpdateServiceStateFromSyncStatusCode(sync_status, used_network);
-  if (context_->GetMetadataDatabase()) {
-    FOR_EACH_OBSERVER(
-        Observer,
-        service_observers_,
-        OnRemoteChangeQueueUpdated(
-            context_->GetMetadataDatabase()->CountDirtyTracker()));
-  }
+SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
+  // TODO(peria): Post task
+  return sync_worker_->GetSyncTaskManager();
 }
 
 void SyncEngine::OnNotificationReceived() {
-  if (service_state_ == REMOTE_SERVICE_TEMPORARY_UNAVAILABLE)
-    UpdateServiceState(REMOTE_SERVICE_OK, "Got push notification for Drive.");
-
-  should_check_remote_change_ = true;
-  MaybeScheduleNextTask();
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::OnNotificationReceived,
+                 base::Unretained(sync_worker_.get())));
 }
 
-void SyncEngine::OnPushNotificationEnabled(bool enabled) {}
+void SyncEngine::OnPushNotificationEnabled(bool) {}
 
 void SyncEngine::OnReadyToSendRequests() {
-  if (service_state_ == REMOTE_SERVICE_OK)
-    return;
-  UpdateServiceState(REMOTE_SERVICE_OK, "Authenticated");
-
-  if (!context_->GetMetadataDatabase() && signin_manager_) {
-    context_->GetDriveService()->Initialize(
-        signin_manager_->GetAuthenticatedAccountId());
-    PostInitializeTask();
-    return;
-  }
+  const std::string account_id =
+      signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
 
-  should_check_remote_change_ = true;
-  MaybeScheduleNextTask();
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::OnReadyToSendRequests,
+                 base::Unretained(sync_worker_.get()),
+                 account_id));
 }
 
 void SyncEngine::OnRefreshTokenInvalid() {
-  UpdateServiceState(
-      REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
-      "Found invalid refresh token.");
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::OnRefreshTokenInvalid,
+                 base::Unretained(sync_worker_.get())));
 }
 
 void SyncEngine::OnNetworkChanged(
     net::NetworkChangeNotifier::ConnectionType type) {
-  bool new_network_availability =
-      type != net::NetworkChangeNotifier::CONNECTION_NONE;
-
-  if (network_available_ && !new_network_availability) {
-    UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE, "Disconnected");
-  } else if (!network_available_ && new_network_availability) {
-    UpdateServiceState(REMOTE_SERVICE_OK, "Connected");
-    should_check_remote_change_ = true;
-    MaybeStartFetchChanges();
-  }
-  network_available_ = new_network_availability;
+  worker_task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&SyncWorker::OnNetworkChanged,
+                 base::Unretained(sync_worker_.get()),
+                 type));
 }
 
 drive::DriveServiceInterface* SyncEngine::GetDriveService() {
-  return context_->GetDriveService();
+  return drive_service_.get();
 }
 
 drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
-  return context_->GetDriveUploader();
+  return drive_uploader_.get();
 }
 
 MetadataDatabase* SyncEngine::GetMetadataDatabase() {
-  return context_->GetMetadataDatabase();
-}
-
-RemoteChangeProcessor* SyncEngine::GetRemoteChangeProcessor() {
-  return context_->GetRemoteChangeProcessor();
-}
-
-base::SequencedTaskRunner* SyncEngine::GetBlockingTaskRunner() {
-  return context_->GetBlockingTaskRunner();
-}
-
-SyncEngine::SyncEngine(const base::FilePath& base_dir,
-                       base::SequencedTaskRunner* task_runner,
-                       scoped_ptr<drive::DriveServiceInterface> drive_service,
-                       scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
-                       drive::DriveNotificationManager* notification_manager,
-                       ExtensionServiceInterface* extension_service,
-                       SigninManagerBase* signin_manager,
-                       leveldb::Env* env_override)
-    : base_dir_(base_dir),
-      env_override_(env_override),
+  // TODO(peria): Post task
+  return sync_worker_->GetMetadataDatabase();
+}
+
+SyncEngine::SyncEngine(
+    scoped_ptr<drive::DriveServiceInterface> drive_service,
+    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
+    base::SequencedTaskRunner* worker_task_runner,
+    drive::DriveNotificationManager* notification_manager,
+    ExtensionServiceInterface* extension_service,
+    SigninManagerBase* signin_manager)
+    : drive_service_(drive_service.Pass()),
+      drive_service_wrapper_(new DriveServiceWrapper(drive_service_.get())),
+      drive_uploader_(drive_uploader.Pass()),
+      drive_uploader_wrapper_(new DriveUploaderWrapper(drive_uploader_.get())),
       notification_manager_(notification_manager),
       extension_service_(extension_service),
       signin_manager_(signin_manager),
-      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
-      should_check_conflict_(true),
-      should_check_remote_change_(true),
-      listing_remote_changes_(false),
-      sync_enabled_(false),
-      default_conflict_resolution_policy_(
-          CONFLICT_RESOLUTION_POLICY_LAST_WRITE_WIN),
-      network_available_(false),
-      context_(new SyncEngineContext(drive_service.Pass(),
-                                     drive_uploader.Pass(),
-                                     task_runner)),
-      weak_ptr_factory_(this) {
-}
-
-void SyncEngine::DoDisableApp(const std::string& app_id,
-                              const SyncStatusCallback& callback) {
-  if (context_->GetMetadataDatabase())
-    context_->GetMetadataDatabase()->DisableApp(app_id, callback);
-  else
-    callback.Run(SYNC_STATUS_OK);
-}
-
-void SyncEngine::DoEnableApp(const std::string& app_id,
-                             const SyncStatusCallback& callback) {
-  if (context_->GetMetadataDatabase())
-    context_->GetMetadataDatabase()->EnableApp(app_id, callback);
-  else
-    callback.Run(SYNC_STATUS_OK);
-}
-
-void SyncEngine::PostInitializeTask() {
-  DCHECK(!context_->GetMetadataDatabase());
-
-  // This initializer task may not run if MetadataDatabase in context_ is
-  // already initialized when it runs.
-  SyncEngineInitializer* initializer =
-      new SyncEngineInitializer(context_.get(),
-                                context_->GetBlockingTaskRunner(),
-                                context_->GetDriveService(),
-                                base_dir_.Append(kDatabaseName),
-                                env_override_);
-  task_manager_->ScheduleSyncTask(
-      FROM_HERE,
-      scoped_ptr<SyncTask>(initializer),
-      SyncTaskManager::PRIORITY_HIGH,
-      base::Bind(&SyncEngine::DidInitialize, weak_ptr_factory_.GetWeakPtr(),
-                 initializer));
-}
-
-void SyncEngine::DidInitialize(SyncEngineInitializer* initializer,
-                               SyncStatusCode status) {
-  if (status != SYNC_STATUS_OK) {
-    if (context_->GetDriveService()->HasRefreshToken()) {
-      UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE,
-                         "Could not initialize remote service");
-    } else {
-      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
-                         "Authentication required.");
-    }
-    return;
-  }
-
-  scoped_ptr<MetadataDatabase> metadata_database =
-      initializer->PassMetadataDatabase();
-  if (metadata_database)
-    context_->SetMetadataDatabase(metadata_database.Pass());
-
-  UpdateRegisteredApps();
-}
-
-void SyncEngine::DidProcessRemoteChange(RemoteToLocalSyncer* syncer,
-                                        const SyncFileCallback& callback,
-                                        SyncStatusCode status) {
-  if (syncer->is_sync_root_deletion()) {
-    MetadataDatabase::ClearDatabase(context_->PassMetadataDatabase());
-    PostInitializeTask();
-    callback.Run(status, syncer->url());
-    return;
-  }
-
-  if (status == SYNC_STATUS_OK) {
-    if (syncer->sync_action() != SYNC_ACTION_NONE &&
-        syncer->url().is_valid()) {
-      FOR_EACH_OBSERVER(FileStatusObserver,
-                        file_status_observers_,
-                        OnFileStatusChanged(syncer->url(),
-                                            SYNC_FILE_STATUS_SYNCED,
-                                            syncer->sync_action(),
-                                            SYNC_DIRECTION_REMOTE_TO_LOCAL));
-    }
-
-    if (syncer->sync_action() == SYNC_ACTION_DELETED &&
-        syncer->url().is_valid() &&
-        fileapi::VirtualPath::IsRootPath(syncer->url().path())) {
-      RegisterOrigin(syncer->url().origin(), base::Bind(&EmptyStatusCallback));
-    }
-    should_check_conflict_ = true;
-  }
-  callback.Run(status, syncer->url());
-}
-
-void SyncEngine::DidApplyLocalChange(LocalToRemoteSyncer* syncer,
-                                     const SyncStatusCallback& callback,
-                                     SyncStatusCode status) {
-  if ((status == SYNC_STATUS_OK || status == SYNC_STATUS_RETRY) &&
-      syncer->url().is_valid() &&
-      syncer->sync_action() != SYNC_ACTION_NONE) {
-    fileapi::FileSystemURL updated_url = syncer->url();
-    if (!syncer->target_path().empty()) {
-      updated_url = CreateSyncableFileSystemURL(syncer->url().origin(),
-                                                syncer->target_path());
-    }
-    FOR_EACH_OBSERVER(FileStatusObserver,
-                      file_status_observers_,
-                      OnFileStatusChanged(updated_url,
-                                          SYNC_FILE_STATUS_SYNCED,
-                                          syncer->sync_action(),
-                                          SYNC_DIRECTION_LOCAL_TO_REMOTE));
-  }
-
-  if (status == SYNC_STATUS_UNKNOWN_ORIGIN && syncer->url().is_valid()) {
-    RegisterOrigin(syncer->url().origin(),
-                   base::Bind(&EmptyStatusCallback));
-  }
-
-  if (syncer->needs_remote_change_listing() &&
-      !listing_remote_changes_) {
-    task_manager_->ScheduleSyncTask(
-        FROM_HERE,
-        scoped_ptr<SyncTask>(new ListChangesTask(context_.get())),
-        SyncTaskManager::PRIORITY_HIGH,
-        base::Bind(&SyncEngine::DidFetchChanges,
-                   weak_ptr_factory_.GetWeakPtr()));
-    should_check_remote_change_ = false;
-    listing_remote_changes_ = true;
-    time_to_check_changes_ =
-        base::TimeTicks::Now() +
-        base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds);
-  }
-
-  if (status != SYNC_STATUS_OK &&
-      status != SYNC_STATUS_NO_CHANGE_TO_SYNC) {
-    callback.Run(status);
-    return;
-  }
-
-  if (status == SYNC_STATUS_OK)
-    should_check_conflict_ = true;
-
-  callback.Run(status);
-}
-
-void SyncEngine::MaybeStartFetchChanges() {
-  if (GetCurrentState() == REMOTE_SERVICE_DISABLED)
-    return;
-
-  if (!context_->GetMetadataDatabase())
-    return;
-
-  if (listing_remote_changes_)
-    return;
+      worker_task_runner_(worker_task_runner),
+      weak_ptr_factory_(this) {}
 
-  base::TimeTicks now = base::TimeTicks::Now();
-  if (!should_check_remote_change_ && now < time_to_check_changes_) {
-    if (!context_->GetMetadataDatabase()->HasDirtyTracker() &&
-        should_check_conflict_) {
-      should_check_conflict_ = false;
-      task_manager_->ScheduleSyncTaskIfIdle(
-          FROM_HERE,
-          scoped_ptr<SyncTask>(new ConflictResolver(context_.get())),
-          base::Bind(&SyncEngine::DidResolveConflict,
-                     weak_ptr_factory_.GetWeakPtr()));
-    }
-    return;
-  }
-
-  if (task_manager_->ScheduleSyncTaskIfIdle(
-          FROM_HERE,
-          scoped_ptr<SyncTask>(new ListChangesTask(context_.get())),
-          base::Bind(&SyncEngine::DidFetchChanges,
-                     weak_ptr_factory_.GetWeakPtr()))) {
-    should_check_remote_change_ = false;
-    listing_remote_changes_ = true;
-    time_to_check_changes_ =
-        now + base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds);
-  }
+void SyncEngine::OnPendingFileListUpdated(int item_count) {
+  FOR_EACH_OBSERVER(
+      Observer,
+      service_observers_,
+      OnRemoteChangeQueueUpdated(item_count));
 }
 
-void SyncEngine::DidResolveConflict(SyncStatusCode status) {
-  if (status == SYNC_STATUS_OK)
-    should_check_conflict_ = true;
-}
-
-void SyncEngine::DidFetchChanges(SyncStatusCode status) {
-  if (status == SYNC_STATUS_OK)
-    should_check_conflict_ = true;
-  listing_remote_changes_ = false;
-}
-
-void SyncEngine::UpdateServiceStateFromSyncStatusCode(
-      SyncStatusCode status,
-      bool used_network) {
-  switch (status) {
-    case SYNC_STATUS_OK:
-      if (used_network)
-        UpdateServiceState(REMOTE_SERVICE_OK, std::string());
-      break;
-
-    // Authentication error.
-    case SYNC_STATUS_AUTHENTICATION_FAILED:
-      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
-                         "Authentication required");
-      break;
-
-    // OAuth token error.
-    case SYNC_STATUS_ACCESS_FORBIDDEN:
-      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
-                         "Access forbidden");
-      break;
-
-    // Errors which could make the service temporarily unavailable.
-    case SYNC_STATUS_SERVICE_TEMPORARILY_UNAVAILABLE:
-    case SYNC_STATUS_NETWORK_ERROR:
-    case SYNC_STATUS_ABORT:
-    case SYNC_STATUS_FAILED:
-      if (context_->GetDriveService()->HasRefreshToken()) {
-        UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE,
-                           "Network or temporary service error.");
-      } else {
-        UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
-                           "Authentication required");
-      }
-      break;
-
-    // Errors which would require manual user intervention to resolve.
-    case SYNC_DATABASE_ERROR_CORRUPTION:
-    case SYNC_DATABASE_ERROR_IO_ERROR:
-    case SYNC_DATABASE_ERROR_FAILED:
-      UpdateServiceState(REMOTE_SERVICE_DISABLED,
-                         "Unrecoverable database error");
-      break;
-
-    default:
-      // Other errors don't affect service state
-      break;
-  }
+void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
+                                     SyncFileStatus file_status,
+                                     SyncAction sync_action,
+                                     SyncDirection direction) {
+  FOR_EACH_OBSERVER(FileStatusObserver,
+                    file_status_observers_,
+                    OnFileStatusChanged(
+                        url, file_status, sync_action, direction));
 }
 
 void SyncEngine::UpdateServiceState(RemoteServiceState state,
                                     const std::string& description) {
-  RemoteServiceState old_state = GetCurrentState();
-  service_state_ = state;
-
-  if (old_state == GetCurrentState())
-    return;
-
-  util::Log(logging::LOG_VERBOSE, FROM_HERE,
-            "Service state changed: %d->%d: %s",
-            old_state, GetCurrentState(), description.c_str());
   FOR_EACH_OBSERVER(
       Observer, service_observers_,
-      OnRemoteServiceStateUpdated(GetCurrentState(), description));
+      OnRemoteServiceStateUpdated(state, description));
 }
 
 void SyncEngine::UpdateRegisteredApps() {
   if (!extension_service_)
     return;
 
-  DCHECK(context_->GetMetadataDatabase());
+  MetadataDatabase* metadata_db = GetMetadataDatabase();
+  DCHECK(metadata_db);
   std::vector<std::string> app_ids;
-  context_->GetMetadataDatabase()->GetRegisteredAppIDs(&app_ids);
+  metadata_db->GetRegisteredAppIDs(&app_ids);
 
   // Update the status of every origin using status from ExtensionService.
   for (std::vector<std::string>::const_iterator itr = app_ids.begin();
@@ -754,8 +544,7 @@ void SyncEngine::UpdateRegisteredApps() {
       continue;
     }
     FileTracker tracker;
-    if (!context_->GetMetadataDatabase()->FindAppRootTracker(app_id,
-                                                             &tracker)) {
+    if (!metadata_db->FindAppRootTracker(app_id, &tracker)) {
       // App will register itself on first run.
       continue;
     }