Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / sync / internal_api / sync_manager_impl.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/sync_manager_impl.h"
6
7 #include <string>
8
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
49
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
52
53 class GURL;
54
55 namespace syncer {
56
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
61
62 namespace {
63
64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65     ConfigureReason reason) {
66   switch (reason) {
67     case CONFIGURE_REASON_RECONFIGURATION:
68       return GetUpdatesCallerInfo::RECONFIGURATION;
69     case CONFIGURE_REASON_MIGRATION:
70       return GetUpdatesCallerInfo::MIGRATION;
71     case CONFIGURE_REASON_NEW_CLIENT:
72       return GetUpdatesCallerInfo::NEW_CLIENT;
73     case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74     case CONFIGURE_REASON_CRYPTO:
75       return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76     case CONFIGURE_REASON_PROGRAMMATIC:
77       return GetUpdatesCallerInfo::PROGRAMMATIC;
78     default:
79       NOTREACHED();
80   }
81   return GetUpdatesCallerInfo::UNKNOWN;
82 }
83
84 }  // namespace
85
86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
87     : name_(name),
88       change_delegate_(NULL),
89       initialized_(false),
90       observing_network_connectivity_changes_(false),
91       report_unrecoverable_error_function_(NULL),
92       weak_ptr_factory_(this) {
93   // Pre-fill |notification_info_map_|.
94   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
95     notification_info_map_.insert(
96         std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
97   }
98 }
99
100 SyncManagerImpl::~SyncManagerImpl() {
101   DCHECK(thread_checker_.CalledOnValidThread());
102   CHECK(!initialized_);
103 }
104
105 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
106 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107
108 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
109   base::DictionaryValue* value = new base::DictionaryValue();
110   value->SetInteger("totalCount", total_count);
111   value->SetString("payload", payload);
112   return value;
113 }
114
115 bool SyncManagerImpl::VisiblePositionsDiffer(
116     const syncable::EntryKernelMutation& mutation) const {
117   const syncable::EntryKernel& a = mutation.original;
118   const syncable::EntryKernel& b = mutation.mutated;
119   if (!b.ShouldMaintainPosition())
120     return false;
121   if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
122     return true;
123   if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
124     return true;
125   return false;
126 }
127
128 bool SyncManagerImpl::VisiblePropertiesDiffer(
129     const syncable::EntryKernelMutation& mutation,
130     Cryptographer* cryptographer) const {
131   const syncable::EntryKernel& a = mutation.original;
132   const syncable::EntryKernel& b = mutation.mutated;
133   const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
134   const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
135   DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
136             GetModelTypeFromSpecifics(b_specifics));
137   ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
138   // Suppress updates to items that aren't tracked by any browser model.
139   if (model_type < FIRST_REAL_MODEL_TYPE ||
140       !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
141     return false;
142   }
143   if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
144     return true;
145   if (!AreSpecificsEqual(cryptographer,
146                          a.ref(syncable::SPECIFICS),
147                          b.ref(syncable::SPECIFICS))) {
148     return true;
149   }
150   if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
151                                   b.ref(syncable::ATTACHMENT_METADATA))) {
152     return true;
153   }
154   // We only care if the name has changed if neither specifics is encrypted
155   // (encrypted nodes blow away the NON_UNIQUE_NAME).
156   if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
157       a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
158     return true;
159   if (VisiblePositionsDiffer(mutation))
160     return true;
161   return false;
162 }
163
164 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
165   return directory()->InitialSyncEndedTypes();
166 }
167
168 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
169     ModelTypeSet types) {
170   ModelTypeSet result;
171   for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
172     sync_pb::DataTypeProgressMarker marker;
173     directory()->GetDownloadProgress(i.Get(), &marker);
174
175     if (marker.token().empty())
176       result.Put(i.Get());
177   }
178   return result;
179 }
180
181 void SyncManagerImpl::ConfigureSyncer(
182     ConfigureReason reason,
183     ModelTypeSet to_download,
184     ModelTypeSet to_purge,
185     ModelTypeSet to_journal,
186     ModelTypeSet to_unapply,
187     const ModelSafeRoutingInfo& new_routing_info,
188     const base::Closure& ready_task,
189     const base::Closure& retry_task) {
190   DCHECK(thread_checker_.CalledOnValidThread());
191   DCHECK(!ready_task.is_null());
192   DCHECK(initialized_);
193
194   DVLOG(1) << "Configuring -"
195            << "\n\t" << "current types: "
196            << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
197            << "\n\t" << "types to download: "
198            << ModelTypeSetToString(to_download)
199            << "\n\t" << "types to purge: "
200            << ModelTypeSetToString(to_purge)
201            << "\n\t" << "types to journal: "
202            << ModelTypeSetToString(to_journal)
203            << "\n\t" << "types to unapply: "
204            << ModelTypeSetToString(to_unapply);
205   if (!PurgeDisabledTypes(to_purge,
206                           to_journal,
207                           to_unapply)) {
208     // We failed to cleanup the types. Invoke the ready task without actually
209     // configuring any types. The caller should detect this as a configuration
210     // failure and act appropriately.
211     ready_task.Run();
212     return;
213   }
214
215   ConfigurationParams params(GetSourceFromReason(reason),
216                              to_download,
217                              new_routing_info,
218                              ready_task,
219                              retry_task);
220
221   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
222   scheduler_->ScheduleConfiguration(params);
223 }
224
225 void SyncManagerImpl::Init(InitArgs* args) {
226   CHECK(!initialized_);
227   DCHECK(thread_checker_.CalledOnValidThread());
228   DCHECK(args->post_factory.get());
229   DCHECK(!args->credentials.email.empty());
230   DCHECK(!args->credentials.sync_token.empty());
231   DCHECK(!args->credentials.scope_set.empty());
232   DCHECK(args->cancelation_signal);
233   DVLOG(1) << "SyncManager starting Init...";
234
235   weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
236
237   change_delegate_ = args->change_delegate;
238
239   AddObserver(&js_sync_manager_observer_);
240   SetJsEventHandler(args->event_handler);
241
242   AddObserver(&debug_info_event_listener_);
243
244   database_path_ = args->database_location.Append(
245       syncable::Directory::kSyncDatabaseFilename);
246   unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
247   report_unrecoverable_error_function_ =
248       args->report_unrecoverable_error_function;
249
250   allstatus_.SetHasKeystoreKey(
251       !args->restored_keystore_key_for_bootstrapping.empty());
252   sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
253       &share_,
254       args->encryptor,
255       args->restored_key_for_bootstrapping,
256       args->restored_keystore_key_for_bootstrapping));
257   sync_encryption_handler_->AddObserver(this);
258   sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
259   sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
260
261   base::FilePath absolute_db_path = database_path_;
262   DCHECK(absolute_db_path.IsAbsolute());
263
264   scoped_ptr<syncable::DirectoryBackingStore> backing_store =
265       args->internal_components_factory->BuildDirectoryBackingStore(
266           InternalComponentsFactory::STORAGE_ON_DISK,
267           args->credentials.email, absolute_db_path).Pass();
268
269   DCHECK(backing_store.get());
270   share_.directory.reset(
271       new syncable::Directory(
272           backing_store.release(),
273           unrecoverable_error_handler_.get(),
274           report_unrecoverable_error_function_,
275           sync_encryption_handler_.get(),
276           sync_encryption_handler_->GetCryptographerUnsafe()));
277   share_.sync_credentials = args->credentials;
278
279   // UserShare is accessible to a lot of code that doesn't need access to the
280   // sync token so clear sync_token from the UserShare.
281   share_.sync_credentials.sync_token = "";
282
283   const std::string& username = args->credentials.email;
284   DVLOG(1) << "Username: " << username;
285   if (!OpenDirectory(username)) {
286     NotifyInitializationFailure();
287     LOG(ERROR) << "Sync manager initialization failed!";
288     return;
289   }
290
291   connection_manager_.reset(new SyncAPIServerConnectionManager(
292       args->service_url.host() + args->service_url.path(),
293       args->service_url.EffectiveIntPort(),
294       args->service_url.SchemeIsSecure(),
295       args->post_factory.release(),
296       args->cancelation_signal));
297   connection_manager_->set_client_id(directory()->cache_guid());
298   connection_manager_->AddListener(this);
299
300   std::string sync_id = directory()->cache_guid();
301
302   DVLOG(1) << "Setting sync client ID: " << sync_id;
303   allstatus_.SetSyncId(sync_id);
304   DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
305   allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
306
307   model_type_registry_.reset(
308       new ModelTypeRegistry(args->workers, directory(), this));
309   sync_encryption_handler_->AddObserver(model_type_registry_.get());
310
311   // Bind the SyncContext WeakPtr to this thread.  This helps us crash earlier
312   // if the pointer is misused in debug mode.
313   base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
314   weak_core.get();
315
316   sync_context_proxy_.reset(
317       new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
318
319   // Build a SyncSessionContext and store the worker in it.
320   DVLOG(1) << "Sync is bringing up SyncSessionContext.";
321   std::vector<SyncEngineEventListener*> listeners;
322   listeners.push_back(&allstatus_);
323   listeners.push_back(this);
324   session_context_ =
325       args->internal_components_factory->BuildContext(
326                                              connection_manager_.get(),
327                                              directory(),
328                                              args->extensions_activity,
329                                              listeners,
330                                              &debug_info_event_listener_,
331                                              model_type_registry_.get(),
332                                              args->invalidator_client_id)
333           .Pass();
334   session_context_->set_account_name(args->credentials.email);
335   scheduler_ = args->internal_components_factory->BuildScheduler(
336       name_, session_context_.get(), args->cancelation_signal).Pass();
337
338   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
339
340   initialized_ = true;
341
342   net::NetworkChangeNotifier::AddIPAddressObserver(this);
343   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
344   observing_network_connectivity_changes_ = true;
345
346   UpdateCredentials(args->credentials);
347
348   NotifyInitializationSuccess();
349 }
350
351 void SyncManagerImpl::NotifyInitializationSuccess() {
352   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
353                     OnInitializationComplete(
354                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
355                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
356                         true, InitialSyncEndedTypes()));
357 }
358
359 void SyncManagerImpl::NotifyInitializationFailure() {
360   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
361                     OnInitializationComplete(
362                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
363                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
364                         false, ModelTypeSet()));
365 }
366
367 void SyncManagerImpl::OnPassphraseRequired(
368     PassphraseRequiredReason reason,
369     const sync_pb::EncryptedData& pending_keys) {
370   // Does nothing.
371 }
372
373 void SyncManagerImpl::OnPassphraseAccepted() {
374   // Does nothing.
375 }
376
377 void SyncManagerImpl::OnBootstrapTokenUpdated(
378     const std::string& bootstrap_token,
379     BootstrapTokenType type) {
380   if (type == KEYSTORE_BOOTSTRAP_TOKEN)
381     allstatus_.SetHasKeystoreKey(true);
382 }
383
384 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
385                                               bool encrypt_everything) {
386   allstatus_.SetEncryptedTypes(encrypted_types);
387 }
388
389 void SyncManagerImpl::OnEncryptionComplete() {
390   // Does nothing.
391 }
392
393 void SyncManagerImpl::OnCryptographerStateChanged(
394     Cryptographer* cryptographer) {
395   allstatus_.SetCryptographerReady(cryptographer->is_ready());
396   allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
397   allstatus_.SetKeystoreMigrationTime(
398       sync_encryption_handler_->migration_time());
399 }
400
401 void SyncManagerImpl::OnPassphraseTypeChanged(
402     PassphraseType type,
403     base::Time explicit_passphrase_time) {
404   allstatus_.SetPassphraseType(type);
405   allstatus_.SetKeystoreMigrationTime(
406       sync_encryption_handler_->migration_time());
407 }
408
409 void SyncManagerImpl::StartSyncingNormally(
410     const ModelSafeRoutingInfo& routing_info) {
411   // Start the sync scheduler.
412   // TODO(sync): We always want the newest set of routes when we switch back
413   // to normal mode. Figure out how to enforce set_routing_info is always
414   // appropriately set and that it's only modified when switching to normal
415   // mode.
416   DCHECK(thread_checker_.CalledOnValidThread());
417   session_context_->SetRoutingInfo(routing_info);
418   scheduler_->Start(SyncScheduler::NORMAL_MODE);
419 }
420
421 syncable::Directory* SyncManagerImpl::directory() {
422   return share_.directory.get();
423 }
424
425 const SyncScheduler* SyncManagerImpl::scheduler() const {
426   return scheduler_.get();
427 }
428
429 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
430   return connection_manager_->HasInvalidAuthToken();
431 }
432
433 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
434   DCHECK(!initialized_) << "Should only happen once";
435
436   // Set before Open().
437   change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
438   WeakHandle<syncable::TransactionObserver> transaction_observer(
439       MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
440
441   syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
442   open_result = directory()->Open(username, this, transaction_observer);
443   if (open_result != syncable::OPENED) {
444     LOG(ERROR) << "Could not open share for:" << username;
445     return false;
446   }
447
448   // Unapplied datatypes (those that do not have initial sync ended set) get
449   // re-downloaded during any configuration. But, it's possible for a datatype
450   // to have a progress marker but not have initial sync ended yet, making
451   // it a candidate for migration. This is a problem, as the DataTypeManager
452   // does not support a migration while it's already in the middle of a
453   // configuration. As a result, any partially synced datatype can stall the
454   // DTM, waiting for the configuration to complete, which it never will due
455   // to the migration error. In addition, a partially synced nigori will
456   // trigger the migration logic before the backend is initialized, resulting
457   // in crashes. We therefore detect and purge any partially synced types as
458   // part of initialization.
459   if (!PurgePartiallySyncedTypes())
460     return false;
461
462   return true;
463 }
464
465 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
466   ModelTypeSet partially_synced_types = ModelTypeSet::All();
467   partially_synced_types.RemoveAll(InitialSyncEndedTypes());
468   partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
469       ModelTypeSet::All()));
470
471   DVLOG(1) << "Purging partially synced types "
472            << ModelTypeSetToString(partially_synced_types);
473   UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
474                        partially_synced_types.Size());
475   if (partially_synced_types.Empty())
476     return true;
477   return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
478                                              ModelTypeSet(),
479                                              ModelTypeSet());
480 }
481
482 bool SyncManagerImpl::PurgeDisabledTypes(
483     ModelTypeSet to_purge,
484     ModelTypeSet to_journal,
485     ModelTypeSet to_unapply) {
486   if (to_purge.Empty())
487     return true;
488   DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
489   DCHECK(to_purge.HasAll(to_journal));
490   DCHECK(to_purge.HasAll(to_unapply));
491   return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
492 }
493
494 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
495   DCHECK(thread_checker_.CalledOnValidThread());
496   DCHECK(initialized_);
497   DCHECK(!credentials.email.empty());
498   DCHECK(!credentials.sync_token.empty());
499   DCHECK(!credentials.scope_set.empty());
500
501   observing_network_connectivity_changes_ = true;
502   if (!connection_manager_->SetAuthToken(credentials.sync_token))
503     return;  // Auth token is known to be invalid, so exit early.
504
505   scheduler_->OnCredentialsUpdated();
506
507   // TODO(zea): pass the credential age to the debug info event listener.
508 }
509
510 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
511   DCHECK(thread_checker_.CalledOnValidThread());
512   observers_.AddObserver(observer);
513 }
514
515 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
516   DCHECK(thread_checker_.CalledOnValidThread());
517   observers_.RemoveObserver(observer);
518 }
519
520 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
521   DCHECK(thread_checker_.CalledOnValidThread());
522
523   // Prevent any in-flight method calls from running.  Also
524   // invalidates |weak_handle_this_| and |change_observer_|.
525   weak_ptr_factory_.InvalidateWeakPtrs();
526   js_mutation_event_observer_.InvalidateWeakPtrs();
527
528   scheduler_.reset();
529   session_context_.reset();
530
531   if (model_type_registry_)
532     sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
533
534   model_type_registry_.reset();
535
536   if (sync_encryption_handler_) {
537     sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
538     sync_encryption_handler_->RemoveObserver(this);
539   }
540
541   SetJsEventHandler(WeakHandle<JsEventHandler>());
542   RemoveObserver(&js_sync_manager_observer_);
543
544   RemoveObserver(&debug_info_event_listener_);
545
546   // |connection_manager_| may end up being NULL here in tests (in synchronous
547   // initialization mode).
548   //
549   // TODO(akalin): Fix this behavior.
550   if (connection_manager_)
551     connection_manager_->RemoveListener(this);
552   connection_manager_.reset();
553
554   net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
555   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
556   observing_network_connectivity_changes_ = false;
557
558   if (initialized_ && directory()) {
559     directory()->SaveChanges();
560   }
561
562   share_.directory.reset();
563
564   change_delegate_ = NULL;
565
566   initialized_ = false;
567
568   // We reset these here, since only now we know they will not be
569   // accessed from other threads (since we shut down everything).
570   change_observer_.Reset();
571   weak_handle_this_.Reset();
572 }
573
574 void SyncManagerImpl::OnIPAddressChanged() {
575   if (!observing_network_connectivity_changes_) {
576     DVLOG(1) << "IP address change dropped.";
577     return;
578   }
579   DVLOG(1) << "IP address change detected.";
580   OnNetworkConnectivityChangedImpl();
581 }
582
583 void SyncManagerImpl::OnConnectionTypeChanged(
584   net::NetworkChangeNotifier::ConnectionType) {
585   if (!observing_network_connectivity_changes_) {
586     DVLOG(1) << "Connection type change dropped.";
587     return;
588   }
589   DVLOG(1) << "Connection type change detected.";
590   OnNetworkConnectivityChangedImpl();
591 }
592
593 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
594   DCHECK(thread_checker_.CalledOnValidThread());
595   scheduler_->OnConnectionStatusChange();
596 }
597
598 void SyncManagerImpl::OnServerConnectionEvent(
599     const ServerConnectionEvent& event) {
600   DCHECK(thread_checker_.CalledOnValidThread());
601   if (event.connection_code ==
602       HttpResponse::SERVER_CONNECTION_OK) {
603     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
604                       OnConnectionStatusChange(CONNECTION_OK));
605   }
606
607   if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
608     observing_network_connectivity_changes_ = false;
609     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
610                       OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
611   }
612
613   if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
614     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
615                       OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
616   }
617 }
618
619 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
620     ModelTypeSet models_with_changes) {
621   // This notification happens immediately after the transaction mutex is
622   // released. This allows work to be performed without blocking other threads
623   // from acquiring a transaction.
624   if (!change_delegate_)
625     return;
626
627   // Call commit.
628   for (ModelTypeSet::Iterator it = models_with_changes.First();
629        it.Good(); it.Inc()) {
630     change_delegate_->OnChangesComplete(it.Get());
631     change_observer_.Call(
632         FROM_HERE,
633         &SyncManager::ChangeObserver::OnChangesComplete,
634         it.Get());
635   }
636 }
637
638 ModelTypeSet
639 SyncManagerImpl::HandleTransactionEndingChangeEvent(
640     const ImmutableWriteTransactionInfo& write_transaction_info,
641     syncable::BaseTransaction* trans) {
642   // This notification happens immediately before a syncable WriteTransaction
643   // falls out of scope. It happens while the channel mutex is still held,
644   // and while the transaction mutex is held, so it cannot be re-entrant.
645   if (!change_delegate_ || change_records_.empty())
646     return ModelTypeSet();
647
648   // This will continue the WriteTransaction using a read only wrapper.
649   // This is the last chance for read to occur in the WriteTransaction
650   // that's closing. This special ReadTransaction will not close the
651   // underlying transaction.
652   ReadTransaction read_trans(GetUserShare(), trans);
653
654   ModelTypeSet models_with_changes;
655   for (ChangeRecordMap::const_iterator it = change_records_.begin();
656       it != change_records_.end(); ++it) {
657     DCHECK(!it->second.Get().empty());
658     ModelType type = ModelTypeFromInt(it->first);
659     change_delegate_->
660         OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
661                          &read_trans, it->second);
662     change_observer_.Call(FROM_HERE,
663         &SyncManager::ChangeObserver::OnChangesApplied,
664         type, write_transaction_info.Get().id, it->second);
665     models_with_changes.Put(type);
666   }
667   change_records_.clear();
668   return models_with_changes;
669 }
670
671 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
672     const ImmutableWriteTransactionInfo& write_transaction_info,
673     syncable::BaseTransaction* trans,
674     std::vector<int64>* entries_changed) {
675   // We have been notified about a user action changing a sync model.
676   LOG_IF(WARNING, !change_records_.empty()) <<
677       "CALCULATE_CHANGES called with unapplied old changes.";
678
679   // The mutated model type, or UNSPECIFIED if nothing was mutated.
680   ModelTypeSet mutated_model_types;
681
682   const syncable::ImmutableEntryKernelMutationMap& mutations =
683       write_transaction_info.Get().mutations;
684   for (syncable::EntryKernelMutationMap::const_iterator it =
685            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
686     if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
687       continue;
688     }
689
690     ModelType model_type =
691         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
692     if (model_type < FIRST_REAL_MODEL_TYPE) {
693       NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
694       continue;
695     }
696
697     // Found real mutation.
698     if (model_type != UNSPECIFIED) {
699       mutated_model_types.Put(model_type);
700       entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
701     }
702   }
703
704   // Nudge if necessary.
705   if (!mutated_model_types.Empty()) {
706     if (weak_handle_this_.IsInitialized()) {
707       weak_handle_this_.Call(FROM_HERE,
708                              &SyncManagerImpl::RequestNudgeForDataTypes,
709                              FROM_HERE,
710                              mutated_model_types);
711     } else {
712       NOTREACHED();
713     }
714   }
715 }
716
717 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
718     ModelType type, ChangeReorderBuffer* buffer,
719     Cryptographer* cryptographer, const syncable::EntryKernel& original,
720     bool existed_before, bool exists_now) {
721   // If this is a deletion and the datatype was encrypted, we need to decrypt it
722   // and attach it to the buffer.
723   if (!exists_now && existed_before) {
724     sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
725     if (type == PASSWORDS) {
726       // Passwords must use their own legacy ExtraPasswordChangeRecordData.
727       scoped_ptr<sync_pb::PasswordSpecificsData> data(
728           DecryptPasswordSpecifics(original_specifics, cryptographer));
729       if (!data) {
730         NOTREACHED();
731         return;
732       }
733       buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
734     } else if (original_specifics.has_encrypted()) {
735       // All other datatypes can just create a new unencrypted specifics and
736       // attach it.
737       const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
738       if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
739         NOTREACHED();
740         return;
741       }
742     }
743     buffer->SetSpecificsForId(id, original_specifics);
744   }
745 }
746
747 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
748     const ImmutableWriteTransactionInfo& write_transaction_info,
749     syncable::BaseTransaction* trans,
750     std::vector<int64>* entries_changed) {
751   // We only expect one notification per sync step, so change_buffers_ should
752   // contain no pending entries.
753   LOG_IF(WARNING, !change_records_.empty()) <<
754       "CALCULATE_CHANGES called with unapplied old changes.";
755
756   ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
757
758   Cryptographer* crypto = directory()->GetCryptographer(trans);
759   const syncable::ImmutableEntryKernelMutationMap& mutations =
760       write_transaction_info.Get().mutations;
761   for (syncable::EntryKernelMutationMap::const_iterator it =
762            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
763     bool existed_before = !it->second.original.ref(syncable::IS_DEL);
764     bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
765
766     // Omit items that aren't associated with a model.
767     ModelType type =
768         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
769     if (type < FIRST_REAL_MODEL_TYPE)
770       continue;
771
772     int64 handle = it->first;
773     if (exists_now && !existed_before)
774       change_buffers[type].PushAddedItem(handle);
775     else if (!exists_now && existed_before)
776       change_buffers[type].PushDeletedItem(handle);
777     else if (exists_now && existed_before &&
778              VisiblePropertiesDiffer(it->second, crypto)) {
779       change_buffers[type].PushUpdatedItem(handle);
780     }
781
782     SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
783                              it->second.original, existed_before, exists_now);
784   }
785
786   ReadTransaction read_trans(GetUserShare(), trans);
787   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
788     if (!change_buffers[i].IsEmpty()) {
789       if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
790                                                      &(change_records_[i]))) {
791         for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
792           entries_changed->push_back((change_records_[i].Get())[j].id);
793       }
794       if (change_records_[i].Get().empty())
795         change_records_.erase(i);
796     }
797   }
798 }
799
800 void SyncManagerImpl::RequestNudgeForDataTypes(
801     const tracked_objects::Location& nudge_location,
802     ModelTypeSet types) {
803   debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
804
805   scheduler_->ScheduleLocalNudge(types, nudge_location);
806 }
807
808 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
809   DCHECK(thread_checker_.CalledOnValidThread());
810   scheduler_->ScheduleInitialSyncNudge(type);
811 }
812
813 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
814   DCHECK(thread_checker_.CalledOnValidThread());
815   RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
816 }
817
818 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
819   DCHECK(thread_checker_.CalledOnValidThread());
820   RefreshTypes(ModelTypeSet(type));
821 }
822
823 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
824   DCHECK(thread_checker_.CalledOnValidThread());
825   // Only send an event if this is due to a cycle ending and this cycle
826   // concludes a canonical "sync" process; that is, based on what is known
827   // locally we are "all happy" and up-to-date.  There may be new changes on
828   // the server, but we'll get them on a subsequent sync.
829   //
830   // Notifications are sent at the end of every sync cycle, regardless of
831   // whether we should sync again.
832   if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
833     if (!initialized_) {
834       DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
835                << "initialized";
836       return;
837     }
838
839     DVLOG(1) << "Sending OnSyncCycleCompleted";
840     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
841                       OnSyncCycleCompleted(event.snapshot));
842   }
843 }
844
845 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
846   FOR_EACH_OBSERVER(
847       SyncManager::Observer, observers_,
848       OnActionableError(error));
849 }
850
851 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
852
853 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
854
855 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
856   FOR_EACH_OBSERVER(
857       SyncManager::Observer, observers_,
858       OnMigrationRequested(types));
859 }
860
861 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
862   protocol_event_buffer_.RecordProtocolEvent(event);
863   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
864                     OnProtocolEvent(event));
865 }
866
867 void SyncManagerImpl::SetJsEventHandler(
868     const WeakHandle<JsEventHandler>& event_handler) {
869   js_sync_manager_observer_.SetJsEventHandler(event_handler);
870   js_mutation_event_observer_.SetJsEventHandler(event_handler);
871   js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
872 }
873
874 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
875     syncer::ModelType type) {
876   DirectoryTypeDebugInfoEmitterMap* emitter_map =
877       model_type_registry_->directory_type_debug_info_emitter_map();
878   DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
879
880   if (it == emitter_map->end()) {
881     // This can happen in some cases.  The UI thread makes requests of us
882     // when it doesn't really know which types are enabled or disabled.
883     DLOG(WARNING) << "Asked to return debug info for invalid type "
884                   << ModelTypeToString(type);
885     return scoped_ptr<base::ListValue>(new base::ListValue());
886   }
887
888   return it->second->GetAllNodes();
889 }
890
891 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
892   DCHECK(thread_checker_.CalledOnValidThread());
893
894   DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
895   allstatus_.SetNotificationsEnabled(invalidator_enabled);
896   scheduler_->SetNotificationsEnabled(invalidator_enabled);
897 }
898
899 void SyncManagerImpl::OnIncomingInvalidation(
900     syncer::ModelType type,
901     scoped_ptr<InvalidationInterface> invalidation) {
902   DCHECK(thread_checker_.CalledOnValidThread());
903
904   scheduler_->ScheduleInvalidationNudge(
905       type,
906       invalidation.Pass(),
907       FROM_HERE);
908 }
909
910 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
911   DCHECK(thread_checker_.CalledOnValidThread());
912   if (types.Empty()) {
913     LOG(WARNING) << "Sync received refresh request with no types specified.";
914   } else {
915     scheduler_->ScheduleLocalRefreshRequest(
916         types, FROM_HERE);
917   }
918 }
919
920 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
921   return allstatus_.status();
922 }
923
924 void SyncManagerImpl::SaveChanges() {
925   directory()->SaveChanges();
926 }
927
928 UserShare* SyncManagerImpl::GetUserShare() {
929   DCHECK(initialized_);
930   return &share_;
931 }
932
933 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
934   DCHECK(initialized_);
935   return sync_context_proxy_.get();
936 }
937
938 const std::string SyncManagerImpl::cache_guid() {
939   DCHECK(initialized_);
940   return directory()->cache_guid();
941 }
942
943 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
944   ReadTransaction trans(FROM_HERE, GetUserShare());
945   ReadNode nigori_node(&trans);
946   if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
947     DVLOG(1) << "Couldn't find Nigori node.";
948     return false;
949   }
950   bool found_experiment = false;
951
952   ReadNode favicon_sync_node(&trans);
953   if (favicon_sync_node.InitByClientTagLookup(
954           syncer::EXPERIMENTS,
955           syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
956     experiments->favicon_sync_limit =
957         favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
958             favicon_sync_limit();
959     found_experiment = true;
960   }
961
962   ReadNode pre_commit_update_avoidance_node(&trans);
963   if (pre_commit_update_avoidance_node.InitByClientTagLookup(
964           syncer::EXPERIMENTS,
965           syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
966     session_context_->set_server_enabled_pre_commit_update_avoidance(
967         pre_commit_update_avoidance_node.GetExperimentsSpecifics().
968             pre_commit_update_avoidance().enabled());
969     // We don't bother setting found_experiment.  The frontend doesn't need to
970     // know about this.
971   }
972
973   ReadNode gcm_channel_node(&trans);
974   if (gcm_channel_node.InitByClientTagLookup(
975           syncer::EXPERIMENTS,
976           syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
977       gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
978     experiments->gcm_channel_state =
979         (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
980          syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
981     found_experiment = true;
982   }
983
984   ReadNode enhanced_bookmarks_node(&trans);
985   if (enhanced_bookmarks_node.InitByClientTagLookup(
986           syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
987           BaseNode::INIT_OK &&
988       enhanced_bookmarks_node.GetExperimentsSpecifics()
989           .has_enhanced_bookmarks()) {
990     const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
991         enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
992     if (enhanced_bookmarks.has_enabled())
993       experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
994     if (enhanced_bookmarks.has_extension_id()) {
995       experiments->enhanced_bookmarks_ext_id =
996           enhanced_bookmarks.extension_id();
997     }
998     found_experiment = true;
999   }
1000
1001   ReadNode gcm_invalidations_node(&trans);
1002   if (gcm_invalidations_node.InitByClientTagLookup(
1003           syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1004       BaseNode::INIT_OK) {
1005     const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1006         gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1007     if (gcm_invalidations.has_enabled()) {
1008       experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1009       found_experiment = true;
1010     }
1011   }
1012
1013   return found_experiment;
1014 }
1015
1016 bool SyncManagerImpl::HasUnsyncedItems() {
1017   ReadTransaction trans(FROM_HERE, GetUserShare());
1018   return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1019 }
1020
1021 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1022   return sync_encryption_handler_.get();
1023 }
1024
1025 ScopedVector<syncer::ProtocolEvent>
1026     SyncManagerImpl::GetBufferedProtocolEvents() {
1027   return protocol_event_buffer_.GetBufferedProtocolEvents();
1028 }
1029
1030 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1031     syncer::TypeDebugInfoObserver* observer) {
1032   model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1033 }
1034
1035 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1036     syncer::TypeDebugInfoObserver* observer) {
1037   model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1038 }
1039
1040 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1041     syncer::TypeDebugInfoObserver* observer) {
1042   return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1043 }
1044
1045 void SyncManagerImpl::RequestEmitDebugInfo() {
1046   model_type_registry_->RequestEmitDebugInfo();
1047 }
1048
1049 }  // namespace syncer