1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/values.h"
19 #include "sync/engine/sync_scheduler.h"
20 #include "sync/engine/syncer_types.h"
21 #include "sync/internal_api/change_reorder_buffer.h"
22 #include "sync/internal_api/public/base/cancelation_signal.h"
23 #include "sync/internal_api/public/base/model_type.h"
24 #include "sync/internal_api/public/base_node.h"
25 #include "sync/internal_api/public/configure_reason.h"
26 #include "sync/internal_api/public/engine/polling_constants.h"
27 #include "sync/internal_api/public/http_post_provider_factory.h"
28 #include "sync/internal_api/public/internal_components_factory.h"
29 #include "sync/internal_api/public/read_node.h"
30 #include "sync/internal_api/public/read_transaction.h"
31 #include "sync/internal_api/public/user_share.h"
32 #include "sync/internal_api/public/util/experiments.h"
33 #include "sync/internal_api/public/write_node.h"
34 #include "sync/internal_api/public/write_transaction.h"
35 #include "sync/internal_api/syncapi_internal.h"
36 #include "sync/internal_api/syncapi_server_connection_manager.h"
37 #include "sync/js/js_arg_list.h"
38 #include "sync/js/js_event_details.h"
39 #include "sync/js/js_event_handler.h"
40 #include "sync/js/js_reply_handler.h"
41 #include "sync/notifier/invalidation_util.h"
42 #include "sync/notifier/invalidator.h"
43 #include "sync/notifier/object_id_invalidation_map.h"
44 #include "sync/protocol/proto_value_conversions.h"
45 #include "sync/protocol/sync.pb.h"
46 #include "sync/syncable/directory.h"
47 #include "sync/syncable/entry.h"
48 #include "sync/syncable/in_memory_directory_backing_store.h"
49 #include "sync/syncable/on_disk_directory_backing_store.h"
51 using base::TimeDelta;
52 using sync_pb::GetUpdatesCallerInfo;
56 using sessions::SyncSessionContext;
57 using syncable::ImmutableWriteTransactionInfo;
58 using syncable::SPECIFICS;
59 using syncable::UNIQUE_POSITION;
// File-level tuning constants.
// kDefaultNudgeDelayMilliseconds and kPreferencesNudgeDelayMilliseconds are
// converted with TimeDelta::FromMilliseconds() by the nudge-delay computation
// further down in this file.
// NOTE(review): kSyncRefreshDelayMsec and kSyncSchedulerDelayMsec have no
// visible users in this part of the file — confirm they are still needed.
63 // Delays for syncer nudges.
64 static const int kDefaultNudgeDelayMilliseconds = 200;
65 static const int kPreferencesNudgeDelayMilliseconds = 2000;
66 static const int kSyncRefreshDelayMsec = 500;
67 static const int kSyncSchedulerDelayMsec = 250;
// Both values are passed, in this order, to the |traffic_recorder_| member
// initializer in the SyncManagerImpl constructor. The size is 5 * 1024
// (presumably bytes — confirm against the traffic recorder's interface).
69 // Maximum count and size for traffic recorder.
70 static const unsigned int kMaxMessagesToRecord = 10;
71 static const unsigned int kMaxMessageSizeToRecord = 5 * 1024;
// Maps a ConfigureReason onto the GetUpdatesCallerInfo::GetUpdatesSource value
// that ConfigureSyncer() places into its ConfigurationParams.
//   RECONFIGURATION                   -> RECONFIGURATION
//   MIGRATION                         -> MIGRATION
//   NEW_CLIENT                        -> NEW_CLIENT
//   NEWLY_ENABLED_DATA_TYPE / CRYPTO  -> NEWLY_SUPPORTED_DATATYPE
// The final return yields UNKNOWN.
// NOTE(review): presumably UNKNOWN is only reached for reasons not listed
// above — confirm against the full switch.
73 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
74 ConfigureReason reason) {
76 case CONFIGURE_REASON_RECONFIGURATION:
77 return GetUpdatesCallerInfo::RECONFIGURATION;
78 case CONFIGURE_REASON_MIGRATION:
79 return GetUpdatesCallerInfo::MIGRATION;
80 case CONFIGURE_REASON_NEW_CLIENT:
81 return GetUpdatesCallerInfo::NEW_CLIENT;
82 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
// Crypto-triggered configurations share the source of a newly enabled type.
83 case CONFIGURE_REASON_CRYPTO:
84 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
88 return GetUpdatesCallerInfo::UNKNOWN;
// NudgeStrategy: static helpers that turn a ModelType into the TimeDelta to
// wait before nudging the syncer. SyncManagerImpl calls
// NudgeStrategy::GetNudgeDelayTimeDelta() from its own GetNudgeDelayTimeDelta()
// and from RequestNudgeForDataTypes().
93 // A class to calculate nudge delays for types.
// Entry point: resolves the strategy for |model_type| and converts it to a
// concrete delay. |core| is forwarded so a type-specific delay can be read
// from its scheduler.
96 static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type,
97 SyncManagerImpl* core) {
98 NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type);
99 return GetNudgeDelayTimeDeltaFromType(delay_type,
105 // Possible types of nudge delay for datatypes.
106 // Note: These are just hints. If a sync happens then all dirty entries
107 // would be committed as part of the sync.
108 enum NudgeDelayStrategy {
112 // Sync this change while syncing another change.
115 // The datatype does not use one of the predefined wait times but defines
116 // its own wait time logic for nudge.
// Chooses the strategy for |type|. ACCOMPANY_ONLY is one possible result and
// FAVICON_TRACKING is one of the types given explicit treatment.
// NOTE(review): which types map to which strategy is not fully visible here —
// confirm against the complete switch.
120 static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) {
123 return ACCOMPANY_ONLY;
127 case FAVICON_TRACKING:
// Converts a strategy into a concrete delay. The result starts out as
// kDefaultNudgeDelayMilliseconds and is overwritten per strategy; one branch
// uses kDefaultShortPollIntervalSeconds, and a nested switch on |model_type|
// picks type-specific values.
134 static TimeDelta GetNudgeDelayTimeDeltaFromType(
135 const NudgeDelayStrategy& delay_type, const ModelType& model_type,
136 const SyncManagerImpl* core) {
138 TimeDelta delay = TimeDelta::FromMilliseconds(
139 kDefaultNudgeDelayMilliseconds);
140 switch (delay_type) {
142 delay = TimeDelta::FromMilliseconds(
143 kDefaultNudgeDelayMilliseconds);
146 delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds);
// Type-specific delays (presumably the "defines its own wait time" strategy
// described on the enum — confirm).
149 switch (model_type) {
151 delay = TimeDelta::FromMilliseconds(
152 kPreferencesNudgeDelayMilliseconds);
// This delay is read from the scheduler at call time rather than from a
// constant in this file.
156 case FAVICON_TRACKING:
157 delay = core->scheduler()->GetSessionsCommitDelay();
// Constructs an uninitialized manager identified by |name|. Delegate and error
// callback pointers start out NULL, network-change observation starts
// disabled, and the invalidator state starts as DEFAULT_INVALIDATION_ERROR.
// The body pre-populates the notification bookkeeping and registers the JS
// message handlers; the rest of the setup happens in Init().
170 SyncManagerImpl::SyncManagerImpl(const std::string& name)
172 change_delegate_(NULL),
174 observing_network_connectivity_changes_(false),
175 invalidator_state_(DEFAULT_INVALIDATION_ERROR),
176 traffic_recorder_(kMaxMessagesToRecord, kMaxMessageSizeToRecord),
178 report_unrecoverable_error_function_(NULL),
179 weak_ptr_factory_(this) {
// One default-constructed NotificationInfo per model type in
// [FIRST_REAL_MODEL_TYPE, MODEL_TYPE_COUNT).
180 // Pre-fill |notification_info_map_|.
181 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
182 notification_info_map_.insert(
183 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
// Each binding associates a message name with a member function; see
// BindJsMessageHandler() for how |this| is captured.
186 // Bind message handlers.
187 BindJsMessageHandler(
188 "getNotificationState",
189 &SyncManagerImpl::GetNotificationState);
190 BindJsMessageHandler(
191 "getNotificationInfo",
192 &SyncManagerImpl::GetNotificationInfo);
193 BindJsMessageHandler(
195 &SyncManagerImpl::GetAllNodes);
196 BindJsMessageHandler(
197 "getClientServerTraffic",
198 &SyncManagerImpl::GetClientServerTraffic);
// Must run on the thread the manager is bound to. The CHECK makes destroying
// a still-initialized manager fatal; ShutdownOnSyncThread() is what clears
// |initialized_|.
201 SyncManagerImpl::~SyncManagerImpl() {
202 DCHECK(thread_checker_.CalledOnValidThread());
203 CHECK(!initialized_);
// Starts with a zero |total_count|; |payload| is left default-constructed.
206 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
// Nothing to release explicitly.
207 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
// Serializes this record into a newly allocated DictionaryValue with the keys
// "totalCount" and "payload". The dictionary is created with a plain |new|, so
// whoever receives the pointer is responsible for it (NotificationInfoToValue()
// hands it to DictionaryValue::Set()).
209 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
210 base::DictionaryValue* value = new base::DictionaryValue();
211 value->SetInteger("totalCount", total_count);
212 value->SetString("payload", payload);
// Compares the positional fields of an entry before (|mutation.original|) and
// after (|mutation.mutated|) a change. Three conditions are evaluated in order:
// whether the mutated entry maintains a position at all, whether
// UNIQUE_POSITION differs, and whether PARENT_ID differs.
// NOTE(review): presumably returns false when position is not maintained and
// true as soon as either field differs — confirm against the return statements.
216 bool SyncManagerImpl::VisiblePositionsDiffer(
217 const syncable::EntryKernelMutation& mutation) const {
218 const syncable::EntryKernel& a = mutation.original;
219 const syncable::EntryKernel& b = mutation.mutated;
220 if (!b.ShouldMaintainPosition())
222 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
224 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
// Decides whether a mutation changed anything a browser model would care
// about. |mutation| carries the entry before and after the change;
// |cryptographer| is passed to AreSpecificsEqual() for the specifics
// comparison. Called by HandleCalculateChangesChangeEventFromSyncer() to decide
// whether an entry that existed before and after counts as updated.
// Checks, in order: untracked items, IS_DIR, specifics, NON_UNIQUE_NAME
// (unencrypted entries only), and position.
// NOTE(review): presumably the untracked-item check returns false and every
// later check returns true on a difference — confirm against the returns.
229 bool SyncManagerImpl::VisiblePropertiesDiffer(
230 const syncable::EntryKernelMutation& mutation,
231 Cryptographer* cryptographer) const {
232 const syncable::EntryKernel& a = mutation.original;
233 const syncable::EntryKernel& b = mutation.mutated;
234 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
235 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
// A mutation is not expected to change an entry's model type.
236 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
237 GetModelTypeFromSpecifics(b_specifics));
238 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
// "Not tracked" means a type below FIRST_REAL_MODEL_TYPE or an entry that
// carries a UNIQUE_SERVER_TAG.
239 // Suppress updates to items that aren't tracked by any browser model.
240 if (model_type < FIRST_REAL_MODEL_TYPE ||
241 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
244 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
246 if (!AreSpecificsEqual(cryptographer,
247 a.ref(syncable::SPECIFICS),
248 b.ref(syncable::SPECIFICS))) {
251 // We only care if the name has changed if neither specifics is encrypted
252 // (encrypted nodes blow away the NON_UNIQUE_NAME).
253 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
254 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
// The position check is delegated to the dedicated helper.
256 if (VisiblePositionsDiffer(mutation))
// Test hook: opens a ReadTransaction on the user share and reports an
// unrecoverable error through the wrapped syncable transaction, using a fixed
// "for testing purposes" message.
261 void SyncManagerImpl::ThrowUnrecoverableError() {
262 DCHECK(thread_checker_.CalledOnValidThread());
263 ReadTransaction trans(FROM_HERE, GetUserShare());
264 trans.GetWrappedTrans()->OnUnrecoverableError(
265 FROM_HERE, "Simulating unrecoverable error for testing purposes.");
// Forwards to the directory's InitialSyncEndedTypes(). Dereferences
// directory() without a null check, so the directory must exist.
268 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
269 return directory()->InitialSyncEndedTypes();
// Walks |types|, reads each type's download progress marker from the
// directory, and tests whether the marker's token is empty.
// NOTE(review): presumably the types with an empty token are collected into
// the returned set — confirm against the accumulation and return statements.
272 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
273 ModelTypeSet types) {
275 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
276 sync_pb::DataTypeProgressMarker marker;
277 directory()->GetDownloadProgress(i.Get(), &marker);
279 if (marker.token().empty())
// Reconfigures the syncer for a new set of types.
//   reason           - why the configuration is happening; converted with
//                      GetSourceFromReason().
//   to_download      - types to fetch (logged here; presumably also placed in
//                      the ConfigurationParams — confirm).
//   to_purge / to_journal / to_unapply - sets logged here and used for the
//                      PurgeDisabledTypes() step.
//   new_routing_info - routing info; its types are logged as "current types".
//   ready_task / retry_task - completion callbacks; both must be non-null.
// Must be called on the manager's thread.
285 void SyncManagerImpl::ConfigureSyncer(
286 ConfigureReason reason,
287 ModelTypeSet to_download,
288 ModelTypeSet to_purge,
289 ModelTypeSet to_journal,
290 ModelTypeSet to_unapply,
291 const ModelSafeRoutingInfo& new_routing_info,
292 const base::Closure& ready_task,
293 const base::Closure& retry_task) {
294 DCHECK(thread_checker_.CalledOnValidThread());
295 DCHECK(!ready_task.is_null());
296 DCHECK(!retry_task.is_null());
// Verbose-only dump of every type set involved in this configuration.
298 DVLOG(1) << "Configuring -"
299 << "\n\t" << "current types: "
300 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
301 << "\n\t" << "types to download: "
302 << ModelTypeSetToString(to_download)
303 << "\n\t" << "types to purge: "
304 << ModelTypeSetToString(to_purge)
305 << "\n\t" << "types to journal: "
306 << ModelTypeSetToString(to_journal)
307 << "\n\t" << "types to unapply: "
308 << ModelTypeSetToString(to_unapply);
// Purge comes first; a failed purge takes the failure path described below.
309 if (!PurgeDisabledTypes(to_purge,
312 // We failed to cleanup the types. Invoke the ready task without actually
313 // configuring any types. The caller should detect this as a configuration
314 // failure and act appropriately.
// Purge succeeded: describe the configuration, put the scheduler into
// CONFIGURATION_MODE and hand it the request.
319 ConfigurationParams params(GetSourceFromReason(reason),
325 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
326 scheduler_->ScheduleConfiguration(params);
// One-time initialization of the manager. In order, it:
//   1. wires up observers and the JS event handler,
//   2. creates the encryption handler and registers its observers,
//   3. builds and opens the syncable directory,
//   4. creates the server connection manager,
//   5. builds the model type registry, session context and scheduler,
//   6. starts the scheduler in CONFIGURATION_MODE,
//   7. registers for network change notifications,
//   8. applies |credentials| and notifies observers of success.
// If the directory cannot be opened, observers are told that initialization
// failed and an error is logged.
// Preconditions enforced below: not yet initialized (CHECK), called on the
// manager's thread, non-null |post_factory| and |cancelation_signal|, and
// non-empty |credentials.email| / |credentials.sync_token| (DCHECKs).
// Ownership visible here: |post_factory| is released into the connection
// manager and |unrecoverable_error_handler| is moved into a member. The raw
// pointers |change_delegate| and |encryptor| are stored without any transfer
// of ownership being visible.
329 void SyncManagerImpl::Init(
330 const base::FilePath& database_location,
331 const WeakHandle<JsEventHandler>& event_handler,
332 const std::string& sync_server_and_path,
335 scoped_ptr<HttpPostProviderFactory> post_factory,
336 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
337 ExtensionsActivity* extensions_activity,
338 SyncManager::ChangeDelegate* change_delegate,
339 const SyncCredentials& credentials,
340 const std::string& invalidator_client_id,
341 const std::string& restored_key_for_bootstrapping,
342 const std::string& restored_keystore_key_for_bootstrapping,
343 InternalComponentsFactory* internal_components_factory,
344 Encryptor* encryptor,
345 scoped_ptr<UnrecoverableErrorHandler> unrecoverable_error_handler,
346 ReportUnrecoverableErrorFunction report_unrecoverable_error_function,
347 CancelationSignal* cancelation_signal) {
348 CHECK(!initialized_);
349 DCHECK(thread_checker_.CalledOnValidThread());
350 DCHECK(post_factory.get());
351 DCHECK(!credentials.email.empty());
352 DCHECK(!credentials.sync_token.empty());
353 DCHECK(cancelation_signal);
354 DVLOG(1) << "SyncManager starting Init...";
// Weak handle to this object; HandleCalculateChangesChangeEventFromSyncApi()
// uses it to request nudges.
356 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
358 change_delegate_ = change_delegate;
// Internal observers; ShutdownOnSyncThread() removes both again.
360 AddObserver(&js_sync_manager_observer_);
361 SetJsEventHandler(event_handler);
363 AddObserver(&debug_info_event_listener_);
// The database file lives directly under |database_location|.
365 database_path_ = database_location.Append(
366 syncable::Directory::kSyncDatabaseFilename);
367 encryptor_ = encryptor;
368 unrecoverable_error_handler_ = unrecoverable_error_handler.Pass();
369 report_unrecoverable_error_function_ = report_unrecoverable_error_function;
// A restored keystore bootstrap key counts as already having a keystore key.
371 allstatus_.SetHasKeystoreKey(
372 !restored_keystore_key_for_bootstrapping.empty());
373 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
376 restored_key_for_bootstrapping,
377 restored_keystore_key_for_bootstrapping));
378 sync_encryption_handler_->AddObserver(this);
379 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
380 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
// The backing store expects an absolute path, hence the DCHECK.
382 base::FilePath absolute_db_path = database_path_;
383 DCHECK(absolute_db_path.IsAbsolute());
385 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
386 internal_components_factory->BuildDirectoryBackingStore(
387 credentials.email, absolute_db_path).Pass();
389 DCHECK(backing_store.get());
390 const std::string& username = credentials.email;
// The backing store is release()d into the Directory constructor; the error
// handler and encryption handler are passed as raw pointers obtained via get().
391 share_.directory.reset(
392 new syncable::Directory(
393 backing_store.release(),
394 unrecoverable_error_handler_.get(),
395 report_unrecoverable_error_function_,
396 sync_encryption_handler_.get(),
397 sync_encryption_handler_->GetCryptographerUnsafe()));
399 DVLOG(1) << "Username: " << username;
// Failure path: report to observers and log.
// NOTE(review): presumably returns here without running the steps below —
// confirm.
400 if (!OpenDirectory(username)) {
401 NotifyInitializationFailure();
402 LOG(ERROR) << "Sync manager initialization failed!";
// The connection manager identifies this client by the directory's cache GUID.
406 connection_manager_.reset(new SyncAPIServerConnectionManager(
407 sync_server_and_path, port, use_ssl,
408 post_factory.release(), cancelation_signal));
409 connection_manager_->set_client_id(directory()->cache_guid());
410 connection_manager_->AddListener(this);
412 std::string sync_id = directory()->cache_guid();
414 DVLOG(1) << "Setting sync client ID: " << sync_id;
415 allstatus_.SetSyncId(sync_id);
416 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id;
417 allstatus_.SetInvalidatorClientId(invalidator_client_id);
419 model_type_registry_.reset(new ModelTypeRegistry(workers, directory()));
421 // Build a SyncSessionContext and store the worker in it.
422 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
// Engine events go to both the status tracker and this manager.
423 std::vector<SyncEngineEventListener*> listeners;
424 listeners.push_back(&allstatus_);
425 listeners.push_back(this);
426 session_context_ = internal_components_factory->BuildContext(
427 connection_manager_.get(),
431 &debug_info_event_listener_,
433 model_type_registry_.get(),
434 invalidator_client_id).Pass();
435 session_context_->set_account_name(credentials.email);
436 scheduler_ = internal_components_factory->BuildScheduler(
437 name_, session_context_.get(), cancelation_signal).Pass();
// The scheduler starts in configuration mode; StartSyncingNormally() switches
// it to NORMAL_MODE later.
439 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
// ShutdownOnSyncThread() performs the matching Remove*Observer() calls.
443 net::NetworkChangeNotifier::AddIPAddressObserver(this);
444 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
445 observing_network_connectivity_changes_ = true;
// UpdateCredentials() DCHECKs |initialized_|, so the flag has to be set by
// the time this call is made.
447 UpdateCredentials(credentials);
449 NotifyInitializationSuccess();
// Tells every registered observer that initialization succeeded, passing weak
// handles to this manager and to the debug info listener, the success flag
// |true|, and the types whose initial sync has ended.
452 void SyncManagerImpl::NotifyInitializationSuccess() {
453 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
454 OnInitializationComplete(
455 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
456 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
457 true, InitialSyncEndedTypes()));
// Failure counterpart of NotifyInitializationSuccess(): same handles, but the
// success flag is |false| and the type set is empty, so the directory is not
// consulted.
460 void SyncManagerImpl::NotifyInitializationFailure() {
461 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
462 OnInitializationComplete(
463 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
464 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
465 false, ModelTypeSet()));
// Encryption-handler callback; Init() registers this object as an observer of
// |sync_encryption_handler_|.
// NOTE(review): no work on |reason| or |pending_keys| is visible here —
// presumably intentionally a no-op; confirm.
468 void SyncManagerImpl::OnPassphraseRequired(
469 PassphraseRequiredReason reason,
470 const sync_pb::EncryptedData& pending_keys) {
// Encryption-handler callback.
// NOTE(review): presumably intentionally a no-op — confirm.
474 void SyncManagerImpl::OnPassphraseAccepted() {
// Encryption-handler callback. Only the token |type| matters: a keystore
// bootstrap token marks the keystore key as present in |allstatus_|. The token
// contents are not used here.
478 void SyncManagerImpl::OnBootstrapTokenUpdated(
479 const std::string& bootstrap_token,
480 BootstrapTokenType type) {
481 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
482 allstatus_.SetHasKeystoreKey(true);
// Encryption-handler callback: records the new encrypted type set in
// |allstatus_|. |encrypt_everything| is not used in the visible body.
485 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
486 bool encrypt_everything) {
487 allstatus_.SetEncryptedTypes(encrypted_types);
// Encryption-handler callback.
// NOTE(review): presumably intentionally a no-op — confirm.
490 void SyncManagerImpl::OnEncryptionComplete() {
// Encryption-handler callback: mirrors the cryptographer's readiness and
// pending-key state into |allstatus_|, and refreshes the keystore migration
// time from the encryption handler.
494 void SyncManagerImpl::OnCryptographerStateChanged(
495 Cryptographer* cryptographer) {
496 allstatus_.SetCryptographerReady(cryptographer->is_ready());
497 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
498 allstatus_.SetKeystoreMigrationTime(
499 sync_encryption_handler_->migration_time());
// Encryption-handler callback: records the new passphrase |type| in
// |allstatus_| and refreshes the keystore migration time from the encryption
// handler. |explicit_passphrase_time| is not used in the visible body.
502 void SyncManagerImpl::OnPassphraseTypeChanged(
504 base::Time explicit_passphrase_time) {
505 allstatus_.SetPassphraseType(type);
506 allstatus_.SetKeystoreMigrationTime(
507 sync_encryption_handler_->migration_time());
// Installs |routing_info| on the session context and switches the scheduler
// to NORMAL_MODE. Must be called on the manager's thread.
510 void SyncManagerImpl::StartSyncingNormally(
511 const ModelSafeRoutingInfo& routing_info) {
512 // Start the sync scheduler.
513 // TODO(sync): We always want the newest set of routes when we switch back
514 // to normal mode. Figure out how to enforce set_routing_info is always
515 // appropriately set and that it's only modified when switching to normal
517 DCHECK(thread_checker_.CalledOnValidThread());
518 session_context_->SetRoutingInfo(routing_info);
519 scheduler_->Start(SyncScheduler::NORMAL_MODE);
// Returns the directory held by |share_| as a raw, non-owning pointer. The
// result is null once ShutdownOnSyncThread() has reset |share_.directory|.
522 syncable::Directory* SyncManagerImpl::directory() {
523 return share_.directory.get();
// Read-only access to the scheduler as a raw, non-owning pointer; used by
// NudgeStrategy to query a type-specific commit delay.
526 const SyncScheduler* SyncManagerImpl::scheduler() const {
527 return scheduler_.get();
// Test-only accessor: reports whether the connection manager considers its
// auth token invalid. Dereferences |connection_manager_| without a null check.
530 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
531 return connection_manager_->HasInvalidAuthToken();
// Opens the syncable directory for |username| and purges partially synced
// types. Init() treats a false return as an initialization failure.
// NOTE(review): presumably |initialized_| is set on the success path, since
// Init() later calls UpdateCredentials(), which DCHECKs it — confirm.
534 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
535 DCHECK(!initialized_) << "Should only happen once";
// Both weak handles point at |js_mutation_event_observer_|: one is kept as
// |change_observer_|, the other is handed to the directory as its transaction
// observer.
537 // Set before Open().
538 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
539 WeakHandle<syncable::TransactionObserver> transaction_observer(
540 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
// |this| is passed as the second argument of Open().
542 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
543 open_result = directory()->Open(username, this, transaction_observer);
544 if (open_result != syncable::OPENED) {
545 LOG(ERROR) << "Could not open share for:" << username;
549 // Unapplied datatypes (those that do not have initial sync ended set) get
550 // re-downloaded during any configuration. But, it's possible for a datatype
551 // to have a progress marker but not have initial sync ended yet, making
552 // it a candidate for migration. This is a problem, as the DataTypeManager
553 // does not support a migration while it's already in the middle of a
554 // configuration. As a result, any partially synced datatype can stall the
555 // DTM, waiting for the configuration to complete, which it never will due
556 // to the migration error. In addition, a partially synced nigori will
557 // trigger the migration logic before the backend is initialized, resulting
558 // in crashes. We therefore detect and purge any partially synced types as
559 // part of initialization.
560 if (!PurgePartiallySyncedTypes())
// Computes the "partially synced" types — all types, minus those whose initial
// sync has ended, minus those whose progress marker token is empty — and asks
// the directory to purge their entries. The size of the set is also recorded
// in the "Sync.PartiallySyncedTypes" histogram.
// NOTE(review): presumably returns true without touching the directory when
// the set is empty — confirm against the early return.
566 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
567 ModelTypeSet partially_synced_types = ModelTypeSet::All();
568 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
569 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
570 ModelTypeSet::All()));
572 DVLOG(1) << "Purging partially synced types "
573 << ModelTypeSetToString(partially_synced_types);
574 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
575 partially_synced_types.Size());
576 if (partially_synced_types.Empty())
578 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
// Purges directory entries for |to_purge|, forwarding |to_journal| and
// |to_unapply| to the directory unchanged. Both of those sets are expected to
// be subsets of |to_purge| (DCHECKed). Returns the directory's result.
// NOTE(review): presumably returns true immediately when |to_purge| is empty —
// confirm against the early return.
583 bool SyncManagerImpl::PurgeDisabledTypes(
584 ModelTypeSet to_purge,
585 ModelTypeSet to_journal,
586 ModelTypeSet to_unapply) {
587 if (to_purge.Empty())
589 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
590 DCHECK(to_purge.HasAll(to_journal));
591 DCHECK(to_purge.HasAll(to_unapply));
592 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
// Applies new credentials. Requires an initialized manager, the manager's
// thread, and non-empty email and sync token (all DCHECKed). Only
// |credentials.sync_token| is used, as the connection manager's auth token.
// The scheduler is notified only if the connection manager accepts the token.
595 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
596 DCHECK(thread_checker_.CalledOnValidThread());
597 DCHECK(initialized_);
598 DCHECK(!credentials.email.empty());
599 DCHECK(!credentials.sync_token.empty());
// Re-enables handling of network change events; OnServerConnectionEvent()
// turns it off when it sees SYNC_AUTH_ERROR.
601 observing_network_connectivity_changes_ = true;
602 if (!connection_manager_->SetAuthToken(credentials.sync_token))
603 return; // Auth token is known to be invalid, so exit early.
605 scheduler_->OnCredentialsUpdated();
607 // TODO(zea): pass the credential age to the debug info event listener.
// Registers |observer| in |observers_|. Must be called on the manager's
// thread. The pointer is stored as given; no ownership transfer is visible.
610 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
611 DCHECK(thread_checker_.CalledOnValidThread());
612 observers_.AddObserver(observer);
// Unregisters |observer| from |observers_|. Must be called on the manager's
// thread.
615 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
616 DCHECK(thread_checker_.CalledOnValidThread());
617 observers_.RemoveObserver(observer);
// Tears down what Init() set up, on the manager's thread: weak pointers are
// invalidated first, then the session context and model type registry are
// dropped, observers and listeners are unregistered, the directory is saved
// (only if the manager was initialized) and released, and |initialized_| is
// cleared so the destructor's CHECK passes.
620 void SyncManagerImpl::ShutdownOnSyncThread() {
621 DCHECK(thread_checker_.CalledOnValidThread());
623 // Prevent any in-flight method calls from running. Also
624 // invalidates |weak_handle_this_| and |change_observer_|.
625 weak_ptr_factory_.InvalidateWeakPtrs();
626 js_mutation_event_observer_.InvalidateWeakPtrs();
629 session_context_.reset();
630 model_type_registry_.reset();
// The handler is null-checked before its observers are removed.
// NOTE(review): Init() also adds |js_sync_encryption_handler_observer_|; no
// matching RemoveObserver() is visible here — confirm it is removed.
632 if (sync_encryption_handler_) {
633 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
634 sync_encryption_handler_->RemoveObserver(this);
// Detach the JS event handler by installing a default-constructed handle.
637 SetJsEventHandler(WeakHandle<JsEventHandler>());
638 RemoveObserver(&js_sync_manager_observer_);
640 RemoveObserver(&debug_info_event_listener_);
642 // |connection_manager_| may end up being NULL here in tests (in synchronous
643 // initialization mode).
645 // TODO(akalin): Fix this behavior.
646 if (connection_manager_)
647 connection_manager_->RemoveListener(this);
648 connection_manager_.reset();
// Mirrors the Add*Observer() calls made in Init().
650 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
651 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
652 observing_network_connectivity_changes_ = false;
// Changes are saved only when initialization completed and a directory exists.
654 if (initialized_ && directory()) {
655 directory()->SaveChanges();
658 share_.directory.reset();
660 change_delegate_ = NULL;
662 initialized_ = false;
664 // We reset these here, since only now we know they will not be
665 // accessed from other threads (since we shut down everything).
666 change_observer_.Reset();
667 weak_handle_this_.Reset();
// Network change callback. While |observing_network_connectivity_changes_| is
// false the event is only logged as dropped; otherwise it is forwarded to the
// shared connectivity handler.
// NOTE(review): presumably the "dropped" branch returns before the forwarding
// call — confirm.
670 void SyncManagerImpl::OnIPAddressChanged() {
671 if (!observing_network_connectivity_changes_) {
672 DVLOG(1) << "IP address change dropped.";
675 DVLOG(1) << "IP address change detected.";
676 OnNetworkConnectivityChangedImpl();
// Network change callback; the new connection type itself is ignored (the
// parameter is unnamed). Same gating as OnIPAddressChanged(): logged as
// dropped while observation is disabled, forwarded otherwise.
// NOTE(review): presumably the "dropped" branch returns before the forwarding
// call — confirm.
679 void SyncManagerImpl::OnConnectionTypeChanged(
680 net::NetworkChangeNotifier::ConnectionType) {
681 if (!observing_network_connectivity_changes_) {
682 DVLOG(1) << "Connection type change dropped.";
685 DVLOG(1) << "Connection type change detected.";
686 OnNetworkConnectivityChangedImpl();
// Shared tail of the two network change callbacks: informs the scheduler that
// the connection status changed. Must run on the manager's thread.
689 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
690 DCHECK(thread_checker_.CalledOnValidThread());
691 scheduler_->OnConnectionStatusChange();
// Listener callback from the connection manager (registered in Init()).
// Translates the event's connection code into an observer notification:
//   SERVER_CONNECTION_OK -> CONNECTION_OK
//   SYNC_AUTH_ERROR      -> CONNECTION_AUTH_ERROR
//   SYNC_SERVER_ERROR    -> CONNECTION_SERVER_ERROR
// No notification is visible for any other code.
694 void SyncManagerImpl::OnServerConnectionEvent(
695 const ServerConnectionEvent& event) {
696 DCHECK(thread_checker_.CalledOnValidThread());
697 if (event.connection_code ==
698 HttpResponse::SERVER_CONNECTION_OK) {
699 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
700 OnConnectionStatusChange(CONNECTION_OK));
// After an auth error, network change events are dropped until
// UpdateCredentials() re-enables them.
703 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
704 observing_network_connectivity_changes_ = false;
705 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
706 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
709 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
710 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
711 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
// For each model type in |models_with_changes|, tells the change delegate that
// the type's changes are complete and relays the same event through
// |change_observer_|.
// NOTE(review): presumably returns early when |change_delegate_| is null —
// confirm.
715 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
716 ModelTypeSet models_with_changes) {
717 // This notification happens immediately after the transaction mutex is
718 // released. This allows work to be performed without blocking other threads
719 // from acquiring a transaction.
720 if (!change_delegate_)
724 for (ModelTypeSet::Iterator it = models_with_changes.First();
725 it.Good(); it.Inc()) {
726 change_delegate_->OnChangesComplete(it.Get());
727 change_observer_.Call(
729 &SyncManager::ChangeObserver::OnChangesComplete,
// Delivers the change records accumulated in |change_records_| and returns the
// set of model types that had changes. Returns an empty set when there is no
// change delegate or nothing is pending. |change_records_| is cleared before
// returning, so each batch is delivered once.
735 SyncManagerImpl::HandleTransactionEndingChangeEvent(
736 const ImmutableWriteTransactionInfo& write_transaction_info,
737 syncable::BaseTransaction* trans) {
738 // This notification happens immediately before a syncable WriteTransaction
739 // falls out of scope. It happens while the channel mutex is still held,
740 // and while the transaction mutex is held, so it cannot be re-entrant.
741 if (!change_delegate_ || change_records_.empty())
742 return ModelTypeSet();
744 // This will continue the WriteTransaction using a read only wrapper.
745 // This is the last chance for read to occur in the WriteTransaction
746 // that's closing. This special ReadTransaction will not close the
747 // underlying transaction.
748 ReadTransaction read_trans(GetUserShare(), trans);
750 ModelTypeSet models_with_changes;
751 for (ChangeRecordMap::const_iterator it = change_records_.begin();
752 it != change_records_.end(); ++it) {
// Stored record lists are expected to be non-empty; the map key is the model
// type as an integer.
753 DCHECK(!it->second.Get().empty());
754 ModelType type = ModelTypeFromInt(it->first);
// The records go out twice: once with the type's transaction version and the
// read-only transaction (NOTE(review): presumably on |change_delegate_| —
// confirm), and once through |change_observer_| with the write transaction id.
756 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
757 &read_trans, it->second);
758 change_observer_.Call(FROM_HERE,
759 &SyncManager::ChangeObserver::OnChangesApplied,
760 type, write_transaction_info.Get().id, it->second);
761 models_with_changes.Put(type);
763 change_records_.clear();
764 return models_with_changes;
// Handles a write transaction that originated from the sync API (a local
// change). Scans the transaction's mutations, collects the model types that
// were really mutated, appends the metahandles of those entries to
// |entries_changed|, and requests a nudge for the collected types.
// |trans| is not used in the visible body.
767 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
768 const ImmutableWriteTransactionInfo& write_transaction_info,
769 syncable::BaseTransaction* trans,
770 std::vector<int64>* entries_changed) {
771 // We have been notified about a user action changing a sync model.
772 LOG_IF(WARNING, !change_records_.empty()) <<
773 "CALCULATE_CHANGES called with unapplied old changes.";
775 // The set of mutated model types; stays empty if nothing was mutated.
776 ModelTypeSet mutated_model_types;
778 const syncable::ImmutableEntryKernelMutationMap& mutations =
779 write_transaction_info.Get().mutations;
780 for (syncable::EntryKernelMutationMap::const_iterator it =
781 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
// NOTE(review): presumably mutations whose result is not IS_UNSYNCED are
// skipped — confirm.
782 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
786 ModelType model_type =
787 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
// Types below FIRST_REAL_MODEL_TYPE are not expected on this path.
788 if (model_type < FIRST_REAL_MODEL_TYPE) {
789 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
793 // Found real mutation.
794 if (model_type != UNSPECIFIED) {
795 mutated_model_types.Put(model_type);
796 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
// The nudge is dispatched through |weak_handle_this_| rather than by calling
// RequestNudgeForDataTypes() directly, and only once the handle has been
// initialized (it is created in Init()).
800 // Nudge if necessary.
801 if (!mutated_model_types.Empty()) {
802 if (weak_handle_this_.IsInitialized()) {
803 weak_handle_this_.Call(FROM_HERE,
804 &SyncManagerImpl::RequestNudgeForDataTypes,
806 mutated_model_types);
// For deletions only (|existed_before| && !|exists_now|): attaches decrypted
// data from the deleted entry to |buffer| under |id|.
//   - PASSWORDS: the password specifics are decrypted and attached as an
//     ExtraPasswordChangeRecordData.
//   - any other type with encrypted specifics: the specifics are decrypted in
//     place on a local copy and attached via SetSpecificsForId().
// |original| is the entry kernel before the change; |cryptographer| performs
// the decryption.
// NOTE(review): what happens when decryption fails is not visible here —
// confirm against the error branches.
813 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
814 ModelType type, ChangeReorderBuffer* buffer,
815 Cryptographer* cryptographer, const syncable::EntryKernel& original,
816 bool existed_before, bool exists_now) {
817 // If this is a deletion and the datatype was encrypted, we need to decrypt it
818 // and attach it to the buffer.
819 if (!exists_now && existed_before) {
// Work on a copy so the kernel's own specifics are left untouched.
820 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
821 if (type == PASSWORDS) {
822 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
823 scoped_ptr<sync_pb::PasswordSpecificsData> data(
824 DecryptPasswordSpecifics(original_specifics, cryptographer));
// The record is allocated with |new| and handed to the buffer as a raw pointer.
829 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
830 } else if (original_specifics.has_encrypted()) {
831 // All other datatypes can just create a new unencrypted specifics and
// Decrypt() writes into |original_specifics|, the same object that
// |encrypted| refers into.
833 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
834 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
839 buffer->SetSpecificsForId(id, original_specifics);
// Handles a write transaction that originated from the syncer. Classifies each
// mutation as an add, delete or update, buffers it per model type, then
// converts every non-empty buffer into tree-ordered change records stored in
// |change_records_|. The ids of the resulting records are appended to
// |entries_changed|.
843 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
844 const ImmutableWriteTransactionInfo& write_transaction_info,
845 syncable::BaseTransaction* trans,
846 std::vector<int64>* entries_changed) {
847 // We only expect one notification per sync step, so |change_records_| should
848 // contain no pending entries.
849 LOG_IF(WARNING, !change_records_.empty()) <<
850 "CALCULATE_CHANGES called with unapplied old changes.";
// One reorder buffer per model type, indexed by the ModelType value.
852 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
854 Cryptographer* crypto = directory()->GetCryptographer(trans);
855 const syncable::ImmutableEntryKernelMutationMap& mutations =
856 write_transaction_info.Get().mutations;
857 for (syncable::EntryKernelMutationMap::const_iterator it =
858 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
// "Exists" means the IS_DEL bit is not set.
859 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
860 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
862 // Omit items that aren't associated with a model.
864 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
865 if (type < FIRST_REAL_MODEL_TYPE)
// The mutation map is keyed by metahandle.
868 int64 handle = it->first;
869 if (exists_now && !existed_before)
870 change_buffers[type].PushAddedItem(handle);
871 else if (!exists_now && existed_before)
872 change_buffers[type].PushDeletedItem(handle);
// An entry present before and after only counts as updated when something
// visible to the models changed.
873 else if (exists_now && existed_before &&
874 VisiblePropertiesDiffer(it->second, crypto)) {
875 change_buffers[type].PushUpdatedItem(handle);
// Attaches decrypted data for deletions (a no-op for adds and updates).
878 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
879 it->second.original, existed_before, exists_now);
// Read-only wrapper around |trans|, needed to order the changes.
882 ReadTransaction read_trans(GetUserShare(), trans);
883 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
884 if (!change_buffers[i].IsEmpty()) {
885 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
886 &(change_records_[i]))) {
887 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
888 entries_changed->push_back((change_records_[i].Get())[j].id);
// change_records_[i] above creates an entry even when nothing was produced;
// drop it so HandleTransactionEndingChangeEvent() never sees an empty list.
890 if (change_records_[i].Get().empty())
891 change_records_.erase(i);
// Returns the nudge delay for |model_type| by delegating to NudgeStrategy,
// passing this manager so the strategy can consult its scheduler.
896 TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta(
897 const ModelType& model_type) {
898 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this);
// Schedules a local nudge for |types|: reports the first type to the debug
// info listener, computes a delay via NudgeStrategy, bumps the local nudge
// counter and asks the scheduler to nudge.
// |types| must be non-empty, since the first element is read unconditionally;
// the visible caller only invokes this with a non-empty set.
901 void SyncManagerImpl::RequestNudgeForDataTypes(
902 const tracked_objects::Location& nudge_location,
903 ModelTypeSet types) {
904 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
906 // TODO(lipalani) : Calculate the nudge delay based on all types.
907 base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta(
910 allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL);
911 scheduler_->ScheduleLocalNudge(nudge_delay,
// Sync engine event callback (this object is added to the session context's
// listeners in Init()). For SYNC_CYCLE_ENDED events, observers receive
// OnSyncCycleCompleted() with the event's snapshot.
// NOTE(review): a guarded path logs that the notification is not sent;
// presumably it applies when the manager is not initialized — confirm.
916 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
917 DCHECK(thread_checker_.CalledOnValidThread());
918 // Only send an event if this is due to a cycle ending and this cycle
919 // concludes a canonical "sync" process; that is, based on what is known
920 // locally we are "all happy" and up-to-date. There may be new changes on
921 // the server, but we'll get them on a subsequent sync.
923 // Notifications are sent at the end of every sync cycle, regardless of
924 // whether we should sync again.
925 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
927 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
932 DVLOG(1) << "Sending OnSyncCycleCompleted";
933 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
934 OnSyncCycleCompleted(event.snapshot));
// Relays an actionable protocol |error| unchanged to every registered
// observer.
938 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
940 SyncManager::Observer, observers_,
941 OnActionableError(error));
// Intentionally empty: the new retry time is ignored.
944 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
// Intentionally empty: the set of throttled types is ignored.
946 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
// Relays a migration request for |types| unchanged to every registered
// observer.
948 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
950 SyncManager::Observer, observers_,
951 OnMigrationRequested(types));
// Stores |event_handler| and propagates it to the three JS-facing observers
// (sync manager, mutation event, and encryption handler observers).
// ShutdownOnSyncThread() passes a default-constructed handle to detach.
954 void SyncManagerImpl::SetJsEventHandler(
955 const WeakHandle<JsEventHandler>& event_handler) {
956 js_event_handler_ = event_handler;
957 js_sync_manager_observer_.SetJsEventHandler(js_event_handler_);
958 js_mutation_event_observer_.SetJsEventHandler(js_event_handler_);
959 js_sync_encryption_handler_observer_.SetJsEventHandler(js_event_handler_);
// Dispatches a JS message called |name| to the handler registered through
// BindJsMessageHandler(), and sends the handler's result back through
// |reply_handler| as a HandleJsReply call. Messages are logged and dropped
// when the reply handler is uninitialized or no handler is bound for |name|.
// NOTE(review): presumably each "dropping" branch returns before the
// dispatch — confirm.
962 void SyncManagerImpl::ProcessJsMessage(
963 const std::string& name, const JsArgList& args,
964 const WeakHandle<JsReplyHandler>& reply_handler) {
970 if (!reply_handler.IsInitialized()) {
971 DVLOG(1) << "Uninitialized reply handler; dropping unknown message "
972 << name << " with args " << args.ToString();
// NOTE(review): if |js_message_handlers_| is a std::map, operator[] inserts a
// null handler for every unknown |name|, so the map grows with unrecognized
// messages — consider a find()-based lookup.
976 JsMessageHandler js_message_handler = js_message_handlers_[name];
977 if (js_message_handler.is_null()) {
978 DVLOG(1) << "Dropping unknown message " << name
979 << " with args " << args.ToString();
// The handler runs here, and its result is passed on through the weak handle.
983 reply_handler.Call(FROM_HERE,
984 &JsReplyHandler::HandleJsReply,
985 name, js_message_handler.Run(args));
// Registers a handler for the JS message |name|: binds
// |unbound_message_handler| to this object and stores the resulting callback
// in |js_message_handlers_| under |name|.
// |this| is passed with base::Unretained.
// NOTE(review): presumably safe because the callbacks are stored in a member
// of this same object -- confirm nothing copies them out.
988 void SyncManagerImpl::BindJsMessageHandler(
989 const std::string& name,
990 UnboundJsMessageHandler unbound_message_handler) {
991 js_message_handlers_[name] =
992 base::Bind(unbound_message_handler, base::Unretained(this));
// Builds a base::DictionaryValue from |notification_info|: for each map entry
// the key is ModelTypeToString() of the model type and the value is the
// entry's NotificationInfo::ToValue().
// The dictionary is allocated with new.
// NOTE(review): the return statement is absent from this listing; presumably
// the raw pointer is returned and the caller owns it (NotificationInfoToString
// wraps the result in a scoped_ptr) -- confirm.
995 base::DictionaryValue* SyncManagerImpl::NotificationInfoToValue(
996 const NotificationInfoMap& notification_info) {
997 base::DictionaryValue* value = new base::DictionaryValue();
999 for (NotificationInfoMap::const_iterator it = notification_info.begin();
1000 it != notification_info.end(); ++it) {
1001 const std::string model_type_str = ModelTypeToString(it->first);
1002 value->Set(model_type_str, it->second.ToValue());
// Serializes |notification_info| to JSON text: converts it with
// NotificationInfoToValue(), holds the result in a scoped_ptr, and writes it
// into |str| with base::JSONWriter::Write.
// NOTE(review): the declaration of |str| and the return statement are absent
// from this listing; presumably |str| is a local std::string that is returned.
1008 std::string SyncManagerImpl::NotificationInfoToString(
1009 const NotificationInfoMap& notification_info) {
1010 scoped_ptr<base::DictionaryValue> value(
1011 NotificationInfoToValue(notification_info));
1013 base::JSONWriter::Write(value.get(), &str);
// Returns a JsArgList with one element: the string form of
// |invalidator_state_| as produced by InvalidatorStateToString(). The state
// string is also logged at DVLOG(1). |args| is not read in the visible body.
1017 JsArgList SyncManagerImpl::GetNotificationState(
1018 const JsArgList& args) {
1019 const std::string& notification_state =
1020 InvalidatorStateToString(invalidator_state_);
1021 DVLOG(1) << "GetNotificationState: " << notification_state;
1022 base::ListValue return_args;
1023 return_args.Append(new base::StringValue(notification_state));
1024 return JsArgList(&return_args);
// Returns a JsArgList with one element: the dictionary produced by
// NotificationInfoToValue(notification_info_map_). The JSON form of the same
// map is logged at DVLOG(1). |args| is not read in the visible body.
1027 JsArgList SyncManagerImpl::GetNotificationInfo(
1028 const JsArgList& args) {
1029 DVLOG(1) << "GetNotificationInfo: "
1030 << NotificationInfoToString(notification_info_map_);
1031 base::ListValue return_args;
// NOTE(review): the raw pointer from NotificationInfoToValue() is handed to
// Append(); presumably the list takes ownership -- confirm.
1032 return_args.Append(NotificationInfoToValue(notification_info_map_));
1033 return JsArgList(&return_args);
// Returns a JsArgList wrapping a list to which the value from
// |traffic_recorder_|.ToValue() is appended. |args| is not read in the
// visible body.
// NOTE(review): one line between the ToValue() call and the Append() is absent
// from this listing (possibly a null check on |value|) -- confirm.
1036 JsArgList SyncManagerImpl::GetClientServerTraffic(
1037 const JsArgList& args) {
1038 base::ListValue return_args;
1039 base::ListValue* value = traffic_recorder_.ToValue();
1041 return_args.Append(value);
1042 return JsArgList(&return_args);
// Returns a JsArgList with one element: the list produced by the directory's
// GetAllNodeDetails(), obtained inside a ReadTransaction on GetUserShare().
// |args| is not read in the visible body.
1045 JsArgList SyncManagerImpl::GetAllNodes(const JsArgList& args) {
1046 ReadTransaction trans(FROM_HERE, GetUserShare());
1047 base::ListValue return_args;
1048 scoped_ptr<base::ListValue> nodes(
1049 trans.GetDirectory()->GetAllNodeDetails(trans.GetWrappedTrans()));
// release() gives up the scoped_ptr's pointer so it can be passed to Append().
1050 return_args.Append(nodes.release());
1051 return JsArgList(&return_args);
// Updates per-type notification bookkeeping from |invalidation_map|.
// For each object id in the map, the id is converted to a ModelType with
// ObjectIdToRealModelType(); then, for each invalidation of that object, the
// type's NotificationInfo has |total_count| incremented and |payload| set to
// "UNKNOWN" for an unknown-version invalidation, or to the invalidation's
// payload otherwise.
// NOTE(review): this listing omits the body of the failed-conversion branch
// (presumably it skips the id) and the end of the inner for-header (presumably
// ++inv_it) -- confirm against the full file.
1054 void SyncManagerImpl::UpdateNotificationInfo(
1055 const ObjectIdInvalidationMap& invalidation_map) {
1056 ObjectIdSet ids = invalidation_map.GetObjectIds();
1057 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
1058 ModelType type = UNSPECIFIED;
1059 if (!ObjectIdToRealModelType(*it, &type)) {
1062 const SingleObjectInvalidationSet& type_invalidations =
1063 invalidation_map.ForObject(*it);
1064 for (SingleObjectInvalidationSet::const_iterator inv_it =
1065 type_invalidations.begin(); inv_it != type_invalidations.end();
// NOTE(review): the "¬ification_info_map_" token below looks like a
// mis-decoded "&notification_info_map_" (HTML entity "&not"); as written it
// will not compile -- restore from the original file.
1067 NotificationInfo* info = ¬ification_info_map_[type];
1068 info->total_count++;
1069 std::string payload =
1070 inv_it->is_unknown_version() ? "UNKNOWN" : inv_it->payload();
1071 info->payload = payload;
// Records a new invalidator |state| and propagates it.
// DCHECKs that the call is on |thread_checker_|'s valid thread, stores |state|
// in |invalidator_state_|, and tells |allstatus_| and |scheduler_| whether
// notifications are enabled (true only when the state is
// INVALIDATIONS_ENABLED). If |js_event_handler_| is initialized, it also emits
// the JS event "onNotificationStateChange" whose details carry the state
// string under the key "state".
1076 void SyncManagerImpl::OnInvalidatorStateChange(InvalidatorState state) {
1077 DCHECK(thread_checker_.CalledOnValidThread());
1079 const std::string& state_str = InvalidatorStateToString(state);
1080 invalidator_state_ = state;
1081 DVLOG(1) << "Invalidator state changed to: " << state_str;
1082 const bool notifications_enabled =
1083 (invalidator_state_ == INVALIDATIONS_ENABLED);
1084 allstatus_.SetNotificationsEnabled(notifications_enabled);
1085 scheduler_->SetNotificationsEnabled(notifications_enabled);
1087 if (js_event_handler_.IsInitialized()) {
1088 base::DictionaryValue details;
1089 details.SetString("state", state_str);
1090 js_event_handler_.Call(FROM_HERE,
1091 &JsEventHandler::HandleJsEvent,
1092 "onNotificationStateChange",
1093 JsEventDetails(&details));
// Handles an incoming invalidation described by |invalidation_map|.
// DCHECKs that the call is on |thread_checker_|'s valid thread. The visible
// body does four things:
//   1. Logs a DLOG(WARNING) for each object id that ObjectIdToRealModelType()
//      rejects.
//   2. Logs a LOG(WARNING) when the map is empty.
//   3. Bumps the NUDGE_SOURCE_NOTIFICATION counter, schedules an invalidation
//      nudge after kSyncSchedulerDelayMsec, counts the notification, updates
//      |notification_info_map_| via UpdateNotificationInfo(), and informs
//      |debug_info_event_listener_|.
//   4. If |js_event_handler_| is initialized, emits the JS event
//      "onIncomingNotification" with "changedTypes" (model type names) and
//      "source" set to "REMOTE_INVALIDATION".
// NOTE(review): this listing omits the declaration of |type| and the lines
// that close the empty-map branch; presumably steps 3 and 4 run only for a
// non-empty map (the DCHECK on |nudged_types| suggests so) -- confirm.
1097 void SyncManagerImpl::OnIncomingInvalidation(
1098 const ObjectIdInvalidationMap& invalidation_map) {
1099 DCHECK(thread_checker_.CalledOnValidThread());
1101 // We should never receive IDs from non-sync objects.
1102 ObjectIdSet ids = invalidation_map.GetObjectIds();
1103 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
1105 if (!ObjectIdToRealModelType(*it, &type)) {
1106 DLOG(WARNING) << "Notification has invalid id: " << ObjectIdToString(*it);
1110 if (invalidation_map.Empty()) {
1111 LOG(WARNING) << "Sync received invalidation without any type information.";
1113 allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_NOTIFICATION);
1114 scheduler_->ScheduleInvalidationNudge(
1115 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec),
1116 invalidation_map, FROM_HERE);
1117 allstatus_.IncrementNotificationsReceived();
1118 UpdateNotificationInfo(invalidation_map);
1119 debug_info_event_listener_.OnIncomingNotification(invalidation_map);
1122 if (js_event_handler_.IsInitialized()) {
1123 base::DictionaryValue details;
// |changed_types| is handed to |details| right away and filled in afterwards
// through the retained raw pointer.
1124 base::ListValue* changed_types = new base::ListValue();
1125 details.Set("changedTypes", changed_types);
1127 ObjectIdSet id_set = invalidation_map.GetObjectIds();
1128 ModelTypeSet nudged_types = ObjectIdSetToModelTypeSet(id_set);
1129 DCHECK(!nudged_types.Empty());
1130 for (ModelTypeSet::Iterator it = nudged_types.First();
1131 it.Good(); it.Inc()) {
1132 const std::string model_type_str = ModelTypeToString(it.Get());
1133 changed_types->Append(new base::StringValue(model_type_str));
1135 details.SetString("source", "REMOTE_INVALIDATION");
1136 js_event_handler_.Call(FROM_HERE,
1137 &JsEventHandler::HandleJsEvent,
1138 "onIncomingNotification",
1139 JsEventDetails(&details));
// Handles a local refresh request for |types|.
// DCHECKs that the call is on |thread_checker_|'s valid thread. An empty
// |types| is logged with LOG(WARNING). The visible body also bumps the
// NUDGE_SOURCE_LOCAL_REFRESH counter and asks |scheduler_| for a local refresh
// after kSyncRefreshDelayMsec. If |js_event_handler_| is initialized, it emits
// the JS event "onIncomingNotification" with "changedTypes" (the names of
// |types|) and "source" set to "LOCAL_INVALIDATION".
// NOTE(review): this listing omits the lines that close the empty-set branch
// and the remaining arguments of ScheduleLocalRefreshRequest(); presumably the
// nudge runs only when |types| is non-empty -- confirm against the full file.
1143 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
1144 DCHECK(thread_checker_.CalledOnValidThread());
1145 if (types.Empty()) {
1146 LOG(WARNING) << "Sync received refresh request with no types specified.";
1148 allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL_REFRESH);
1149 scheduler_->ScheduleLocalRefreshRequest(
1150 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec),
1154 if (js_event_handler_.IsInitialized()) {
1155 base::DictionaryValue details;
// |changed_types| is handed to |details| right away and filled in afterwards
// through the retained raw pointer.
1156 base::ListValue* changed_types = new base::ListValue();
1157 details.Set("changedTypes", changed_types);
1158 for (ModelTypeSet::Iterator it = types.First(); it.Good(); it.Inc()) {
1159 const std::string& model_type_str =
1160 ModelTypeToString(it.Get());
1161 changed_types->Append(new base::StringValue(model_type_str));
1163 details.SetString("source", "LOCAL_INVALIDATION");
1164 js_event_handler_.Call(FROM_HERE,
1165 &JsEventHandler::HandleJsEvent,
1166 "onIncomingNotification",
1167 JsEventDetails(&details));
// Returns the SyncStatus reported by |allstatus_|.status(), by value.
1171 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
1172 return allstatus_.status();
// Forwards to SaveChanges() on the object returned by directory().
1175 void SyncManagerImpl::SaveChanges() {
1176 directory()->SaveChanges();
// Returns a UserShare pointer; DCHECKs that |initialized_| is set first.
// NOTE(review): the return statement is absent from this listing -- confirm
// which member is returned against the full file.
1179 UserShare* SyncManagerImpl::GetUserShare() {
1180 DCHECK(initialized_);
// Returns the value of directory()->cache_guid() as a copy; DCHECKs that
// |initialized_| is set first.
1184 const std::string SyncManagerImpl::cache_guid() {
1185 DCHECK(initialized_);
1186 return directory()->cache_guid();
// Reads experiment nodes inside a ReadTransaction and copies their settings
// into |experiments| (and, for one of them, into |session_context_|).
// The Nigori node is looked up first by kNigoriTag. Then four EXPERIMENTS
// nodes are looked up by client tag, each handled only if its lookup returns
// INIT_OK:
//   - kFaviconSyncTag: sets |experiments->favicon_sync_limit|.
//   - kPreCommitUpdateAvoidanceTag: sets the server-enabled flag on
//     |session_context_|; does not touch |found_experiment|.
//   - kGCMChannelTag (only if has_enabled()): sets
//     |experiments->gcm_channel_state| to ENABLED or SUPPRESSED.
//   - kEnhancedBookmarksTag (only if has_enhanced_bookmarks()): sets
//     |enhanced_bookmarks_enabled| and/or |enhanced_bookmarks_ext_id| when the
//     corresponding field is present.
// Returns |found_experiment|, which the favicon, GCM channel and enhanced
// bookmarks branches set to true.
// NOTE(review): the statement after the "Couldn't find Nigori node." log is
// absent from this listing (presumably an early return of false), as is the
// end of the comment in the pre-commit branch -- confirm against the full
// file.
1189 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
1190 ReadTransaction trans(FROM_HERE, GetUserShare());
1191 ReadNode nigori_node(&trans);
1192 if (nigori_node.InitByTagLookup(kNigoriTag) != BaseNode::INIT_OK) {
1193 DVLOG(1) << "Couldn't find Nigori node.";
1196 bool found_experiment = false;
1198 ReadNode favicon_sync_node(&trans);
1199 if (favicon_sync_node.InitByClientTagLookup(
1200 syncer::EXPERIMENTS,
1201 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
1202 experiments->favicon_sync_limit =
1203 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
1204 favicon_sync_limit();
1205 found_experiment = true;
1208 ReadNode pre_commit_update_avoidance_node(&trans);
1209 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
1210 syncer::EXPERIMENTS,
1211 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
1212 session_context_->set_server_enabled_pre_commit_update_avoidance(
1213 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
1214 pre_commit_update_avoidance().enabled());
1215 // We don't bother setting found_experiment. The frontend doesn't need to
1219 ReadNode gcm_channel_node(&trans);
1220 if (gcm_channel_node.InitByClientTagLookup(
1221 syncer::EXPERIMENTS,
1222 syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
1223 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
1224 experiments->gcm_channel_state =
1225 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
1226 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
1227 found_experiment = true;
1230 ReadNode enhanced_bookmarks_node(&trans);
1231 if (enhanced_bookmarks_node.InitByClientTagLookup(
1232 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
1233 BaseNode::INIT_OK &&
1234 enhanced_bookmarks_node.GetExperimentsSpecifics()
1235 .has_enhanced_bookmarks()) {
1236 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
1237 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
1238 if (enhanced_bookmarks.has_enabled())
1239 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
1240 if (enhanced_bookmarks.has_extension_id()) {
1241 experiments->enhanced_bookmarks_ext_id =
1242 enhanced_bookmarks.extension_id();
1244 found_experiment = true;
1247 return found_experiment;
// Returns true when the directory's unsynced_entity_count(), read inside a
// ReadTransaction on GetUserShare(), is non-zero.
1250 bool SyncManagerImpl::HasUnsyncedItems() {
1251 ReadTransaction trans(FROM_HERE, GetUserShare());
1252 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
// Returns the raw pointer held by |sync_encryption_handler_| (via get()).
1255 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1256 return sync_encryption_handler_.get();
// Returns the constant kDefaultNudgeDelayMilliseconds (milliseconds, per the
// constant's name).
1260 int SyncManagerImpl::GetDefaultNudgeDelay() {
1261 return kDefaultNudgeDelayMilliseconds;
// Returns the constant kPreferencesNudgeDelayMilliseconds (milliseconds, per
// the constant's name).
1265 int SyncManagerImpl::GetPreferencesNudgeDelay() {
1266 return kPreferencesNudgeDelayMilliseconds;
1269 } // namespace syncer