Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / sync / sessions / model_type_registry.cc
index 9ceac74..c2ba5f4 100644 (file)
 #include "sync/sessions/model_type_registry.h"
 
 #include "base/bind.h"
-#include "base/message_loop/message_loop_proxy.h"
 #include "base/observer_list.h"
+#include "base/thread_task_runner_handle.h"
 #include "sync/engine/directory_commit_contributor.h"
 #include "sync/engine/directory_update_handler.h"
-#include "sync/engine/non_blocking_sync_common.h"
-#include "sync/engine/non_blocking_type_processor.h"
-#include "sync/engine/non_blocking_type_processor_core.h"
-#include "sync/engine/non_blocking_type_processor_core_interface.h"
-#include "sync/engine/non_blocking_type_processor_interface.h"
+#include "sync/engine/model_type_sync_proxy.h"
+#include "sync/engine/model_type_sync_proxy_impl.h"
+#include "sync/engine/model_type_sync_worker.h"
+#include "sync/engine/model_type_sync_worker_impl.h"
+#include "sync/internal_api/public/non_blocking_sync_common.h"
 #include "sync/sessions/directory_type_debug_info_emitter.h"
+#include "sync/util/cryptographer.h"
 
 namespace syncer {
 
 namespace {
 
-class NonBlockingTypeProcessorWrapper
-    : public NonBlockingTypeProcessorInterface {
+class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
  public:
-  NonBlockingTypeProcessorWrapper(
-      base::WeakPtr<NonBlockingTypeProcessor> processor,
-      scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
-  virtual ~NonBlockingTypeProcessorWrapper();
+  ModelTypeSyncProxyWrapper(
+      const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
+      const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
+  virtual ~ModelTypeSyncProxyWrapper();
 
-  virtual void ReceiveCommitResponse(
+  virtual void OnCommitCompleted(
       const DataTypeState& type_state,
       const CommitResponseDataList& response_list) OVERRIDE;
-  virtual void ReceiveUpdateResponse(
+  virtual void OnUpdateReceived(
       const DataTypeState& type_state,
-      const UpdateResponseDataList& response_list) OVERRIDE;
+      const UpdateResponseDataList& response_list,
+      const UpdateResponseDataList& pending_updates) OVERRIDE;
 
  private:
-  base::WeakPtr<NonBlockingTypeProcessor> processor_;
+  base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
 };
 
-NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
-    base::WeakPtr<NonBlockingTypeProcessor> processor,
-    scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
-    : processor_(processor), processor_task_runner_(processor_task_runner) {
+ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
+    const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
+    const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
+    : processor_(proxy), processor_task_runner_(processor_task_runner) {
 }
 
-NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
+ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
 }
 
-void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
+void ModelTypeSyncProxyWrapper::OnCommitCompleted(
     const DataTypeState& type_state,
     const CommitResponseDataList& response_list) {
   processor_task_runner_->PostTask(
       FROM_HERE,
-      base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
+      base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
                  processor_,
                  type_state,
                  response_list));
 }
 
-void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
+void ModelTypeSyncProxyWrapper::OnUpdateReceived(
     const DataTypeState& type_state,
-    const UpdateResponseDataList& response_list) {
+    const UpdateResponseDataList& response_list,
+    const UpdateResponseDataList& pending_updates) {
   processor_task_runner_->PostTask(
       FROM_HERE,
-      base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
+      base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
                  processor_,
                  type_state,
-                 response_list));
+                 response_list,
+                 pending_updates));
 }
 
-class NonBlockingTypeProcessorCoreWrapper
-    : public NonBlockingTypeProcessorCoreInterface {
+class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
  public:
-  NonBlockingTypeProcessorCoreWrapper(
-      base::WeakPtr<NonBlockingTypeProcessorCore> core,
-      scoped_refptr<base::SequencedTaskRunner> sync_thread);
-  virtual ~NonBlockingTypeProcessorCoreWrapper();
+  ModelTypeSyncWorkerWrapper(
+      const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
+      const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
+  virtual ~ModelTypeSyncWorkerWrapper();
 
-  virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
+  virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
 
  private:
-  base::WeakPtr<NonBlockingTypeProcessorCore> core_;
+  base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
 };
 
-NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
-    base::WeakPtr<NonBlockingTypeProcessorCore> core,
-    scoped_refptr<base::SequencedTaskRunner> sync_thread)
-    : core_(core), sync_thread_(sync_thread) {
+ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
+    const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
+    const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
+    : worker_(worker), sync_thread_(sync_thread) {
 }
 
-NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
+ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
 }
 
-void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
+void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
     const CommitRequestDataList& list) {
   sync_thread_->PostTask(
       FROM_HERE,
-      base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
+      base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
 }
 
 }  // namespace
 
-ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
-
 ModelTypeRegistry::ModelTypeRegistry(
     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
-    syncable::Directory* directory)
-    : directory_(directory) {
+    syncable::Directory* directory,
+    NudgeHandler* nudge_handler)
+    : directory_(directory),
+      cryptographer_provider_(directory_),
+      nudge_handler_(nudge_handler),
+      weak_ptr_factory_(this) {
   for (size_t i = 0u; i < workers.size(); ++i) {
     workers_map_.insert(
         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
@@ -183,45 +187,49 @@ void ModelTypeRegistry::SetEnabledDirectoryTypes(
                       GetEnabledNonBlockingTypes()).Empty());
 }
 
-void ModelTypeRegistry::InitializeNonBlockingType(
+void ModelTypeRegistry::ConnectSyncTypeToWorker(
     ModelType type,
     const DataTypeState& data_type_state,
-    scoped_refptr<base::SequencedTaskRunner> type_task_runner,
-    base::WeakPtr<NonBlockingTypeProcessor> processor) {
+    const UpdateResponseDataList& saved_pending_updates,
+    const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
+    const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
 
-  // Initialize CoreProcessor -> Processor communication channel.
-  scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
-      new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
-  scoped_ptr<NonBlockingTypeProcessorCore> core(
-      new NonBlockingTypeProcessorCore(
-          type, data_type_state, processor_interface.Pass()));
-
-  // Initialize Processor -> CoreProcessor communication channel.
-  scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
-      new NonBlockingTypeProcessorCoreWrapper(
-          core->AsWeakPtr(),
-          scoped_refptr<base::SequencedTaskRunner>(
-              base::MessageLoopProxy::current())));
+  // Initialize Worker -> Proxy communication channel.
+  scoped_ptr<ModelTypeSyncProxy> proxy(
+      new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
+  scoped_ptr<ModelTypeSyncWorkerImpl> worker(
+      new ModelTypeSyncWorkerImpl(type,
+                                  data_type_state,
+                                  saved_pending_updates,
+                                  &cryptographer_provider_,
+                                  nudge_handler_,
+                                  proxy.Pass()));
+
+  // Initialize Proxy -> Worker communication channel.
+  scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
+      new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
+                                     scoped_refptr<base::SequencedTaskRunner>(
+                                         base::ThreadTaskRunnerHandle::Get())));
   type_task_runner->PostTask(FROM_HERE,
-                             base::Bind(&NonBlockingTypeProcessor::OnConnect,
-                                        processor,
-                                        base::Passed(&core_interface)));
+                             base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
+                                        proxy_impl,
+                                        base::Passed(&wrapped_worker)));
 
   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
 
-  update_handler_map_.insert(std::make_pair(type, core.get()));
-  commit_contributor_map_.insert(std::make_pair(type, core.get()));
+  update_handler_map_.insert(std::make_pair(type, worker.get()));
+  commit_contributor_map_.insert(std::make_pair(type, worker.get()));
 
   // The container takes ownership.
-  non_blocking_type_processor_cores_.push_back(core.release());
+  model_type_sync_workers_.push_back(worker.release());
 
   DCHECK(Intersection(GetEnabledDirectoryTypes(),
                       GetEnabledNonBlockingTypes()).Empty());
 }
 
-void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
+void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
@@ -232,12 +240,13 @@ void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
   DCHECK_EQ(1U, updaters_erased);
   DCHECK_EQ(1U, committers_erased);
 
-  // Remove from the ScopedVector, deleting the core in the process.
-  for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
-       non_blocking_type_processor_cores_.begin();
-       it != non_blocking_type_processor_cores_.end(); ++it) {
+  // Remove from the ScopedVector, deleting the worker in the process.
+  for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
+           model_type_sync_workers_.begin();
+       it != model_type_sync_workers_.end();
+       ++it) {
     if ((*it)->GetModelType() == type) {
-      non_blocking_type_processor_cores_.erase(it);
+      model_type_sync_workers_.erase(it);
       break;
     }
   }
@@ -286,15 +295,20 @@ void ModelTypeRegistry::RequestEmitDebugInfo() {
   }
 }
 
+base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
+  return weak_ptr_factory_.GetWeakPtr();
+}
+
 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
   return enabled_directory_types_;
 }
 
 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
   ModelTypeSet enabled_off_thread_types;
-  for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
-           non_blocking_type_processor_cores_.begin();
-       it != non_blocking_type_processor_cores_.end(); ++it) {
+  for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
+           model_type_sync_workers_.begin();
+       it != model_type_sync_workers_.end();
+       ++it) {
     enabled_off_thread_types.Put((*it)->GetModelType());
   }
   return enabled_off_thread_types;