1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/non_blocking_type_processor.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "sync/engine/model_thread_sync_entity.h"
11 #include "sync/engine/non_blocking_type_processor_core_interface.h"
12 #include "sync/internal_api/public/sync_core_proxy.h"
13 #include "sync/syncable/syncable_util.h"
17 NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
21 entities_deleter_(&entities_),
22 weak_ptr_factory_for_ui_(this),
23 weak_ptr_factory_for_sync_(this) {
26 NonBlockingTypeProcessor::~NonBlockingTypeProcessor() {
29 bool NonBlockingTypeProcessor::IsPreferred() const {
30 DCHECK(CalledOnValidThread());
34 bool NonBlockingTypeProcessor::IsConnected() const {
35 DCHECK(CalledOnValidThread());
39 ModelType NonBlockingTypeProcessor::GetModelType() const {
40 DCHECK(CalledOnValidThread());
44 void NonBlockingTypeProcessor::Enable(
45 scoped_ptr<SyncCoreProxy> sync_core_proxy) {
46 DCHECK(CalledOnValidThread());
47 DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
51 // TODO(rlarocque): At some point, this should be loaded from storage.
52 data_type_state_.progress_marker.set_data_type_id(
53 GetSpecificsFieldNumberFromModelType(type_));
55 sync_core_proxy_ = sync_core_proxy.Pass();
56 sync_core_proxy_->ConnectTypeToCore(GetModelType(),
58 weak_ptr_factory_for_sync_.GetWeakPtr());
61 void NonBlockingTypeProcessor::Disable() {
62 DCHECK(CalledOnValidThread());
63 is_preferred_ = false;
67 void NonBlockingTypeProcessor::Disconnect() {
68 DCHECK(CalledOnValidThread());
69 DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
70 is_connected_ = false;
72 if (sync_core_proxy_) {
73 sync_core_proxy_->Disconnect(GetModelType());
74 sync_core_proxy_.reset();
77 weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
78 core_interface_.reset();
81 base::WeakPtr<NonBlockingTypeProcessor>
82 NonBlockingTypeProcessor::AsWeakPtrForUI() {
83 DCHECK(CalledOnValidThread());
84 return weak_ptr_factory_for_ui_.GetWeakPtr();
87 void NonBlockingTypeProcessor::OnConnect(
88 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
89 DCHECK(CalledOnValidThread());
90 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
93 core_interface_ = core_interface.Pass();
95 FlushPendingCommitRequests();
98 void NonBlockingTypeProcessor::Put(const std::string& client_tag,
99 const sync_pb::EntitySpecifics& specifics) {
100 DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
102 const std::string client_tag_hash(
103 syncable::GenerateSyncableHash(type_, client_tag));
105 EntityMap::iterator it = entities_.find(client_tag_hash);
106 if (it == entities_.end()) {
107 scoped_ptr<ModelThreadSyncEntity> entity(
108 ModelThreadSyncEntity::NewLocalItem(
109 client_tag, specifics, base::Time::Now()));
110 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
112 ModelThreadSyncEntity* entity = it->second;
113 entity->MakeLocalChange(specifics);
116 FlushPendingCommitRequests();
119 void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
120 const std::string client_tag_hash(
121 syncable::GenerateSyncableHash(type_, client_tag));
123 EntityMap::iterator it = entities_.find(client_tag_hash);
124 if (it == entities_.end()) {
125 // That's unusual, but not necessarily a bad thing.
126 // Missing is as good as deleted as far as the model is concerned.
127 DLOG(WARNING) << "Attempted to delete missing item."
128 << " client tag: " << client_tag;
130 ModelThreadSyncEntity* entity = it->second;
134 FlushPendingCommitRequests();
137 void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
138 CommitRequestDataList commit_requests;
140 // Don't bother sending anything if there's no one to send to.
144 // Don't send anything if the type is not ready to handle commits.
145 if (!data_type_state_.initial_sync_done)
148 // TODO(rlarocque): Do something smarter than iterate here.
149 for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
151 if (it->second->RequiresCommitRequest()) {
152 CommitRequestData request;
153 it->second->InitializeCommitRequestData(&request);
154 commit_requests.push_back(request);
155 it->second->SetCommitRequestInProgress();
159 if (!commit_requests.empty())
160 core_interface_->RequestCommits(commit_requests);
163 void NonBlockingTypeProcessor::OnCommitCompletion(
164 const DataTypeState& type_state,
165 const CommitResponseDataList& response_list) {
166 data_type_state_ = type_state;
168 for (CommitResponseDataList::const_iterator list_it = response_list.begin();
169 list_it != response_list.end();
171 const CommitResponseData& response_data = *list_it;
172 const std::string& client_tag_hash = response_data.client_tag_hash;
174 EntityMap::iterator it = entities_.find(client_tag_hash);
175 if (it == entities_.end()) {
176 NOTREACHED() << "Received commit response for missing item."
177 << " type: " << type_ << " client_tag: " << client_tag_hash;
180 it->second->ReceiveCommitResponse(response_data.id,
181 response_data.sequence_number,
182 response_data.response_version);
187 void NonBlockingTypeProcessor::OnUpdateReceived(
188 const DataTypeState& data_type_state,
189 const UpdateResponseDataList& response_list) {
190 bool initial_sync_just_finished =
191 !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
193 data_type_state_ = data_type_state;
195 for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
196 list_it != response_list.end();
198 const UpdateResponseData& response_data = *list_it;
199 const std::string& client_tag_hash = response_data.client_tag_hash;
201 EntityMap::iterator it = entities_.find(client_tag_hash);
202 if (it == entities_.end()) {
203 scoped_ptr<ModelThreadSyncEntity> entity =
204 ModelThreadSyncEntity::FromServerUpdate(
206 response_data.client_tag_hash,
207 response_data.non_unique_name,
208 response_data.response_version,
209 response_data.specifics,
210 response_data.deleted,
212 response_data.mtime);
213 entities_.insert(std::make_pair(client_tag_hash, entity.release()));
215 ModelThreadSyncEntity* entity = it->second;
216 entity->ApplyUpdateFromServer(response_data.response_version,
217 response_data.deleted,
218 response_data.specifics,
219 response_data.mtime);
220 // TODO: Do something special when conflicts are detected.
224 if (initial_sync_just_finished)
225 FlushPendingCommitRequests();
227 // TODO: Inform the model of the new or updated data.
230 } // namespace syncer