Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / sync / engine / model_type_sync_proxy_impl.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/engine/model_type_sync_proxy_impl.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "sync/engine/model_type_entity.h"
10 #include "sync/engine/model_type_sync_worker.h"
11 #include "sync/internal_api/public/sync_context_proxy.h"
12 #include "sync/syncable/syncable_util.h"
13
14 namespace syncer {
15
16 ModelTypeSyncProxyImpl::ModelTypeSyncProxyImpl(ModelType type)
17     : type_(type),
18       is_preferred_(false),
19       is_connected_(false),
20       entities_deleter_(&entities_),
21       pending_updates_map_deleter_(&pending_updates_map_),
22       weak_ptr_factory_for_ui_(this),
23       weak_ptr_factory_for_sync_(this) {
24 }
25
26 ModelTypeSyncProxyImpl::~ModelTypeSyncProxyImpl() {
27 }
28
29 bool ModelTypeSyncProxyImpl::IsPreferred() const {
30   DCHECK(CalledOnValidThread());
31   return is_preferred_;
32 }
33
34 bool ModelTypeSyncProxyImpl::IsConnected() const {
35   DCHECK(CalledOnValidThread());
36   return is_connected_;
37 }
38
39 ModelType ModelTypeSyncProxyImpl::GetModelType() const {
40   DCHECK(CalledOnValidThread());
41   return type_;
42 }
43
44 void ModelTypeSyncProxyImpl::Enable(
45     scoped_ptr<SyncContextProxy> sync_context_proxy) {
46   DCHECK(CalledOnValidThread());
47   DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
48
49   is_preferred_ = true;
50
51   // TODO(rlarocque): At some point, this should be loaded from storage.
52   data_type_state_.progress_marker.set_data_type_id(
53       GetSpecificsFieldNumberFromModelType(type_));
54
55   UpdateResponseDataList saved_pending_updates = GetPendingUpdates();
56   sync_context_proxy_ = sync_context_proxy.Pass();
57   sync_context_proxy_->ConnectTypeToSync(
58       GetModelType(),
59       data_type_state_,
60       saved_pending_updates,
61       weak_ptr_factory_for_sync_.GetWeakPtr());
62 }
63
64 void ModelTypeSyncProxyImpl::Disable() {
65   DCHECK(CalledOnValidThread());
66   is_preferred_ = false;
67   Disconnect();
68
69   ClearSyncState();
70 }
71
72 void ModelTypeSyncProxyImpl::Disconnect() {
73   DCHECK(CalledOnValidThread());
74   DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
75   is_connected_ = false;
76
77   if (sync_context_proxy_) {
78     sync_context_proxy_->Disconnect(GetModelType());
79     sync_context_proxy_.reset();
80   }
81
82   weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
83   worker_.reset();
84
85   ClearTransientSyncState();
86 }
87
88 base::WeakPtr<ModelTypeSyncProxyImpl> ModelTypeSyncProxyImpl::AsWeakPtrForUI() {
89   DCHECK(CalledOnValidThread());
90   return weak_ptr_factory_for_ui_.GetWeakPtr();
91 }
92
93 void ModelTypeSyncProxyImpl::OnConnect(scoped_ptr<ModelTypeSyncWorker> worker) {
94   DCHECK(CalledOnValidThread());
95   DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
96
97   is_connected_ = true;
98   worker_ = worker.Pass();
99
100   FlushPendingCommitRequests();
101 }
102
103 void ModelTypeSyncProxyImpl::Put(const std::string& client_tag,
104                                  const sync_pb::EntitySpecifics& specifics) {
105   DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
106
107   const std::string client_tag_hash(
108       syncable::GenerateSyncableHash(type_, client_tag));
109
110   EntityMap::iterator it = entities_.find(client_tag_hash);
111   if (it == entities_.end()) {
112     scoped_ptr<ModelTypeEntity> entity(ModelTypeEntity::NewLocalItem(
113         client_tag, specifics, base::Time::Now()));
114     entities_.insert(std::make_pair(client_tag_hash, entity.release()));
115   } else {
116     ModelTypeEntity* entity = it->second;
117     entity->MakeLocalChange(specifics);
118   }
119
120   FlushPendingCommitRequests();
121 }
122
123 void ModelTypeSyncProxyImpl::Delete(const std::string& client_tag) {
124   const std::string client_tag_hash(
125       syncable::GenerateSyncableHash(type_, client_tag));
126
127   EntityMap::iterator it = entities_.find(client_tag_hash);
128   if (it == entities_.end()) {
129     // That's unusual, but not necessarily a bad thing.
130     // Missing is as good as deleted as far as the model is concerned.
131     DLOG(WARNING) << "Attempted to delete missing item."
132                   << " client tag: " << client_tag;
133   } else {
134     ModelTypeEntity* entity = it->second;
135     entity->Delete();
136   }
137
138   FlushPendingCommitRequests();
139 }
140
141 void ModelTypeSyncProxyImpl::FlushPendingCommitRequests() {
142   CommitRequestDataList commit_requests;
143
144   // Don't bother sending anything if there's no one to send to.
145   if (!IsConnected())
146     return;
147
148   // Don't send anything if the type is not ready to handle commits.
149   if (!data_type_state_.initial_sync_done)
150     return;
151
152   // TODO(rlarocque): Do something smarter than iterate here.
153   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
154        ++it) {
155     if (it->second->RequiresCommitRequest()) {
156       CommitRequestData request;
157       it->second->InitializeCommitRequestData(&request);
158       commit_requests.push_back(request);
159       it->second->SetCommitRequestInProgress();
160     }
161   }
162
163   if (!commit_requests.empty())
164     worker_->EnqueueForCommit(commit_requests);
165 }
166
167 void ModelTypeSyncProxyImpl::OnCommitCompleted(
168     const DataTypeState& type_state,
169     const CommitResponseDataList& response_list) {
170   data_type_state_ = type_state;
171
172   for (CommitResponseDataList::const_iterator list_it = response_list.begin();
173        list_it != response_list.end();
174        ++list_it) {
175     const CommitResponseData& response_data = *list_it;
176     const std::string& client_tag_hash = response_data.client_tag_hash;
177
178     EntityMap::iterator it = entities_.find(client_tag_hash);
179     if (it == entities_.end()) {
180       NOTREACHED() << "Received commit response for missing item."
181                    << " type: " << type_ << " client_tag: " << client_tag_hash;
182       return;
183     } else {
184       it->second->ReceiveCommitResponse(response_data.id,
185                                         response_data.sequence_number,
186                                         response_data.response_version,
187                                         data_type_state_.encryption_key_name);
188     }
189   }
190 }
191
192 void ModelTypeSyncProxyImpl::OnUpdateReceived(
193     const DataTypeState& data_type_state,
194     const UpdateResponseDataList& response_list,
195     const UpdateResponseDataList& pending_updates) {
196   bool got_new_encryption_requirements = data_type_state_.encryption_key_name !=
197                                          data_type_state.encryption_key_name;
198
199   data_type_state_ = data_type_state;
200
201   for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
202        list_it != response_list.end();
203        ++list_it) {
204     const UpdateResponseData& response_data = *list_it;
205     const std::string& client_tag_hash = response_data.client_tag_hash;
206
207     UpdateMap::iterator old_it = pending_updates_map_.find(client_tag_hash);
208     if (old_it != pending_updates_map_.end()) {
209       // If we're being asked to apply an update to this entity, this overrides
210       // the previous pending updates.
211       delete old_it->second;
212       pending_updates_map_.erase(old_it);
213     }
214
215     EntityMap::iterator it = entities_.find(client_tag_hash);
216     if (it == entities_.end()) {
217       scoped_ptr<ModelTypeEntity> entity =
218           ModelTypeEntity::FromServerUpdate(response_data.id,
219                                             response_data.client_tag_hash,
220                                             response_data.non_unique_name,
221                                             response_data.response_version,
222                                             response_data.specifics,
223                                             response_data.deleted,
224                                             response_data.ctime,
225                                             response_data.mtime,
226                                             response_data.encryption_key_name);
227       entities_.insert(std::make_pair(client_tag_hash, entity.release()));
228     } else {
229       ModelTypeEntity* entity = it->second;
230       entity->ApplyUpdateFromServer(response_data.response_version,
231                                     response_data.deleted,
232                                     response_data.specifics,
233                                     response_data.mtime,
234                                     response_data.encryption_key_name);
235
236       // TODO: Do something special when conflicts are detected.
237     }
238
239     // If the received entity has out of date encryption, we schedule another
240     // commit to fix it.
241     if (data_type_state_.encryption_key_name !=
242         response_data.encryption_key_name) {
243       DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
244                << response_data.encryption_key_name << " -> "
245                << data_type_state_.encryption_key_name;
246       EntityMap::iterator it2 = entities_.find(client_tag_hash);
247       it2->second->UpdateDesiredEncryptionKey(
248           data_type_state_.encryption_key_name);
249     }
250   }
251
252   // Save pending updates in the appropriate data structure.
253   for (UpdateResponseDataList::const_iterator list_it = pending_updates.begin();
254        list_it != pending_updates.end();
255        ++list_it) {
256     const UpdateResponseData& update = *list_it;
257     const std::string& client_tag_hash = update.client_tag_hash;
258
259     UpdateMap::iterator lookup_it = pending_updates_map_.find(client_tag_hash);
260     if (lookup_it == pending_updates_map_.end()) {
261       pending_updates_map_.insert(
262           std::make_pair(client_tag_hash, new UpdateResponseData(update)));
263     } else if (lookup_it->second->response_version <= update.response_version) {
264       delete lookup_it->second;
265       pending_updates_map_.erase(lookup_it);
266       pending_updates_map_.insert(
267           std::make_pair(client_tag_hash, new UpdateResponseData(update)));
268     } else {
269       // Received update is stale, do not overwrite existing.
270     }
271   }
272
273   if (got_new_encryption_requirements) {
274     for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
275          ++it) {
276       it->second->UpdateDesiredEncryptionKey(
277           data_type_state_.encryption_key_name);
278     }
279   }
280
281   // We may have new reasons to commit by the time this function is done.
282   FlushPendingCommitRequests();
283
284   // TODO: Inform the model of the new or updated data.
285   // TODO: Persist the new data on disk.
286 }
287
288 UpdateResponseDataList ModelTypeSyncProxyImpl::GetPendingUpdates() {
289   UpdateResponseDataList pending_updates_list;
290   for (UpdateMap::const_iterator it = pending_updates_map_.begin();
291        it != pending_updates_map_.end();
292        ++it) {
293     pending_updates_list.push_back(*it->second);
294   }
295   return pending_updates_list;
296 }
297
298 void ModelTypeSyncProxyImpl::ClearTransientSyncState() {
299   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
300        ++it) {
301     it->second->ClearTransientSyncState();
302   }
303 }
304
305 void ModelTypeSyncProxyImpl::ClearSyncState() {
306   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
307        ++it) {
308     it->second->ClearSyncState();
309   }
310   STLDeleteValues(&pending_updates_map_);
311   data_type_state_ = DataTypeState();
312 }
313
314 }  // namespace syncer