1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/model_type_sync_proxy_impl.h"
8 #include "base/location.h"
9 #include "sync/engine/model_type_entity.h"
10 #include "sync/engine/model_type_sync_worker.h"
11 #include "sync/internal_api/public/sync_context_proxy.h"
12 #include "sync/syncable/syncable_util.h"
16 ModelTypeSyncProxyImpl::ModelTypeSyncProxyImpl(ModelType type)
20 entities_deleter_(&entities_),
21 pending_updates_map_deleter_(&pending_updates_map_),
22 weak_ptr_factory_for_ui_(this),
23 weak_ptr_factory_for_sync_(this) {
26 ModelTypeSyncProxyImpl::~ModelTypeSyncProxyImpl() {
29 bool ModelTypeSyncProxyImpl::IsPreferred() const {
30 DCHECK(CalledOnValidThread());
34 bool ModelTypeSyncProxyImpl::IsConnected() const {
35 DCHECK(CalledOnValidThread());
39 ModelType ModelTypeSyncProxyImpl::GetModelType() const {
40 DCHECK(CalledOnValidThread());
44 void ModelTypeSyncProxyImpl::Enable(
45 scoped_ptr<SyncContextProxy> sync_context_proxy) {
46 DCHECK(CalledOnValidThread());
47 DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
51 // TODO(rlarocque): At some point, this should be loaded from storage.
52 data_type_state_.progress_marker.set_data_type_id(
53 GetSpecificsFieldNumberFromModelType(type_));
55 UpdateResponseDataList saved_pending_updates = GetPendingUpdates();
56 sync_context_proxy_ = sync_context_proxy.Pass();
57 sync_context_proxy_->ConnectTypeToSync(
60 saved_pending_updates,
61 weak_ptr_factory_for_sync_.GetWeakPtr());
64 void ModelTypeSyncProxyImpl::Disable() {
65 DCHECK(CalledOnValidThread());
66 is_preferred_ = false;
72 void ModelTypeSyncProxyImpl::Disconnect() {
73 DCHECK(CalledOnValidThread());
74 DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
75 is_connected_ = false;
77 if (sync_context_proxy_) {
78 sync_context_proxy_->Disconnect(GetModelType());
79 sync_context_proxy_.reset();
82 weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
85 ClearTransientSyncState();
88 base::WeakPtr<ModelTypeSyncProxyImpl> ModelTypeSyncProxyImpl::AsWeakPtrForUI() {
89 DCHECK(CalledOnValidThread());
90 return weak_ptr_factory_for_ui_.GetWeakPtr();
93 void ModelTypeSyncProxyImpl::OnConnect(scoped_ptr<ModelTypeSyncWorker> worker) {
94 DCHECK(CalledOnValidThread());
95 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
98 worker_ = worker.Pass();
100 FlushPendingCommitRequests();
103 void ModelTypeSyncProxyImpl::Put(const std::string& client_tag,
104 const sync_pb::EntitySpecifics& specifics) {
105 DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
107 const std::string client_tag_hash(
108 syncable::GenerateSyncableHash(type_, client_tag));
110 EntityMap::iterator it = entities_.find(client_tag_hash);
111 if (it == entities_.end()) {
112 scoped_ptr<ModelTypeEntity> entity(ModelTypeEntity::NewLocalItem(
113 client_tag, specifics, base::Time::Now()));
114 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
116 ModelTypeEntity* entity = it->second;
117 entity->MakeLocalChange(specifics);
120 FlushPendingCommitRequests();
123 void ModelTypeSyncProxyImpl::Delete(const std::string& client_tag) {
124 const std::string client_tag_hash(
125 syncable::GenerateSyncableHash(type_, client_tag));
127 EntityMap::iterator it = entities_.find(client_tag_hash);
128 if (it == entities_.end()) {
129 // That's unusual, but not necessarily a bad thing.
130 // Missing is as good as deleted as far as the model is concerned.
131 DLOG(WARNING) << "Attempted to delete missing item."
132 << " client tag: " << client_tag;
134 ModelTypeEntity* entity = it->second;
138 FlushPendingCommitRequests();
141 void ModelTypeSyncProxyImpl::FlushPendingCommitRequests() {
142 CommitRequestDataList commit_requests;
144 // Don't bother sending anything if there's no one to send to.
148 // Don't send anything if the type is not ready to handle commits.
149 if (!data_type_state_.initial_sync_done)
152 // TODO(rlarocque): Do something smarter than iterate here.
153 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
155 if (it->second->RequiresCommitRequest()) {
156 CommitRequestData request;
157 it->second->InitializeCommitRequestData(&request);
158 commit_requests.push_back(request);
159 it->second->SetCommitRequestInProgress();
163 if (!commit_requests.empty())
164 worker_->EnqueueForCommit(commit_requests);
167 void ModelTypeSyncProxyImpl::OnCommitCompleted(
168 const DataTypeState& type_state,
169 const CommitResponseDataList& response_list) {
170 data_type_state_ = type_state;
172 for (CommitResponseDataList::const_iterator list_it = response_list.begin();
173 list_it != response_list.end();
175 const CommitResponseData& response_data = *list_it;
176 const std::string& client_tag_hash = response_data.client_tag_hash;
178 EntityMap::iterator it = entities_.find(client_tag_hash);
179 if (it == entities_.end()) {
180 NOTREACHED() << "Received commit response for missing item."
181 << " type: " << type_ << " client_tag: " << client_tag_hash;
184 it->second->ReceiveCommitResponse(response_data.id,
185 response_data.sequence_number,
186 response_data.response_version,
187 data_type_state_.encryption_key_name);
192 void ModelTypeSyncProxyImpl::OnUpdateReceived(
193 const DataTypeState& data_type_state,
194 const UpdateResponseDataList& response_list,
195 const UpdateResponseDataList& pending_updates) {
196 bool got_new_encryption_requirements = data_type_state_.encryption_key_name !=
197 data_type_state.encryption_key_name;
199 data_type_state_ = data_type_state;
201 for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
202 list_it != response_list.end();
204 const UpdateResponseData& response_data = *list_it;
205 const std::string& client_tag_hash = response_data.client_tag_hash;
207 UpdateMap::iterator old_it = pending_updates_map_.find(client_tag_hash);
208 if (old_it != pending_updates_map_.end()) {
209 // If we're being asked to apply an update to this entity, this overrides
210 // the previous pending updates.
211 delete old_it->second;
212 pending_updates_map_.erase(old_it);
215 EntityMap::iterator it = entities_.find(client_tag_hash);
216 if (it == entities_.end()) {
217 scoped_ptr<ModelTypeEntity> entity =
218 ModelTypeEntity::FromServerUpdate(response_data.id,
219 response_data.client_tag_hash,
220 response_data.non_unique_name,
221 response_data.response_version,
222 response_data.specifics,
223 response_data.deleted,
226 response_data.encryption_key_name);
227 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
229 ModelTypeEntity* entity = it->second;
230 entity->ApplyUpdateFromServer(response_data.response_version,
231 response_data.deleted,
232 response_data.specifics,
234 response_data.encryption_key_name);
236 // TODO: Do something special when conflicts are detected.
239 // If the received entity has out of date encryption, we schedule another
241 if (data_type_state_.encryption_key_name !=
242 response_data.encryption_key_name) {
243 DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
244 << response_data.encryption_key_name << " -> "
245 << data_type_state_.encryption_key_name;
246 EntityMap::iterator it2 = entities_.find(client_tag_hash);
247 it2->second->UpdateDesiredEncryptionKey(
248 data_type_state_.encryption_key_name);
252 // Save pending updates in the appropriate data structure.
253 for (UpdateResponseDataList::const_iterator list_it = pending_updates.begin();
254 list_it != pending_updates.end();
256 const UpdateResponseData& update = *list_it;
257 const std::string& client_tag_hash = update.client_tag_hash;
259 UpdateMap::iterator lookup_it = pending_updates_map_.find(client_tag_hash);
260 if (lookup_it == pending_updates_map_.end()) {
261 pending_updates_map_.insert(
262 std::make_pair(client_tag_hash, new UpdateResponseData(update)));
263 } else if (lookup_it->second->response_version <= update.response_version) {
264 delete lookup_it->second;
265 pending_updates_map_.erase(lookup_it);
266 pending_updates_map_.insert(
267 std::make_pair(client_tag_hash, new UpdateResponseData(update)));
269 // Received update is stale, do not overwrite existing.
273 if (got_new_encryption_requirements) {
274 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
276 it->second->UpdateDesiredEncryptionKey(
277 data_type_state_.encryption_key_name);
281 // We may have new reasons to commit by the time this function is done.
282 FlushPendingCommitRequests();
284 // TODO: Inform the model of the new or updated data.
285 // TODO: Persist the new data on disk.
288 UpdateResponseDataList ModelTypeSyncProxyImpl::GetPendingUpdates() {
289 UpdateResponseDataList pending_updates_list;
290 for (UpdateMap::const_iterator it = pending_updates_map_.begin();
291 it != pending_updates_map_.end();
293 pending_updates_list.push_back(*it->second);
295 return pending_updates_list;
298 void ModelTypeSyncProxyImpl::ClearTransientSyncState() {
299 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
301 it->second->ClearTransientSyncState();
305 void ModelTypeSyncProxyImpl::ClearSyncState() {
306 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
308 it->second->ClearSyncState();
310 STLDeleteValues(&pending_updates_map_);
311 data_type_state_ = DataTypeState();
314 } // namespace syncer