Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / sync / sessions / model_type_registry.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/sessions/model_type_registry.h"
6
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
19
20 namespace syncer {
21
22 namespace {
23
24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
25  public:
26   ModelTypeSyncProxyWrapper(
27       const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
28       const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29   ~ModelTypeSyncProxyWrapper() override;
30
31   void OnCommitCompleted(const DataTypeState& type_state,
32                          const CommitResponseDataList& response_list) override;
33   void OnUpdateReceived(const DataTypeState& type_state,
34                         const UpdateResponseDataList& response_list,
35                         const UpdateResponseDataList& pending_updates) override;
36
37  private:
38   base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
39   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
40 };
41
42 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
43     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
44     const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
45     : processor_(proxy), processor_task_runner_(processor_task_runner) {
46 }
47
48 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
49 }
50
51 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
52     const DataTypeState& type_state,
53     const CommitResponseDataList& response_list) {
54   processor_task_runner_->PostTask(
55       FROM_HERE,
56       base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
57                  processor_,
58                  type_state,
59                  response_list));
60 }
61
62 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
63     const DataTypeState& type_state,
64     const UpdateResponseDataList& response_list,
65     const UpdateResponseDataList& pending_updates) {
66   processor_task_runner_->PostTask(
67       FROM_HERE,
68       base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
69                  processor_,
70                  type_state,
71                  response_list,
72                  pending_updates));
73 }
74
75 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
76  public:
77   ModelTypeSyncWorkerWrapper(
78       const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
79       const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
80   ~ModelTypeSyncWorkerWrapper() override;
81
82   void EnqueueForCommit(const CommitRequestDataList& list) override;
83
84  private:
85   base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
86   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
87 };
88
89 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
90     const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
91     const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
92     : worker_(worker), sync_thread_(sync_thread) {
93 }
94
95 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
96 }
97
98 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
99     const CommitRequestDataList& list) {
100   sync_thread_->PostTask(
101       FROM_HERE,
102       base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
103 }
104
105 }  // namespace
106
107 ModelTypeRegistry::ModelTypeRegistry(
108     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
109     syncable::Directory* directory,
110     NudgeHandler* nudge_handler)
111     : directory_(directory),
112       nudge_handler_(nudge_handler),
113       weak_ptr_factory_(this) {
114   for (size_t i = 0u; i < workers.size(); ++i) {
115     workers_map_.insert(
116         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
117   }
118 }
119
120 ModelTypeRegistry::~ModelTypeRegistry() {
121 }
122
123 void ModelTypeRegistry::SetEnabledDirectoryTypes(
124     const ModelSafeRoutingInfo& routing_info) {
125   // Remove all existing directory processors and delete them.  The
126   // DebugInfoEmitters are not deleted here, since we want to preserve their
127   // counters.
128   for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
129        it.Good(); it.Inc()) {
130     size_t result1 = update_handler_map_.erase(it.Get());
131     size_t result2 = commit_contributor_map_.erase(it.Get());
132     DCHECK_EQ(1U, result1);
133     DCHECK_EQ(1U, result2);
134   }
135
136   // Clear the old instances of directory update handlers and commit
137   // contributors, deleting their contents in the processs.
138   directory_update_handlers_.clear();
139   directory_commit_contributors_.clear();
140
141   // Create new ones and add them to the appropriate containers.
142   for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
143        routing_iter != routing_info.end(); ++routing_iter) {
144     ModelType type = routing_iter->first;
145     ModelSafeGroup group = routing_iter->second;
146     std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
147         worker_it = workers_map_.find(group);
148     DCHECK(worker_it != workers_map_.end());
149     scoped_refptr<ModelSafeWorker> worker = worker_it->second;
150
151     // DebugInfoEmitters are never deleted.  Use existing one if we have it.
152     DirectoryTypeDebugInfoEmitter* emitter = NULL;
153     DirectoryTypeDebugInfoEmitterMap::iterator it =
154         directory_type_debug_info_emitter_map_.find(type);
155     if (it != directory_type_debug_info_emitter_map_.end()) {
156       emitter = it->second;
157     } else {
158       emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
159                                                   &type_debug_info_observers_);
160       directory_type_debug_info_emitter_map_.insert(
161           std::make_pair(type, emitter));
162       directory_type_debug_info_emitters_.push_back(emitter);
163     }
164
165     DirectoryCommitContributor* committer =
166         new DirectoryCommitContributor(directory_, type, emitter);
167     DirectoryUpdateHandler* updater =
168         new DirectoryUpdateHandler(directory_, type, worker, emitter);
169
170     // These containers take ownership of their contents.
171     directory_commit_contributors_.push_back(committer);
172     directory_update_handlers_.push_back(updater);
173
174     bool inserted1 =
175         update_handler_map_.insert(std::make_pair(type, updater)).second;
176     DCHECK(inserted1) << "Attempt to override existing type handler in map";
177
178     bool inserted2 =
179         commit_contributor_map_.insert(std::make_pair(type, committer)).second;
180     DCHECK(inserted2) << "Attempt to override existing type handler in map";
181   }
182
183   enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
184   DCHECK(Intersection(GetEnabledDirectoryTypes(),
185                       GetEnabledNonBlockingTypes()).Empty());
186 }
187
188 void ModelTypeRegistry::ConnectSyncTypeToWorker(
189     ModelType type,
190     const DataTypeState& data_type_state,
191     const UpdateResponseDataList& saved_pending_updates,
192     const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
193     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
194   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
195
196   // Initialize Worker -> Proxy communication channel.
197   scoped_ptr<ModelTypeSyncProxy> proxy(
198       new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
199   scoped_ptr<Cryptographer> cryptographer_copy;
200   if (encrypted_types_.Has(type))
201     cryptographer_copy.reset(new Cryptographer(*cryptographer_));
202
203   scoped_ptr<ModelTypeSyncWorkerImpl> worker(
204       new ModelTypeSyncWorkerImpl(type,
205                                   data_type_state,
206                                   saved_pending_updates,
207                                   cryptographer_copy.Pass(),
208                                   nudge_handler_,
209                                   proxy.Pass()));
210
211   // Initialize Proxy -> Worker communication channel.
212   scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
213       new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
214                                      scoped_refptr<base::SequencedTaskRunner>(
215                                          base::ThreadTaskRunnerHandle::Get())));
216   type_task_runner->PostTask(FROM_HERE,
217                              base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
218                                         proxy_impl,
219                                         base::Passed(&wrapped_worker)));
220
221   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
222   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
223
224   update_handler_map_.insert(std::make_pair(type, worker.get()));
225   commit_contributor_map_.insert(std::make_pair(type, worker.get()));
226
227   // The container takes ownership.
228   model_type_sync_workers_.push_back(worker.release());
229
230   DCHECK(Intersection(GetEnabledDirectoryTypes(),
231                       GetEnabledNonBlockingTypes()).Empty());
232 }
233
234 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
235   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
236   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
237   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
238
239   size_t updaters_erased = update_handler_map_.erase(type);
240   size_t committers_erased = commit_contributor_map_.erase(type);
241
242   DCHECK_EQ(1U, updaters_erased);
243   DCHECK_EQ(1U, committers_erased);
244
245   // Remove from the ScopedVector, deleting the worker in the process.
246   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
247            model_type_sync_workers_.begin();
248        it != model_type_sync_workers_.end();
249        ++it) {
250     if ((*it)->GetModelType() == type) {
251       model_type_sync_workers_.erase(it);
252       break;
253     }
254   }
255 }
256
257 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
258   return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
259 }
260
261 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
262   return &update_handler_map_;
263 }
264
265 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
266   return &commit_contributor_map_;
267 }
268
269 DirectoryTypeDebugInfoEmitterMap*
270 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
271   return &directory_type_debug_info_emitter_map_;
272 }
273
274 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
275     syncer::TypeDebugInfoObserver* observer) {
276   if (!type_debug_info_observers_.HasObserver(observer))
277     type_debug_info_observers_.AddObserver(observer);
278 }
279
280 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
281     syncer::TypeDebugInfoObserver* observer) {
282   type_debug_info_observers_.RemoveObserver(observer);
283 }
284
285 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
286     syncer::TypeDebugInfoObserver* observer) {
287   return type_debug_info_observers_.HasObserver(observer);
288 }
289
290 void ModelTypeRegistry::RequestEmitDebugInfo() {
291   for (DirectoryTypeDebugInfoEmitterMap::iterator it =
292        directory_type_debug_info_emitter_map_.begin();
293        it != directory_type_debug_info_emitter_map_.end(); ++it) {
294     it->second->EmitCommitCountersUpdate();
295     it->second->EmitUpdateCountersUpdate();
296     it->second->EmitStatusCountersUpdate();
297   }
298 }
299
300 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
301   return weak_ptr_factory_.GetWeakPtr();
302 }
303
304 void ModelTypeRegistry::OnPassphraseRequired(
305     PassphraseRequiredReason reason,
306     const sync_pb::EncryptedData& pending_keys) {
307 }
308
309 void ModelTypeRegistry::OnPassphraseAccepted() {
310 }
311
312 void ModelTypeRegistry::OnBootstrapTokenUpdated(
313     const std::string& bootstrap_token,
314     BootstrapTokenType type) {
315 }
316
317 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
318                                                 bool encrypt_everything) {
319   encrypted_types_ = encrypted_types;
320   OnEncryptionStateChanged();
321 }
322
323 void ModelTypeRegistry::OnEncryptionComplete() {
324 }
325
326 void ModelTypeRegistry::OnCryptographerStateChanged(
327     Cryptographer* cryptographer) {
328   cryptographer_.reset(new Cryptographer(*cryptographer));
329   OnEncryptionStateChanged();
330 }
331
332 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
333                                                 base::Time passphrase_time) {
334 }
335
336 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
337   return enabled_directory_types_;
338 }
339
340 void ModelTypeRegistry::OnEncryptionStateChanged() {
341   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
342            model_type_sync_workers_.begin();
343        it != model_type_sync_workers_.end();
344        ++it) {
345     if (encrypted_types_.Has((*it)->GetModelType())) {
346       (*it)->UpdateCryptographer(
347           make_scoped_ptr(new Cryptographer(*cryptographer_)));
348     }
349   }
350 }
351
352 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
353   ModelTypeSet enabled_off_thread_types;
354   for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
355            model_type_sync_workers_.begin();
356        it != model_type_sync_workers_.end();
357        ++it) {
358     enabled_off_thread_types.Put((*it)->GetModelType());
359   }
360   return enabled_off_thread_types;
361 }
362
363 }  // namespace syncer