#include "sync/sessions/model_type_registry.h"
#include "base/bind.h"
-#include "base/message_loop/message_loop_proxy.h"
#include "base/observer_list.h"
+#include "base/thread_task_runner_handle.h"
#include "sync/engine/directory_commit_contributor.h"
#include "sync/engine/directory_update_handler.h"
-#include "sync/engine/non_blocking_sync_common.h"
-#include "sync/engine/non_blocking_type_processor.h"
-#include "sync/engine/non_blocking_type_processor_core.h"
-#include "sync/engine/non_blocking_type_processor_core_interface.h"
-#include "sync/engine/non_blocking_type_processor_interface.h"
+#include "sync/engine/model_type_sync_proxy.h"
+#include "sync/engine/model_type_sync_proxy_impl.h"
+#include "sync/engine/model_type_sync_worker.h"
+#include "sync/engine/model_type_sync_worker_impl.h"
+#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/sessions/directory_type_debug_info_emitter.h"
+#include "sync/util/cryptographer.h"
namespace syncer {
namespace {
-class NonBlockingTypeProcessorWrapper
- : public NonBlockingTypeProcessorInterface {
+class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
public:
- NonBlockingTypeProcessorWrapper(
- base::WeakPtr<NonBlockingTypeProcessor> processor,
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
- virtual ~NonBlockingTypeProcessorWrapper();
+ ModelTypeSyncProxyWrapper(
+ const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
+ const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
+ virtual ~ModelTypeSyncProxyWrapper();
- virtual void ReceiveCommitResponse(
+ virtual void OnCommitCompleted(
const DataTypeState& type_state,
const CommitResponseDataList& response_list) OVERRIDE;
- virtual void ReceiveUpdateResponse(
+ virtual void OnUpdateReceived(
const DataTypeState& type_state,
- const UpdateResponseDataList& response_list) OVERRIDE;
+ const UpdateResponseDataList& response_list,
+ const UpdateResponseDataList& pending_updates) OVERRIDE;
private:
- base::WeakPtr<NonBlockingTypeProcessor> processor_;
+ base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
};
-NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
- base::WeakPtr<NonBlockingTypeProcessor> processor,
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
- : processor_(processor), processor_task_runner_(processor_task_runner) {
+ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
+ const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
+ const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
+ : processor_(proxy), processor_task_runner_(processor_task_runner) {
}
-NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
+ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
}
-void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
+void ModelTypeSyncProxyWrapper::OnCommitCompleted(
const DataTypeState& type_state,
const CommitResponseDataList& response_list) {
processor_task_runner_->PostTask(
FROM_HERE,
- base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
+ base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
processor_,
type_state,
response_list));
}
-void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
+void ModelTypeSyncProxyWrapper::OnUpdateReceived(
const DataTypeState& type_state,
- const UpdateResponseDataList& response_list) {
+ const UpdateResponseDataList& response_list,
+ const UpdateResponseDataList& pending_updates) {
processor_task_runner_->PostTask(
FROM_HERE,
- base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
+ base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
processor_,
type_state,
- response_list));
+ response_list,
+ pending_updates));
}
-class NonBlockingTypeProcessorCoreWrapper
- : public NonBlockingTypeProcessorCoreInterface {
+class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
public:
- NonBlockingTypeProcessorCoreWrapper(
- base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread);
- virtual ~NonBlockingTypeProcessorCoreWrapper();
+ ModelTypeSyncWorkerWrapper(
+ const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
+ const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
+ virtual ~ModelTypeSyncWorkerWrapper();
- virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
+ virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
private:
- base::WeakPtr<NonBlockingTypeProcessorCore> core_;
+ base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
scoped_refptr<base::SequencedTaskRunner> sync_thread_;
};
-NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
- base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread)
- : core_(core), sync_thread_(sync_thread) {
+ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
+ const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
+ const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
+ : worker_(worker), sync_thread_(sync_thread) {
}
-NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
+ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
}
-void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
+void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
const CommitRequestDataList& list) {
sync_thread_->PostTask(
FROM_HERE,
- base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
+ base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
}
} // namespace
-ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
-
ModelTypeRegistry::ModelTypeRegistry(
const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
- syncable::Directory* directory)
- : directory_(directory) {
+ syncable::Directory* directory,
+ NudgeHandler* nudge_handler)
+ : directory_(directory),
+ cryptographer_provider_(directory_),
+ nudge_handler_(nudge_handler),
+ weak_ptr_factory_(this) {
for (size_t i = 0u; i < workers.size(); ++i) {
workers_map_.insert(
std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
GetEnabledNonBlockingTypes()).Empty());
}
-void ModelTypeRegistry::InitializeNonBlockingType(
+void ModelTypeRegistry::ConnectSyncTypeToWorker(
ModelType type,
const DataTypeState& data_type_state,
- scoped_refptr<base::SequencedTaskRunner> type_task_runner,
- base::WeakPtr<NonBlockingTypeProcessor> processor) {
+ const UpdateResponseDataList& saved_pending_updates,
+ const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
+ const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
- // Initialize CoreProcessor -> Processor communication channel.
- scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
- new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
- scoped_ptr<NonBlockingTypeProcessorCore> core(
- new NonBlockingTypeProcessorCore(
- type, data_type_state, processor_interface.Pass()));
-
- // Initialize Processor -> CoreProcessor communication channel.
- scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
- new NonBlockingTypeProcessorCoreWrapper(
- core->AsWeakPtr(),
- scoped_refptr<base::SequencedTaskRunner>(
- base::MessageLoopProxy::current())));
+ // Initialize Worker -> Proxy communication channel.
+ scoped_ptr<ModelTypeSyncProxy> proxy(
+ new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
+ scoped_ptr<ModelTypeSyncWorkerImpl> worker(
+ new ModelTypeSyncWorkerImpl(type,
+ data_type_state,
+ saved_pending_updates,
+ &cryptographer_provider_,
+ nudge_handler_,
+ proxy.Pass()));
+
+ // Initialize Proxy -> Worker communication channel.
+ scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
+ new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
+ scoped_refptr<base::SequencedTaskRunner>(
+ base::ThreadTaskRunnerHandle::Get())));
type_task_runner->PostTask(FROM_HERE,
- base::Bind(&NonBlockingTypeProcessor::OnConnect,
- processor,
- base::Passed(&core_interface)));
+ base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
+ proxy_impl,
+ base::Passed(&wrapped_worker)));
DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
- update_handler_map_.insert(std::make_pair(type, core.get()));
- commit_contributor_map_.insert(std::make_pair(type, core.get()));
+ update_handler_map_.insert(std::make_pair(type, worker.get()));
+ commit_contributor_map_.insert(std::make_pair(type, worker.get()));
// The container takes ownership.
- non_blocking_type_processor_cores_.push_back(core.release());
+ model_type_sync_workers_.push_back(worker.release());
DCHECK(Intersection(GetEnabledDirectoryTypes(),
GetEnabledNonBlockingTypes()).Empty());
}
-void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
+void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
DCHECK_EQ(1U, updaters_erased);
DCHECK_EQ(1U, committers_erased);
- // Remove from the ScopedVector, deleting the core in the process.
- for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
- non_blocking_type_processor_cores_.begin();
- it != non_blocking_type_processor_cores_.end(); ++it) {
+ // Remove from the ScopedVector, deleting the worker in the process.
+ for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
+ model_type_sync_workers_.begin();
+ it != model_type_sync_workers_.end();
+ ++it) {
if ((*it)->GetModelType() == type) {
- non_blocking_type_processor_cores_.erase(it);
+ model_type_sync_workers_.erase(it);
break;
}
}
}
}
+base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
+ return weak_ptr_factory_.GetWeakPtr();
+}
+
ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
return enabled_directory_types_;
}
ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
ModelTypeSet enabled_off_thread_types;
- for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
- non_blocking_type_processor_cores_.begin();
- it != non_blocking_type_processor_cores_.end(); ++it) {
+ for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
+ model_type_sync_workers_.begin();
+ it != model_type_sync_workers_.end();
+ ++it) {
enabled_off_thread_types.Put((*it)->GetModelType());
}
return enabled_off_thread_types;