Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / sync / internal_api / sync_manager_impl.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/sync_manager_impl.h"
6
7 #include <string>
8
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
49
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
52
53 class GURL;
54
55 namespace syncer {
56
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
61
62 namespace {
63
64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65     ConfigureReason reason) {
66   switch (reason) {
67     case CONFIGURE_REASON_RECONFIGURATION:
68       return GetUpdatesCallerInfo::RECONFIGURATION;
69     case CONFIGURE_REASON_MIGRATION:
70       return GetUpdatesCallerInfo::MIGRATION;
71     case CONFIGURE_REASON_NEW_CLIENT:
72       return GetUpdatesCallerInfo::NEW_CLIENT;
73     case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74     case CONFIGURE_REASON_CRYPTO:
75       return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76     case CONFIGURE_REASON_PROGRAMMATIC:
77       return GetUpdatesCallerInfo::PROGRAMMATIC;
78     default:
79       NOTREACHED();
80   }
81   return GetUpdatesCallerInfo::UNKNOWN;
82 }
83
84 }  // namespace
85
86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
87     : name_(name),
88       change_delegate_(NULL),
89       initialized_(false),
90       observing_network_connectivity_changes_(false),
91       report_unrecoverable_error_function_(NULL),
92       weak_ptr_factory_(this) {
93   // Pre-fill |notification_info_map_|.
94   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
95     notification_info_map_.insert(
96         std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
97   }
98 }
99
100 SyncManagerImpl::~SyncManagerImpl() {
101   DCHECK(thread_checker_.CalledOnValidThread());
102   CHECK(!initialized_);
103 }
104
105 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
106 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107
108 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
109   base::DictionaryValue* value = new base::DictionaryValue();
110   value->SetInteger("totalCount", total_count);
111   value->SetString("payload", payload);
112   return value;
113 }
114
115 bool SyncManagerImpl::VisiblePositionsDiffer(
116     const syncable::EntryKernelMutation& mutation) const {
117   const syncable::EntryKernel& a = mutation.original;
118   const syncable::EntryKernel& b = mutation.mutated;
119   if (!b.ShouldMaintainPosition())
120     return false;
121   if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
122     return true;
123   if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
124     return true;
125   return false;
126 }
127
128 bool SyncManagerImpl::VisiblePropertiesDiffer(
129     const syncable::EntryKernelMutation& mutation,
130     Cryptographer* cryptographer) const {
131   const syncable::EntryKernel& a = mutation.original;
132   const syncable::EntryKernel& b = mutation.mutated;
133   const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
134   const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
135   DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
136             GetModelTypeFromSpecifics(b_specifics));
137   ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
138   // Suppress updates to items that aren't tracked by any browser model.
139   if (model_type < FIRST_REAL_MODEL_TYPE ||
140       !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
141     return false;
142   }
143   if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
144     return true;
145   if (!AreSpecificsEqual(cryptographer,
146                          a.ref(syncable::SPECIFICS),
147                          b.ref(syncable::SPECIFICS))) {
148     return true;
149   }
150   if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
151                                   b.ref(syncable::ATTACHMENT_METADATA))) {
152     return true;
153   }
154   // We only care if the name has changed if neither specifics is encrypted
155   // (encrypted nodes blow away the NON_UNIQUE_NAME).
156   if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
157       a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
158     return true;
159   if (VisiblePositionsDiffer(mutation))
160     return true;
161   return false;
162 }
163
164 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
165   return directory()->InitialSyncEndedTypes();
166 }
167
168 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
169     ModelTypeSet types) {
170   ModelTypeSet result;
171   for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
172     sync_pb::DataTypeProgressMarker marker;
173     directory()->GetDownloadProgress(i.Get(), &marker);
174
175     if (marker.token().empty())
176       result.Put(i.Get());
177   }
178   return result;
179 }
180
181 void SyncManagerImpl::ConfigureSyncer(
182     ConfigureReason reason,
183     ModelTypeSet to_download,
184     ModelTypeSet to_purge,
185     ModelTypeSet to_journal,
186     ModelTypeSet to_unapply,
187     const ModelSafeRoutingInfo& new_routing_info,
188     const base::Closure& ready_task,
189     const base::Closure& retry_task) {
190   DCHECK(thread_checker_.CalledOnValidThread());
191   DCHECK(!ready_task.is_null());
192   DCHECK(!retry_task.is_null());
193   DCHECK(initialized_);
194
195   DVLOG(1) << "Configuring -"
196            << "\n\t" << "current types: "
197            << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
198            << "\n\t" << "types to download: "
199            << ModelTypeSetToString(to_download)
200            << "\n\t" << "types to purge: "
201            << ModelTypeSetToString(to_purge)
202            << "\n\t" << "types to journal: "
203            << ModelTypeSetToString(to_journal)
204            << "\n\t" << "types to unapply: "
205            << ModelTypeSetToString(to_unapply);
206   if (!PurgeDisabledTypes(to_purge,
207                           to_journal,
208                           to_unapply)) {
209     // We failed to cleanup the types. Invoke the ready task without actually
210     // configuring any types. The caller should detect this as a configuration
211     // failure and act appropriately.
212     ready_task.Run();
213     return;
214   }
215
216   ConfigurationParams params(GetSourceFromReason(reason),
217                              to_download,
218                              new_routing_info,
219                              ready_task,
220                              retry_task);
221
222   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
223   scheduler_->ScheduleConfiguration(params);
224 }
225
226 void SyncManagerImpl::Init(InitArgs* args) {
227   CHECK(!initialized_);
228   DCHECK(thread_checker_.CalledOnValidThread());
229   DCHECK(args->post_factory.get());
230   DCHECK(!args->credentials.email.empty());
231   DCHECK(!args->credentials.sync_token.empty());
232   DCHECK(!args->credentials.scope_set.empty());
233   DCHECK(args->cancelation_signal);
234   DVLOG(1) << "SyncManager starting Init...";
235
236   weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
237
238   change_delegate_ = args->change_delegate;
239
240   AddObserver(&js_sync_manager_observer_);
241   SetJsEventHandler(args->event_handler);
242
243   AddObserver(&debug_info_event_listener_);
244
245   database_path_ = args->database_location.Append(
246       syncable::Directory::kSyncDatabaseFilename);
247   unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
248   report_unrecoverable_error_function_ =
249       args->report_unrecoverable_error_function;
250
251   allstatus_.SetHasKeystoreKey(
252       !args->restored_keystore_key_for_bootstrapping.empty());
253   sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
254       &share_,
255       args->encryptor,
256       args->restored_key_for_bootstrapping,
257       args->restored_keystore_key_for_bootstrapping));
258   sync_encryption_handler_->AddObserver(this);
259   sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
260   sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
261
262   base::FilePath absolute_db_path = database_path_;
263   DCHECK(absolute_db_path.IsAbsolute());
264
265   scoped_ptr<syncable::DirectoryBackingStore> backing_store =
266       args->internal_components_factory->BuildDirectoryBackingStore(
267           InternalComponentsFactory::STORAGE_ON_DISK,
268           args->credentials.email, absolute_db_path).Pass();
269
270   DCHECK(backing_store.get());
271   share_.directory.reset(
272       new syncable::Directory(
273           backing_store.release(),
274           unrecoverable_error_handler_.get(),
275           report_unrecoverable_error_function_,
276           sync_encryption_handler_.get(),
277           sync_encryption_handler_->GetCryptographerUnsafe()));
278   share_.sync_credentials = args->credentials;
279
280   // UserShare is accessible to a lot of code that doesn't need access to the
281   // sync token so clear sync_token from the UserShare.
282   share_.sync_credentials.sync_token = "";
283
284   const std::string& username = args->credentials.email;
285   DVLOG(1) << "Username: " << username;
286   if (!OpenDirectory(username)) {
287     NotifyInitializationFailure();
288     LOG(ERROR) << "Sync manager initialization failed!";
289     return;
290   }
291
292   connection_manager_.reset(new SyncAPIServerConnectionManager(
293       args->service_url.host() + args->service_url.path(),
294       args->service_url.EffectiveIntPort(),
295       args->service_url.SchemeIsSecure(),
296       args->post_factory.release(),
297       args->cancelation_signal));
298   connection_manager_->set_client_id(directory()->cache_guid());
299   connection_manager_->AddListener(this);
300
301   std::string sync_id = directory()->cache_guid();
302
303   DVLOG(1) << "Setting sync client ID: " << sync_id;
304   allstatus_.SetSyncId(sync_id);
305   DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
306   allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
307
308   model_type_registry_.reset(
309       new ModelTypeRegistry(args->workers, directory(), this));
310   sync_encryption_handler_->AddObserver(model_type_registry_.get());
311
312   // Bind the SyncContext WeakPtr to this thread.  This helps us crash earlier
313   // if the pointer is misused in debug mode.
314   base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
315   weak_core.get();
316
317   sync_context_proxy_.reset(
318       new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
319
320   // Build a SyncSessionContext and store the worker in it.
321   DVLOG(1) << "Sync is bringing up SyncSessionContext.";
322   std::vector<SyncEngineEventListener*> listeners;
323   listeners.push_back(&allstatus_);
324   listeners.push_back(this);
325   session_context_ =
326       args->internal_components_factory->BuildContext(
327                                              connection_manager_.get(),
328                                              directory(),
329                                              args->extensions_activity,
330                                              listeners,
331                                              &debug_info_event_listener_,
332                                              model_type_registry_.get(),
333                                              args->invalidator_client_id)
334           .Pass();
335   session_context_->set_account_name(args->credentials.email);
336   scheduler_ = args->internal_components_factory->BuildScheduler(
337       name_, session_context_.get(), args->cancelation_signal).Pass();
338
339   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
340
341   initialized_ = true;
342
343   net::NetworkChangeNotifier::AddIPAddressObserver(this);
344   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
345   observing_network_connectivity_changes_ = true;
346
347   UpdateCredentials(args->credentials);
348
349   NotifyInitializationSuccess();
350 }
351
352 void SyncManagerImpl::NotifyInitializationSuccess() {
353   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
354                     OnInitializationComplete(
355                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
356                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
357                         true, InitialSyncEndedTypes()));
358 }
359
360 void SyncManagerImpl::NotifyInitializationFailure() {
361   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
362                     OnInitializationComplete(
363                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
364                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
365                         false, ModelTypeSet()));
366 }
367
368 void SyncManagerImpl::OnPassphraseRequired(
369     PassphraseRequiredReason reason,
370     const sync_pb::EncryptedData& pending_keys) {
371   // Does nothing.
372 }
373
374 void SyncManagerImpl::OnPassphraseAccepted() {
375   // Does nothing.
376 }
377
378 void SyncManagerImpl::OnBootstrapTokenUpdated(
379     const std::string& bootstrap_token,
380     BootstrapTokenType type) {
381   if (type == KEYSTORE_BOOTSTRAP_TOKEN)
382     allstatus_.SetHasKeystoreKey(true);
383 }
384
385 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
386                                               bool encrypt_everything) {
387   allstatus_.SetEncryptedTypes(encrypted_types);
388 }
389
390 void SyncManagerImpl::OnEncryptionComplete() {
391   // Does nothing.
392 }
393
394 void SyncManagerImpl::OnCryptographerStateChanged(
395     Cryptographer* cryptographer) {
396   allstatus_.SetCryptographerReady(cryptographer->is_ready());
397   allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
398   allstatus_.SetKeystoreMigrationTime(
399       sync_encryption_handler_->migration_time());
400 }
401
402 void SyncManagerImpl::OnPassphraseTypeChanged(
403     PassphraseType type,
404     base::Time explicit_passphrase_time) {
405   allstatus_.SetPassphraseType(type);
406   allstatus_.SetKeystoreMigrationTime(
407       sync_encryption_handler_->migration_time());
408 }
409
410 void SyncManagerImpl::StartSyncingNormally(
411     const ModelSafeRoutingInfo& routing_info) {
412   // Start the sync scheduler.
413   // TODO(sync): We always want the newest set of routes when we switch back
414   // to normal mode. Figure out how to enforce set_routing_info is always
415   // appropriately set and that it's only modified when switching to normal
416   // mode.
417   DCHECK(thread_checker_.CalledOnValidThread());
418   session_context_->SetRoutingInfo(routing_info);
419   scheduler_->Start(SyncScheduler::NORMAL_MODE);
420 }
421
422 syncable::Directory* SyncManagerImpl::directory() {
423   return share_.directory.get();
424 }
425
426 const SyncScheduler* SyncManagerImpl::scheduler() const {
427   return scheduler_.get();
428 }
429
430 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
431   return connection_manager_->HasInvalidAuthToken();
432 }
433
434 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
435   DCHECK(!initialized_) << "Should only happen once";
436
437   // Set before Open().
438   change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
439   WeakHandle<syncable::TransactionObserver> transaction_observer(
440       MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
441
442   syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
443   open_result = directory()->Open(username, this, transaction_observer);
444   if (open_result != syncable::OPENED) {
445     LOG(ERROR) << "Could not open share for:" << username;
446     return false;
447   }
448
449   // Unapplied datatypes (those that do not have initial sync ended set) get
450   // re-downloaded during any configuration. But, it's possible for a datatype
451   // to have a progress marker but not have initial sync ended yet, making
452   // it a candidate for migration. This is a problem, as the DataTypeManager
453   // does not support a migration while it's already in the middle of a
454   // configuration. As a result, any partially synced datatype can stall the
455   // DTM, waiting for the configuration to complete, which it never will due
456   // to the migration error. In addition, a partially synced nigori will
457   // trigger the migration logic before the backend is initialized, resulting
458   // in crashes. We therefore detect and purge any partially synced types as
459   // part of initialization.
460   if (!PurgePartiallySyncedTypes())
461     return false;
462
463   return true;
464 }
465
466 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
467   ModelTypeSet partially_synced_types = ModelTypeSet::All();
468   partially_synced_types.RemoveAll(InitialSyncEndedTypes());
469   partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
470       ModelTypeSet::All()));
471
472   DVLOG(1) << "Purging partially synced types "
473            << ModelTypeSetToString(partially_synced_types);
474   UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
475                        partially_synced_types.Size());
476   if (partially_synced_types.Empty())
477     return true;
478   return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
479                                              ModelTypeSet(),
480                                              ModelTypeSet());
481 }
482
483 bool SyncManagerImpl::PurgeDisabledTypes(
484     ModelTypeSet to_purge,
485     ModelTypeSet to_journal,
486     ModelTypeSet to_unapply) {
487   if (to_purge.Empty())
488     return true;
489   DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
490   DCHECK(to_purge.HasAll(to_journal));
491   DCHECK(to_purge.HasAll(to_unapply));
492   return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
493 }
494
495 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
496   DCHECK(thread_checker_.CalledOnValidThread());
497   DCHECK(initialized_);
498   DCHECK(!credentials.email.empty());
499   DCHECK(!credentials.sync_token.empty());
500   DCHECK(!credentials.scope_set.empty());
501
502   observing_network_connectivity_changes_ = true;
503   if (!connection_manager_->SetAuthToken(credentials.sync_token))
504     return;  // Auth token is known to be invalid, so exit early.
505
506   scheduler_->OnCredentialsUpdated();
507
508   // TODO(zea): pass the credential age to the debug info event listener.
509 }
510
511 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
512   DCHECK(thread_checker_.CalledOnValidThread());
513   observers_.AddObserver(observer);
514 }
515
516 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
517   DCHECK(thread_checker_.CalledOnValidThread());
518   observers_.RemoveObserver(observer);
519 }
520
521 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
522   DCHECK(thread_checker_.CalledOnValidThread());
523
524   // Prevent any in-flight method calls from running.  Also
525   // invalidates |weak_handle_this_| and |change_observer_|.
526   weak_ptr_factory_.InvalidateWeakPtrs();
527   js_mutation_event_observer_.InvalidateWeakPtrs();
528
529   scheduler_.reset();
530   session_context_.reset();
531
532   if (model_type_registry_)
533     sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
534
535   model_type_registry_.reset();
536
537   if (sync_encryption_handler_) {
538     sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
539     sync_encryption_handler_->RemoveObserver(this);
540   }
541
542   SetJsEventHandler(WeakHandle<JsEventHandler>());
543   RemoveObserver(&js_sync_manager_observer_);
544
545   RemoveObserver(&debug_info_event_listener_);
546
547   // |connection_manager_| may end up being NULL here in tests (in synchronous
548   // initialization mode).
549   //
550   // TODO(akalin): Fix this behavior.
551   if (connection_manager_)
552     connection_manager_->RemoveListener(this);
553   connection_manager_.reset();
554
555   net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
556   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
557   observing_network_connectivity_changes_ = false;
558
559   if (initialized_ && directory()) {
560     directory()->SaveChanges();
561   }
562
563   share_.directory.reset();
564
565   change_delegate_ = NULL;
566
567   initialized_ = false;
568
569   // We reset these here, since only now we know they will not be
570   // accessed from other threads (since we shut down everything).
571   change_observer_.Reset();
572   weak_handle_this_.Reset();
573 }
574
575 void SyncManagerImpl::OnIPAddressChanged() {
576   if (!observing_network_connectivity_changes_) {
577     DVLOG(1) << "IP address change dropped.";
578     return;
579   }
580   DVLOG(1) << "IP address change detected.";
581   OnNetworkConnectivityChangedImpl();
582 }
583
584 void SyncManagerImpl::OnConnectionTypeChanged(
585   net::NetworkChangeNotifier::ConnectionType) {
586   if (!observing_network_connectivity_changes_) {
587     DVLOG(1) << "Connection type change dropped.";
588     return;
589   }
590   DVLOG(1) << "Connection type change detected.";
591   OnNetworkConnectivityChangedImpl();
592 }
593
594 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
595   DCHECK(thread_checker_.CalledOnValidThread());
596   scheduler_->OnConnectionStatusChange();
597 }
598
599 void SyncManagerImpl::OnServerConnectionEvent(
600     const ServerConnectionEvent& event) {
601   DCHECK(thread_checker_.CalledOnValidThread());
602   if (event.connection_code ==
603       HttpResponse::SERVER_CONNECTION_OK) {
604     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
605                       OnConnectionStatusChange(CONNECTION_OK));
606   }
607
608   if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
609     observing_network_connectivity_changes_ = false;
610     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
611                       OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
612   }
613
614   if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
615     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
616                       OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
617   }
618 }
619
620 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
621     ModelTypeSet models_with_changes) {
622   // This notification happens immediately after the transaction mutex is
623   // released. This allows work to be performed without blocking other threads
624   // from acquiring a transaction.
625   if (!change_delegate_)
626     return;
627
628   // Call commit.
629   for (ModelTypeSet::Iterator it = models_with_changes.First();
630        it.Good(); it.Inc()) {
631     change_delegate_->OnChangesComplete(it.Get());
632     change_observer_.Call(
633         FROM_HERE,
634         &SyncManager::ChangeObserver::OnChangesComplete,
635         it.Get());
636   }
637 }
638
639 ModelTypeSet
640 SyncManagerImpl::HandleTransactionEndingChangeEvent(
641     const ImmutableWriteTransactionInfo& write_transaction_info,
642     syncable::BaseTransaction* trans) {
643   // This notification happens immediately before a syncable WriteTransaction
644   // falls out of scope. It happens while the channel mutex is still held,
645   // and while the transaction mutex is held, so it cannot be re-entrant.
646   if (!change_delegate_ || change_records_.empty())
647     return ModelTypeSet();
648
649   // This will continue the WriteTransaction using a read only wrapper.
650   // This is the last chance for read to occur in the WriteTransaction
651   // that's closing. This special ReadTransaction will not close the
652   // underlying transaction.
653   ReadTransaction read_trans(GetUserShare(), trans);
654
655   ModelTypeSet models_with_changes;
656   for (ChangeRecordMap::const_iterator it = change_records_.begin();
657       it != change_records_.end(); ++it) {
658     DCHECK(!it->second.Get().empty());
659     ModelType type = ModelTypeFromInt(it->first);
660     change_delegate_->
661         OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
662                          &read_trans, it->second);
663     change_observer_.Call(FROM_HERE,
664         &SyncManager::ChangeObserver::OnChangesApplied,
665         type, write_transaction_info.Get().id, it->second);
666     models_with_changes.Put(type);
667   }
668   change_records_.clear();
669   return models_with_changes;
670 }
671
672 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
673     const ImmutableWriteTransactionInfo& write_transaction_info,
674     syncable::BaseTransaction* trans,
675     std::vector<int64>* entries_changed) {
676   // We have been notified about a user action changing a sync model.
677   LOG_IF(WARNING, !change_records_.empty()) <<
678       "CALCULATE_CHANGES called with unapplied old changes.";
679
680   // The mutated model type, or UNSPECIFIED if nothing was mutated.
681   ModelTypeSet mutated_model_types;
682
683   const syncable::ImmutableEntryKernelMutationMap& mutations =
684       write_transaction_info.Get().mutations;
685   for (syncable::EntryKernelMutationMap::const_iterator it =
686            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
687     if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
688       continue;
689     }
690
691     ModelType model_type =
692         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
693     if (model_type < FIRST_REAL_MODEL_TYPE) {
694       NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
695       continue;
696     }
697
698     // Found real mutation.
699     if (model_type != UNSPECIFIED) {
700       mutated_model_types.Put(model_type);
701       entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
702     }
703   }
704
705   // Nudge if necessary.
706   if (!mutated_model_types.Empty()) {
707     if (weak_handle_this_.IsInitialized()) {
708       weak_handle_this_.Call(FROM_HERE,
709                              &SyncManagerImpl::RequestNudgeForDataTypes,
710                              FROM_HERE,
711                              mutated_model_types);
712     } else {
713       NOTREACHED();
714     }
715   }
716 }
717
718 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
719     ModelType type, ChangeReorderBuffer* buffer,
720     Cryptographer* cryptographer, const syncable::EntryKernel& original,
721     bool existed_before, bool exists_now) {
722   // If this is a deletion and the datatype was encrypted, we need to decrypt it
723   // and attach it to the buffer.
724   if (!exists_now && existed_before) {
725     sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
726     if (type == PASSWORDS) {
727       // Passwords must use their own legacy ExtraPasswordChangeRecordData.
728       scoped_ptr<sync_pb::PasswordSpecificsData> data(
729           DecryptPasswordSpecifics(original_specifics, cryptographer));
730       if (!data) {
731         NOTREACHED();
732         return;
733       }
734       buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
735     } else if (original_specifics.has_encrypted()) {
736       // All other datatypes can just create a new unencrypted specifics and
737       // attach it.
738       const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
739       if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
740         NOTREACHED();
741         return;
742       }
743     }
744     buffer->SetSpecificsForId(id, original_specifics);
745   }
746 }
747
748 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
749     const ImmutableWriteTransactionInfo& write_transaction_info,
750     syncable::BaseTransaction* trans,
751     std::vector<int64>* entries_changed) {
752   // We only expect one notification per sync step, so change_buffers_ should
753   // contain no pending entries.
754   LOG_IF(WARNING, !change_records_.empty()) <<
755       "CALCULATE_CHANGES called with unapplied old changes.";
756
757   ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
758
759   Cryptographer* crypto = directory()->GetCryptographer(trans);
760   const syncable::ImmutableEntryKernelMutationMap& mutations =
761       write_transaction_info.Get().mutations;
762   for (syncable::EntryKernelMutationMap::const_iterator it =
763            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
764     bool existed_before = !it->second.original.ref(syncable::IS_DEL);
765     bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
766
767     // Omit items that aren't associated with a model.
768     ModelType type =
769         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
770     if (type < FIRST_REAL_MODEL_TYPE)
771       continue;
772
773     int64 handle = it->first;
774     if (exists_now && !existed_before)
775       change_buffers[type].PushAddedItem(handle);
776     else if (!exists_now && existed_before)
777       change_buffers[type].PushDeletedItem(handle);
778     else if (exists_now && existed_before &&
779              VisiblePropertiesDiffer(it->second, crypto)) {
780       change_buffers[type].PushUpdatedItem(handle);
781     }
782
783     SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
784                              it->second.original, existed_before, exists_now);
785   }
786
787   ReadTransaction read_trans(GetUserShare(), trans);
788   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
789     if (!change_buffers[i].IsEmpty()) {
790       if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
791                                                      &(change_records_[i]))) {
792         for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
793           entries_changed->push_back((change_records_[i].Get())[j].id);
794       }
795       if (change_records_[i].Get().empty())
796         change_records_.erase(i);
797     }
798   }
799 }
800
801 void SyncManagerImpl::RequestNudgeForDataTypes(
802     const tracked_objects::Location& nudge_location,
803     ModelTypeSet types) {
804   debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
805
806   scheduler_->ScheduleLocalNudge(types, nudge_location);
807 }
808
809 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
810   DCHECK(thread_checker_.CalledOnValidThread());
811   scheduler_->ScheduleInitialSyncNudge(type);
812 }
813
814 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
815   DCHECK(thread_checker_.CalledOnValidThread());
816   RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
817 }
818
819 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
820   DCHECK(thread_checker_.CalledOnValidThread());
821   RefreshTypes(ModelTypeSet(type));
822 }
823
824 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
825   DCHECK(thread_checker_.CalledOnValidThread());
826   // Only send an event if this is due to a cycle ending and this cycle
827   // concludes a canonical "sync" process; that is, based on what is known
828   // locally we are "all happy" and up-to-date.  There may be new changes on
829   // the server, but we'll get them on a subsequent sync.
830   //
831   // Notifications are sent at the end of every sync cycle, regardless of
832   // whether we should sync again.
833   if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
834     if (!initialized_) {
835       DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
836                << "initialized";
837       return;
838     }
839
840     DVLOG(1) << "Sending OnSyncCycleCompleted";
841     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
842                       OnSyncCycleCompleted(event.snapshot));
843   }
844 }
845
846 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
847   FOR_EACH_OBSERVER(
848       SyncManager::Observer, observers_,
849       OnActionableError(error));
850 }
851
852 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
853
854 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
855
856 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
857   FOR_EACH_OBSERVER(
858       SyncManager::Observer, observers_,
859       OnMigrationRequested(types));
860 }
861
862 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
863   protocol_event_buffer_.RecordProtocolEvent(event);
864   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
865                     OnProtocolEvent(event));
866 }
867
868 void SyncManagerImpl::SetJsEventHandler(
869     const WeakHandle<JsEventHandler>& event_handler) {
870   js_sync_manager_observer_.SetJsEventHandler(event_handler);
871   js_mutation_event_observer_.SetJsEventHandler(event_handler);
872   js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
873 }
874
875 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
876     syncer::ModelType type) {
877   DirectoryTypeDebugInfoEmitterMap* emitter_map =
878       model_type_registry_->directory_type_debug_info_emitter_map();
879   DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
880
881   if (it == emitter_map->end()) {
882     // This can happen in some cases.  The UI thread makes requests of us
883     // when it doesn't really know which types are enabled or disabled.
884     DLOG(WARNING) << "Asked to return debug info for invalid type "
885                   << ModelTypeToString(type);
886     return scoped_ptr<base::ListValue>(new base::ListValue());
887   }
888
889   return it->second->GetAllNodes();
890 }
891
892 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
893   DCHECK(thread_checker_.CalledOnValidThread());
894
895   DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
896   allstatus_.SetNotificationsEnabled(invalidator_enabled);
897   scheduler_->SetNotificationsEnabled(invalidator_enabled);
898 }
899
900 void SyncManagerImpl::OnIncomingInvalidation(
901     syncer::ModelType type,
902     scoped_ptr<InvalidationInterface> invalidation) {
903   DCHECK(thread_checker_.CalledOnValidThread());
904
905   scheduler_->ScheduleInvalidationNudge(
906       type,
907       invalidation.Pass(),
908       FROM_HERE);
909 }
910
911 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
912   DCHECK(thread_checker_.CalledOnValidThread());
913   if (types.Empty()) {
914     LOG(WARNING) << "Sync received refresh request with no types specified.";
915   } else {
916     scheduler_->ScheduleLocalRefreshRequest(
917         types, FROM_HERE);
918   }
919 }
920
921 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
922   return allstatus_.status();
923 }
924
925 void SyncManagerImpl::SaveChanges() {
926   directory()->SaveChanges();
927 }
928
929 UserShare* SyncManagerImpl::GetUserShare() {
930   DCHECK(initialized_);
931   return &share_;
932 }
933
934 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
935   DCHECK(initialized_);
936   return sync_context_proxy_.get();
937 }
938
939 const std::string SyncManagerImpl::cache_guid() {
940   DCHECK(initialized_);
941   return directory()->cache_guid();
942 }
943
944 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
945   ReadTransaction trans(FROM_HERE, GetUserShare());
946   ReadNode nigori_node(&trans);
947   if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
948     DVLOG(1) << "Couldn't find Nigori node.";
949     return false;
950   }
951   bool found_experiment = false;
952
953   ReadNode favicon_sync_node(&trans);
954   if (favicon_sync_node.InitByClientTagLookup(
955           syncer::EXPERIMENTS,
956           syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
957     experiments->favicon_sync_limit =
958         favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
959             favicon_sync_limit();
960     found_experiment = true;
961   }
962
963   ReadNode pre_commit_update_avoidance_node(&trans);
964   if (pre_commit_update_avoidance_node.InitByClientTagLookup(
965           syncer::EXPERIMENTS,
966           syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
967     session_context_->set_server_enabled_pre_commit_update_avoidance(
968         pre_commit_update_avoidance_node.GetExperimentsSpecifics().
969             pre_commit_update_avoidance().enabled());
970     // We don't bother setting found_experiment.  The frontend doesn't need to
971     // know about this.
972   }
973
974   ReadNode gcm_channel_node(&trans);
975   if (gcm_channel_node.InitByClientTagLookup(
976           syncer::EXPERIMENTS,
977           syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
978       gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
979     experiments->gcm_channel_state =
980         (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
981          syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
982     found_experiment = true;
983   }
984
985   ReadNode enhanced_bookmarks_node(&trans);
986   if (enhanced_bookmarks_node.InitByClientTagLookup(
987           syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
988           BaseNode::INIT_OK &&
989       enhanced_bookmarks_node.GetExperimentsSpecifics()
990           .has_enhanced_bookmarks()) {
991     const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
992         enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
993     if (enhanced_bookmarks.has_enabled())
994       experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
995     if (enhanced_bookmarks.has_extension_id()) {
996       experiments->enhanced_bookmarks_ext_id =
997           enhanced_bookmarks.extension_id();
998     }
999     found_experiment = true;
1000   }
1001
1002   ReadNode gcm_invalidations_node(&trans);
1003   if (gcm_invalidations_node.InitByClientTagLookup(
1004           syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1005       BaseNode::INIT_OK) {
1006     const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1007         gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1008     if (gcm_invalidations.has_enabled()) {
1009       experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1010       found_experiment = true;
1011     }
1012   }
1013
1014   return found_experiment;
1015 }
1016
1017 bool SyncManagerImpl::HasUnsyncedItems() {
1018   ReadTransaction trans(FROM_HERE, GetUserShare());
1019   return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1020 }
1021
1022 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1023   return sync_encryption_handler_.get();
1024 }
1025
1026 ScopedVector<syncer::ProtocolEvent>
1027     SyncManagerImpl::GetBufferedProtocolEvents() {
1028   return protocol_event_buffer_.GetBufferedProtocolEvents();
1029 }
1030
1031 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1032     syncer::TypeDebugInfoObserver* observer) {
1033   model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1034 }
1035
1036 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1037     syncer::TypeDebugInfoObserver* observer) {
1038   model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1039 }
1040
1041 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1042     syncer::TypeDebugInfoObserver* observer) {
1043   return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1044 }
1045
1046 void SyncManagerImpl::RequestEmitDebugInfo() {
1047   model_type_registry_->RequestEmitDebugInfo();
1048 }
1049
1050 }  // namespace syncer