e12ec8f7c236ca49a40fcbcb601500b191550517
[platform/framework/web/crosswalk.git] / src / sync / internal_api / sync_manager_impl.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/sync_manager_impl.h"
6
7 #include <string>
8
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
49
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
52
53 class GURL;
54
55 namespace syncer {
56
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
61
62 namespace {
63
// Timing constants for syncer nudges and scheduling. Per their names, all
// values are in milliseconds.
const int kDefaultNudgeDelayMilliseconds = 200;
const int kPreferencesNudgeDelayMilliseconds = 2000;
const int kSyncRefreshDelayMsec = 500;
const int kSyncSchedulerDelayMsec = 250;
69
70 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
71     ConfigureReason reason) {
72   switch (reason) {
73     case CONFIGURE_REASON_RECONFIGURATION:
74       return GetUpdatesCallerInfo::RECONFIGURATION;
75     case CONFIGURE_REASON_MIGRATION:
76       return GetUpdatesCallerInfo::MIGRATION;
77     case CONFIGURE_REASON_NEW_CLIENT:
78       return GetUpdatesCallerInfo::NEW_CLIENT;
79     case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
80     case CONFIGURE_REASON_CRYPTO:
81       return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
82     default:
83       NOTREACHED();
84   }
85   return GetUpdatesCallerInfo::UNKNOWN;
86 }
87
88 }  // namespace
89
90 // A class to calculate nudge delays for types.
91 class NudgeStrategy {
92  public:
93   static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type,
94                                           SyncManagerImpl* core) {
95     NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type);
96     return GetNudgeDelayTimeDeltaFromType(delay_type,
97                                           model_type,
98                                           core);
99   }
100
101  private:
102   // Possible types of nudge delay for datatypes.
103   // Note: These are just hints. If a sync happens then all dirty entries
104   // would be committed as part of the sync.
105   enum NudgeDelayStrategy {
106     // Sync right away.
107     IMMEDIATE,
108
109     // Sync this change while syncing another change.
110     ACCOMPANY_ONLY,
111
112     // The datatype does not use one of the predefined wait times but defines
113     // its own wait time logic for nudge.
114     CUSTOM,
115   };
116
117   static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) {
118     switch (type) {
119      case AUTOFILL:
120        return ACCOMPANY_ONLY;
121      case PREFERENCES:
122      case SESSIONS:
123      case FAVICON_IMAGES:
124      case FAVICON_TRACKING:
125        return CUSTOM;
126      default:
127        return IMMEDIATE;
128     }
129   }
130
131   static TimeDelta GetNudgeDelayTimeDeltaFromType(
132       const NudgeDelayStrategy& delay_type, const ModelType& model_type,
133       const SyncManagerImpl* core) {
134     CHECK(core);
135     TimeDelta delay = TimeDelta::FromMilliseconds(
136        kDefaultNudgeDelayMilliseconds);
137     switch (delay_type) {
138      case IMMEDIATE:
139        delay = TimeDelta::FromMilliseconds(
140            kDefaultNudgeDelayMilliseconds);
141        break;
142      case ACCOMPANY_ONLY:
143        delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds);
144        break;
145      case CUSTOM:
146        switch (model_type) {
147          case PREFERENCES:
148            delay = TimeDelta::FromMilliseconds(
149                kPreferencesNudgeDelayMilliseconds);
150            break;
151          case SESSIONS:
152          case FAVICON_IMAGES:
153          case FAVICON_TRACKING:
154            delay = core->scheduler()->GetSessionsCommitDelay();
155            break;
156          default:
157            NOTREACHED();
158        }
159        break;
160      default:
161        NOTREACHED();
162     }
163     return delay;
164   }
165 };
166
167 SyncManagerImpl::SyncManagerImpl(const std::string& name)
168     : name_(name),
169       change_delegate_(NULL),
170       initialized_(false),
171       observing_network_connectivity_changes_(false),
172       report_unrecoverable_error_function_(NULL),
173       weak_ptr_factory_(this) {
174   // Pre-fill |notification_info_map_|.
175   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
176     notification_info_map_.insert(
177         std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
178   }
179 }
180
// Destroys the manager. Must run on the thread checked by |thread_checker_|,
// and only while the manager is not initialized (|initialized_| is cleared by
// ShutdownOnSyncThread()).
SyncManagerImpl::~SyncManagerImpl() {
  DCHECK(thread_checker_.CalledOnValidThread());
  CHECK(!initialized_);
}
185
// A fresh NotificationInfo starts with a zero notification count.
SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
188
189 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
190   base::DictionaryValue* value = new base::DictionaryValue();
191   value->SetInteger("totalCount", total_count);
192   value->SetString("payload", payload);
193   return value;
194 }
195
196 bool SyncManagerImpl::VisiblePositionsDiffer(
197     const syncable::EntryKernelMutation& mutation) const {
198   const syncable::EntryKernel& a = mutation.original;
199   const syncable::EntryKernel& b = mutation.mutated;
200   if (!b.ShouldMaintainPosition())
201     return false;
202   if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
203     return true;
204   if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
205     return true;
206   return false;
207 }
208
209 bool SyncManagerImpl::VisiblePropertiesDiffer(
210     const syncable::EntryKernelMutation& mutation,
211     Cryptographer* cryptographer) const {
212   const syncable::EntryKernel& a = mutation.original;
213   const syncable::EntryKernel& b = mutation.mutated;
214   const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
215   const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
216   DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
217             GetModelTypeFromSpecifics(b_specifics));
218   ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
219   // Suppress updates to items that aren't tracked by any browser model.
220   if (model_type < FIRST_REAL_MODEL_TYPE ||
221       !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
222     return false;
223   }
224   if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
225     return true;
226   if (!AreSpecificsEqual(cryptographer,
227                          a.ref(syncable::SPECIFICS),
228                          b.ref(syncable::SPECIFICS))) {
229     return true;
230   }
231   if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
232                                   b.ref(syncable::ATTACHMENT_METADATA))) {
233     return true;
234   }
235   // We only care if the name has changed if neither specifics is encrypted
236   // (encrypted nodes blow away the NON_UNIQUE_NAME).
237   if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
238       a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
239     return true;
240   if (VisiblePositionsDiffer(mutation))
241     return true;
242   return false;
243 }
244
// Returns the directory's set of types whose initial sync has ended.
ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
  return directory()->InitialSyncEndedTypes();
}
248
249 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
250     ModelTypeSet types) {
251   ModelTypeSet result;
252   for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
253     sync_pb::DataTypeProgressMarker marker;
254     directory()->GetDownloadProgress(i.Get(), &marker);
255
256     if (marker.token().empty())
257       result.Put(i.Get());
258   }
259   return result;
260 }
261
262 void SyncManagerImpl::ConfigureSyncer(
263     ConfigureReason reason,
264     ModelTypeSet to_download,
265     ModelTypeSet to_purge,
266     ModelTypeSet to_journal,
267     ModelTypeSet to_unapply,
268     const ModelSafeRoutingInfo& new_routing_info,
269     const base::Closure& ready_task,
270     const base::Closure& retry_task) {
271   DCHECK(thread_checker_.CalledOnValidThread());
272   DCHECK(!ready_task.is_null());
273   DCHECK(!retry_task.is_null());
274   DCHECK(initialized_);
275
276   DVLOG(1) << "Configuring -"
277            << "\n\t" << "current types: "
278            << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
279            << "\n\t" << "types to download: "
280            << ModelTypeSetToString(to_download)
281            << "\n\t" << "types to purge: "
282            << ModelTypeSetToString(to_purge)
283            << "\n\t" << "types to journal: "
284            << ModelTypeSetToString(to_journal)
285            << "\n\t" << "types to unapply: "
286            << ModelTypeSetToString(to_unapply);
287   if (!PurgeDisabledTypes(to_purge,
288                           to_journal,
289                           to_unapply)) {
290     // We failed to cleanup the types. Invoke the ready task without actually
291     // configuring any types. The caller should detect this as a configuration
292     // failure and act appropriately.
293     ready_task.Run();
294     return;
295   }
296
297   ConfigurationParams params(GetSourceFromReason(reason),
298                              to_download,
299                              new_routing_info,
300                              ready_task,
301                              retry_task);
302
303   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
304   scheduler_->ScheduleConfiguration(params);
305 }
306
307 void SyncManagerImpl::Init(InitArgs* args) {
308   CHECK(!initialized_);
309   DCHECK(thread_checker_.CalledOnValidThread());
310   DCHECK(args->post_factory.get());
311   DCHECK(!args->credentials.email.empty());
312   DCHECK(!args->credentials.sync_token.empty());
313   DCHECK(!args->credentials.scope_set.empty());
314   DCHECK(args->cancelation_signal);
315   DVLOG(1) << "SyncManager starting Init...";
316
317   weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
318
319   change_delegate_ = args->change_delegate;
320
321   AddObserver(&js_sync_manager_observer_);
322   SetJsEventHandler(args->event_handler);
323
324   AddObserver(&debug_info_event_listener_);
325
326   database_path_ = args->database_location.Append(
327       syncable::Directory::kSyncDatabaseFilename);
328   unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
329   report_unrecoverable_error_function_ =
330       args->report_unrecoverable_error_function;
331
332   allstatus_.SetHasKeystoreKey(
333       !args->restored_keystore_key_for_bootstrapping.empty());
334   sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
335       &share_,
336       args->encryptor,
337       args->restored_key_for_bootstrapping,
338       args->restored_keystore_key_for_bootstrapping));
339   sync_encryption_handler_->AddObserver(this);
340   sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
341   sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
342
343   base::FilePath absolute_db_path = database_path_;
344   DCHECK(absolute_db_path.IsAbsolute());
345
346   scoped_ptr<syncable::DirectoryBackingStore> backing_store =
347       args->internal_components_factory->BuildDirectoryBackingStore(
348           InternalComponentsFactory::STORAGE_ON_DISK,
349           args->credentials.email, absolute_db_path).Pass();
350
351   DCHECK(backing_store.get());
352   share_.directory.reset(
353       new syncable::Directory(
354           backing_store.release(),
355           unrecoverable_error_handler_.get(),
356           report_unrecoverable_error_function_,
357           sync_encryption_handler_.get(),
358           sync_encryption_handler_->GetCryptographerUnsafe()));
359   share_.sync_credentials = args->credentials;
360
361   // UserShare is accessible to a lot of code that doesn't need access to the
362   // sync token so clear sync_token from the UserShare.
363   share_.sync_credentials.sync_token = "";
364
365   const std::string& username = args->credentials.email;
366   DVLOG(1) << "Username: " << username;
367   if (!OpenDirectory(username)) {
368     NotifyInitializationFailure();
369     LOG(ERROR) << "Sync manager initialization failed!";
370     return;
371   }
372
373   connection_manager_.reset(new SyncAPIServerConnectionManager(
374       args->service_url.host() + args->service_url.path(),
375       args->service_url.EffectiveIntPort(),
376       args->service_url.SchemeIsSecure(),
377       args->post_factory.release(),
378       args->cancelation_signal));
379   connection_manager_->set_client_id(directory()->cache_guid());
380   connection_manager_->AddListener(this);
381
382   std::string sync_id = directory()->cache_guid();
383
384   DVLOG(1) << "Setting sync client ID: " << sync_id;
385   allstatus_.SetSyncId(sync_id);
386   DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
387   allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
388
389   model_type_registry_.reset(
390       new ModelTypeRegistry(args->workers, directory(), this));
391
392   // Bind the SyncContext WeakPtr to this thread.  This helps us crash earlier
393   // if the pointer is misused in debug mode.
394   base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
395   weak_core.get();
396
397   sync_context_proxy_.reset(
398       new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
399
400   // Build a SyncSessionContext and store the worker in it.
401   DVLOG(1) << "Sync is bringing up SyncSessionContext.";
402   std::vector<SyncEngineEventListener*> listeners;
403   listeners.push_back(&allstatus_);
404   listeners.push_back(this);
405   session_context_ =
406       args->internal_components_factory->BuildContext(
407                                              connection_manager_.get(),
408                                              directory(),
409                                              args->extensions_activity,
410                                              listeners,
411                                              &debug_info_event_listener_,
412                                              model_type_registry_.get(),
413                                              args->invalidator_client_id)
414           .Pass();
415   session_context_->set_account_name(args->credentials.email);
416   scheduler_ = args->internal_components_factory->BuildScheduler(
417       name_, session_context_.get(), args->cancelation_signal).Pass();
418
419   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
420
421   initialized_ = true;
422
423   net::NetworkChangeNotifier::AddIPAddressObserver(this);
424   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
425   observing_network_connectivity_changes_ = true;
426
427   UpdateCredentials(args->credentials);
428
429   NotifyInitializationSuccess();
430 }
431
// Tells every observer that initialization succeeded, passing weak handles to
// this manager and to the debug info listener along with the set of types
// whose initial sync has ended.
void SyncManagerImpl::NotifyInitializationSuccess() {
  FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
                    OnInitializationComplete(
                        MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
                        MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
                        true, InitialSyncEndedTypes()));
}
439
// Tells every observer that initialization failed. The same weak handles are
// passed as in the success case, but with an empty type set.
void SyncManagerImpl::NotifyInitializationFailure() {
  FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
                    OnInitializationComplete(
                        MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
                        MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
                        false, ModelTypeSet()));
}
447
// Encryption-handler callback; this class takes no action on it.
void SyncManagerImpl::OnPassphraseRequired(
    PassphraseRequiredReason reason,
    const sync_pb::EncryptedData& pending_keys) {
  // Does nothing.
}
453
// Encryption-handler callback; this class takes no action on it.
void SyncManagerImpl::OnPassphraseAccepted() {
  // Does nothing.
}
457
458 void SyncManagerImpl::OnBootstrapTokenUpdated(
459     const std::string& bootstrap_token,
460     BootstrapTokenType type) {
461   if (type == KEYSTORE_BOOTSTRAP_TOKEN)
462     allstatus_.SetHasKeystoreKey(true);
463 }
464
// Forwards the new set of encrypted types to |allstatus_|.
// |encrypt_everything| is not used here.
void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
                                              bool encrypt_everything) {
  allstatus_.SetEncryptedTypes(encrypted_types);
}
469
// Encryption-handler callback; this class takes no action on it.
void SyncManagerImpl::OnEncryptionComplete() {
  // Does nothing.
}
473
// Mirrors the cryptographer's readiness and pending-key state into
// |allstatus_|, and refreshes the keystore migration time from the
// encryption handler.
void SyncManagerImpl::OnCryptographerStateChanged(
    Cryptographer* cryptographer) {
  allstatus_.SetCryptographerReady(cryptographer->is_ready());
  allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
  allstatus_.SetKeystoreMigrationTime(
      sync_encryption_handler_->migration_time());
}
481
// Records the new passphrase type in |allstatus_| and refreshes the keystore
// migration time from the encryption handler. |explicit_passphrase_time| is
// not used here.
void SyncManagerImpl::OnPassphraseTypeChanged(
    PassphraseType type,
    base::Time explicit_passphrase_time) {
  allstatus_.SetPassphraseType(type);
  allstatus_.SetKeystoreMigrationTime(
      sync_encryption_handler_->migration_time());
}
489
// Installs |routing_info| on the session context and switches the scheduler
// into NORMAL_MODE.
void SyncManagerImpl::StartSyncingNormally(
    const ModelSafeRoutingInfo& routing_info) {
  // Start the sync scheduler.
  // TODO(sync): We always want the newest set of routes when we switch back
  // to normal mode. Figure out how to enforce that set_routing_info is always
  // appropriately set and that it is only modified when switching to normal
  // mode.
  DCHECK(thread_checker_.CalledOnValidThread());
  session_context_->SetRoutingInfo(routing_info);
  scheduler_->Start(SyncScheduler::NORMAL_MODE);
}
501
// Returns the directory held by |share_|; no ownership is transferred.
syncable::Directory* SyncManagerImpl::directory() {
  return share_.directory.get();
}
505
// Returns the scheduler held by |scheduler_|; no ownership is transferred.
const SyncScheduler* SyncManagerImpl::scheduler() const {
  return scheduler_.get();
}
509
// Test-only accessor: reports whether the connection manager considers its
// auth token invalid.
bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
  return connection_manager_->HasInvalidAuthToken();
}
513
514 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
515   DCHECK(!initialized_) << "Should only happen once";
516
517   // Set before Open().
518   change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
519   WeakHandle<syncable::TransactionObserver> transaction_observer(
520       MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
521
522   syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
523   open_result = directory()->Open(username, this, transaction_observer);
524   if (open_result != syncable::OPENED) {
525     LOG(ERROR) << "Could not open share for:" << username;
526     return false;
527   }
528
529   // Unapplied datatypes (those that do not have initial sync ended set) get
530   // re-downloaded during any configuration. But, it's possible for a datatype
531   // to have a progress marker but not have initial sync ended yet, making
532   // it a candidate for migration. This is a problem, as the DataTypeManager
533   // does not support a migration while it's already in the middle of a
534   // configuration. As a result, any partially synced datatype can stall the
535   // DTM, waiting for the configuration to complete, which it never will due
536   // to the migration error. In addition, a partially synced nigori will
537   // trigger the migration logic before the backend is initialized, resulting
538   // in crashes. We therefore detect and purge any partially synced types as
539   // part of initialization.
540   if (!PurgePartiallySyncedTypes())
541     return false;
542
543   return true;
544 }
545
546 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
547   ModelTypeSet partially_synced_types = ModelTypeSet::All();
548   partially_synced_types.RemoveAll(InitialSyncEndedTypes());
549   partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
550       ModelTypeSet::All()));
551
552   DVLOG(1) << "Purging partially synced types "
553            << ModelTypeSetToString(partially_synced_types);
554   UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
555                        partially_synced_types.Size());
556   if (partially_synced_types.Empty())
557     return true;
558   return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
559                                              ModelTypeSet(),
560                                              ModelTypeSet());
561 }
562
// Purges the entries of |to_purge| from the directory. |to_journal| and
// |to_unapply| must both be subsets of |to_purge|; they are passed through
// to the directory unchanged. Returns true immediately when |to_purge| is
// empty, otherwise the directory's purge result.
bool SyncManagerImpl::PurgeDisabledTypes(
    ModelTypeSet to_purge,
    ModelTypeSet to_journal,
    ModelTypeSet to_unapply) {
  if (to_purge.Empty())
    return true;
  DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
  DCHECK(to_purge.HasAll(to_journal));
  DCHECK(to_purge.HasAll(to_unapply));
  return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
}
574
575 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
576   DCHECK(thread_checker_.CalledOnValidThread());
577   DCHECK(initialized_);
578   DCHECK(!credentials.email.empty());
579   DCHECK(!credentials.sync_token.empty());
580   DCHECK(!credentials.scope_set.empty());
581
582   observing_network_connectivity_changes_ = true;
583   if (!connection_manager_->SetAuthToken(credentials.sync_token))
584     return;  // Auth token is known to be invalid, so exit early.
585
586   scheduler_->OnCredentialsUpdated();
587
588   // TODO(zea): pass the credential age to the debug info event listener.
589 }
590
// Registers |observer| in |observers_|. Must be called on the thread checked
// by |thread_checker_|.
void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
  DCHECK(thread_checker_.CalledOnValidThread());
  observers_.AddObserver(observer);
}
595
// Unregisters |observer| from |observers_|. Must be called on the thread
// checked by |thread_checker_|.
void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
  DCHECK(thread_checker_.CalledOnValidThread());
  observers_.RemoveObserver(observer);
}
600
// Tears the manager down on the sync thread: invalidates weak pointers,
// destroys the scheduler, session context and model type registry,
// unregisters observers and listeners, saves and releases the directory,
// and marks the manager uninitialized. |reason| is not used in this
// function.
void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Prevent any in-flight method calls from running.  Also
  // invalidates |weak_handle_this_| and |change_observer_|.
  weak_ptr_factory_.InvalidateWeakPtrs();
  js_mutation_event_observer_.InvalidateWeakPtrs();

  // Destroy the scheduler first, then the session context it was built on,
  // then the model type registry.
  scheduler_.reset();
  session_context_.reset();
  model_type_registry_.reset();

  // The encryption handler may be unset (tested for below), so guard before
  // unregistering from it.
  if (sync_encryption_handler_) {
    sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
    sync_encryption_handler_->RemoveObserver(this);
  }

  SetJsEventHandler(WeakHandle<JsEventHandler>());
  RemoveObserver(&js_sync_manager_observer_);

  RemoveObserver(&debug_info_event_listener_);

  // |connection_manager_| may end up being NULL here in tests (in synchronous
  // initialization mode).
  //
  // TODO(akalin): Fix this behavior.
  if (connection_manager_)
    connection_manager_->RemoveListener(this);
  connection_manager_.reset();

  net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
  net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
  observing_network_connectivity_changes_ = false;

  // Only save changes if initialization completed and a directory exists.
  if (initialized_ && directory()) {
    directory()->SaveChanges();
  }

  share_.directory.reset();

  change_delegate_ = NULL;

  initialized_ = false;

  // We reset these here, since only now we know they will not be
  // accessed from other threads (since we shut down everything).
  change_observer_.Reset();
  weak_handle_this_.Reset();
}
650
651 void SyncManagerImpl::OnIPAddressChanged() {
652   if (!observing_network_connectivity_changes_) {
653     DVLOG(1) << "IP address change dropped.";
654     return;
655   }
656   DVLOG(1) << "IP address change detected.";
657   OnNetworkConnectivityChangedImpl();
658 }
659
660 void SyncManagerImpl::OnConnectionTypeChanged(
661   net::NetworkChangeNotifier::ConnectionType) {
662   if (!observing_network_connectivity_changes_) {
663     DVLOG(1) << "Connection type change dropped.";
664     return;
665   }
666   DVLOG(1) << "Connection type change detected.";
667   OnNetworkConnectivityChangedImpl();
668 }
669
// Shared handler for IP address and connection type changes: tells the
// scheduler that the connection status changed.
void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
  DCHECK(thread_checker_.CalledOnValidThread());
  scheduler_->OnConnectionStatusChange();
}
674
675 void SyncManagerImpl::OnServerConnectionEvent(
676     const ServerConnectionEvent& event) {
677   DCHECK(thread_checker_.CalledOnValidThread());
678   if (event.connection_code ==
679       HttpResponse::SERVER_CONNECTION_OK) {
680     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
681                       OnConnectionStatusChange(CONNECTION_OK));
682   }
683
684   if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
685     observing_network_connectivity_changes_ = false;
686     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
687                       OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
688   }
689
690   if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
691     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
692                       OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
693   }
694 }
695
696 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
697     ModelTypeSet models_with_changes) {
698   // This notification happens immediately after the transaction mutex is
699   // released. This allows work to be performed without blocking other threads
700   // from acquiring a transaction.
701   if (!change_delegate_)
702     return;
703
704   // Call commit.
705   for (ModelTypeSet::Iterator it = models_with_changes.First();
706        it.Good(); it.Inc()) {
707     change_delegate_->OnChangesComplete(it.Get());
708     change_observer_.Call(
709         FROM_HERE,
710         &SyncManager::ChangeObserver::OnChangesComplete,
711         it.Get());
712   }
713 }
714
715 ModelTypeSet
716 SyncManagerImpl::HandleTransactionEndingChangeEvent(
717     const ImmutableWriteTransactionInfo& write_transaction_info,
718     syncable::BaseTransaction* trans) {
719   // This notification happens immediately before a syncable WriteTransaction
720   // falls out of scope. It happens while the channel mutex is still held,
721   // and while the transaction mutex is held, so it cannot be re-entrant.
722   if (!change_delegate_ || change_records_.empty())
723     return ModelTypeSet();
724
725   // This will continue the WriteTransaction using a read only wrapper.
726   // This is the last chance for read to occur in the WriteTransaction
727   // that's closing. This special ReadTransaction will not close the
728   // underlying transaction.
729   ReadTransaction read_trans(GetUserShare(), trans);
730
731   ModelTypeSet models_with_changes;
732   for (ChangeRecordMap::const_iterator it = change_records_.begin();
733       it != change_records_.end(); ++it) {
734     DCHECK(!it->second.Get().empty());
735     ModelType type = ModelTypeFromInt(it->first);
736     change_delegate_->
737         OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
738                          &read_trans, it->second);
739     change_observer_.Call(FROM_HERE,
740         &SyncManager::ChangeObserver::OnChangesApplied,
741         type, write_transaction_info.Get().id, it->second);
742     models_with_changes.Put(type);
743   }
744   change_records_.clear();
745   return models_with_changes;
746 }
747
748 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
749     const ImmutableWriteTransactionInfo& write_transaction_info,
750     syncable::BaseTransaction* trans,
751     std::vector<int64>* entries_changed) {
752   // We have been notified about a user action changing a sync model.
753   LOG_IF(WARNING, !change_records_.empty()) <<
754       "CALCULATE_CHANGES called with unapplied old changes.";
755
756   // The mutated model type, or UNSPECIFIED if nothing was mutated.
757   ModelTypeSet mutated_model_types;
758
759   const syncable::ImmutableEntryKernelMutationMap& mutations =
760       write_transaction_info.Get().mutations;
761   for (syncable::EntryKernelMutationMap::const_iterator it =
762            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
763     if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
764       continue;
765     }
766
767     ModelType model_type =
768         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
769     if (model_type < FIRST_REAL_MODEL_TYPE) {
770       NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
771       continue;
772     }
773
774     // Found real mutation.
775     if (model_type != UNSPECIFIED) {
776       mutated_model_types.Put(model_type);
777       entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
778     }
779   }
780
781   // Nudge if necessary.
782   if (!mutated_model_types.Empty()) {
783     if (weak_handle_this_.IsInitialized()) {
784       weak_handle_this_.Call(FROM_HERE,
785                              &SyncManagerImpl::RequestNudgeForDataTypes,
786                              FROM_HERE,
787                              mutated_model_types);
788     } else {
789       NOTREACHED();
790     }
791   }
792 }
793
794 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
795     ModelType type, ChangeReorderBuffer* buffer,
796     Cryptographer* cryptographer, const syncable::EntryKernel& original,
797     bool existed_before, bool exists_now) {
798   // If this is a deletion and the datatype was encrypted, we need to decrypt it
799   // and attach it to the buffer.
800   if (!exists_now && existed_before) {
801     sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
802     if (type == PASSWORDS) {
803       // Passwords must use their own legacy ExtraPasswordChangeRecordData.
804       scoped_ptr<sync_pb::PasswordSpecificsData> data(
805           DecryptPasswordSpecifics(original_specifics, cryptographer));
806       if (!data) {
807         NOTREACHED();
808         return;
809       }
810       buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
811     } else if (original_specifics.has_encrypted()) {
812       // All other datatypes can just create a new unencrypted specifics and
813       // attach it.
814       const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
815       if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
816         NOTREACHED();
817         return;
818       }
819     }
820     buffer->SetSpecificsForId(id, original_specifics);
821   }
822 }
823
824 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
825     const ImmutableWriteTransactionInfo& write_transaction_info,
826     syncable::BaseTransaction* trans,
827     std::vector<int64>* entries_changed) {
828   // We only expect one notification per sync step, so change_buffers_ should
829   // contain no pending entries.
830   LOG_IF(WARNING, !change_records_.empty()) <<
831       "CALCULATE_CHANGES called with unapplied old changes.";
832
833   ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
834
835   Cryptographer* crypto = directory()->GetCryptographer(trans);
836   const syncable::ImmutableEntryKernelMutationMap& mutations =
837       write_transaction_info.Get().mutations;
838   for (syncable::EntryKernelMutationMap::const_iterator it =
839            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
840     bool existed_before = !it->second.original.ref(syncable::IS_DEL);
841     bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
842
843     // Omit items that aren't associated with a model.
844     ModelType type =
845         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
846     if (type < FIRST_REAL_MODEL_TYPE)
847       continue;
848
849     int64 handle = it->first;
850     if (exists_now && !existed_before)
851       change_buffers[type].PushAddedItem(handle);
852     else if (!exists_now && existed_before)
853       change_buffers[type].PushDeletedItem(handle);
854     else if (exists_now && existed_before &&
855              VisiblePropertiesDiffer(it->second, crypto)) {
856       change_buffers[type].PushUpdatedItem(handle);
857     }
858
859     SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
860                              it->second.original, existed_before, exists_now);
861   }
862
863   ReadTransaction read_trans(GetUserShare(), trans);
864   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
865     if (!change_buffers[i].IsEmpty()) {
866       if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
867                                                      &(change_records_[i]))) {
868         for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
869           entries_changed->push_back((change_records_[i].Get())[j].id);
870       }
871       if (change_records_[i].Get().empty())
872         change_records_.erase(i);
873     }
874   }
875 }
876
877 TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta(
878     const ModelType& model_type) {
879   return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this);
880 }
881
882 void SyncManagerImpl::RequestNudgeForDataTypes(
883     const tracked_objects::Location& nudge_location,
884     ModelTypeSet types) {
885   debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
886
887   // TODO(lipalani) : Calculate the nudge delay based on all types.
888   base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta(
889       types.First().Get(),
890       this);
891   scheduler_->ScheduleLocalNudge(nudge_delay,
892                                  types,
893                                  nudge_location);
894 }
895
896 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
897   DCHECK(thread_checker_.CalledOnValidThread());
898   scheduler_->ScheduleInitialSyncNudge(type);
899 }
900
901 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
902   DCHECK(thread_checker_.CalledOnValidThread());
903   RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
904 }
905
906 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
907   DCHECK(thread_checker_.CalledOnValidThread());
908   RefreshTypes(ModelTypeSet(type));
909 }
910
911 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
912   DCHECK(thread_checker_.CalledOnValidThread());
913   // Only send an event if this is due to a cycle ending and this cycle
914   // concludes a canonical "sync" process; that is, based on what is known
915   // locally we are "all happy" and up-to-date.  There may be new changes on
916   // the server, but we'll get them on a subsequent sync.
917   //
918   // Notifications are sent at the end of every sync cycle, regardless of
919   // whether we should sync again.
920   if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
921     if (!initialized_) {
922       DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
923                << "initialized";
924       return;
925     }
926
927     DVLOG(1) << "Sending OnSyncCycleCompleted";
928     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
929                       OnSyncCycleCompleted(event.snapshot));
930   }
931 }
932
933 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
934   FOR_EACH_OBSERVER(
935       SyncManager::Observer, observers_,
936       OnActionableError(error));
937 }
938
939 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
940
941 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
942
943 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
944   FOR_EACH_OBSERVER(
945       SyncManager::Observer, observers_,
946       OnMigrationRequested(types));
947 }
948
949 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
950   protocol_event_buffer_.RecordProtocolEvent(event);
951   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
952                     OnProtocolEvent(event));
953 }
954
955 void SyncManagerImpl::SetJsEventHandler(
956     const WeakHandle<JsEventHandler>& event_handler) {
957   js_sync_manager_observer_.SetJsEventHandler(event_handler);
958   js_mutation_event_observer_.SetJsEventHandler(event_handler);
959   js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
960 }
961
962 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
963     syncer::ModelType type) {
964   DirectoryTypeDebugInfoEmitterMap* emitter_map =
965       model_type_registry_->directory_type_debug_info_emitter_map();
966   DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
967
968   if (it == emitter_map->end()) {
969     // This can happen in some cases.  The UI thread makes requests of us
970     // when it doesn't really know which types are enabled or disabled.
971     DLOG(WARNING) << "Asked to return debug info for invalid type "
972                   << ModelTypeToString(type);
973     return scoped_ptr<base::ListValue>();
974   }
975
976   return it->second->GetAllNodes();
977 }
978
979 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
980   DCHECK(thread_checker_.CalledOnValidThread());
981
982   DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
983   allstatus_.SetNotificationsEnabled(invalidator_enabled);
984   scheduler_->SetNotificationsEnabled(invalidator_enabled);
985 }
986
987 void SyncManagerImpl::OnIncomingInvalidation(
988     syncer::ModelType type,
989     scoped_ptr<InvalidationInterface> invalidation) {
990   DCHECK(thread_checker_.CalledOnValidThread());
991
992   scheduler_->ScheduleInvalidationNudge(
993       TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec),
994       type,
995       invalidation.Pass(),
996       FROM_HERE);
997 }
998
999 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
1000   DCHECK(thread_checker_.CalledOnValidThread());
1001   if (types.Empty()) {
1002     LOG(WARNING) << "Sync received refresh request with no types specified.";
1003   } else {
1004     scheduler_->ScheduleLocalRefreshRequest(
1005         TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec),
1006         types, FROM_HERE);
1007   }
1008 }
1009
1010 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
1011   return allstatus_.status();
1012 }
1013
1014 void SyncManagerImpl::SaveChanges() {
1015   directory()->SaveChanges();
1016 }
1017
1018 UserShare* SyncManagerImpl::GetUserShare() {
1019   DCHECK(initialized_);
1020   return &share_;
1021 }
1022
1023 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
1024   DCHECK(initialized_);
1025   return sync_context_proxy_.get();
1026 }
1027
1028 const std::string SyncManagerImpl::cache_guid() {
1029   DCHECK(initialized_);
1030   return directory()->cache_guid();
1031 }
1032
1033 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
1034   ReadTransaction trans(FROM_HERE, GetUserShare());
1035   ReadNode nigori_node(&trans);
1036   if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
1037     DVLOG(1) << "Couldn't find Nigori node.";
1038     return false;
1039   }
1040   bool found_experiment = false;
1041
1042   ReadNode favicon_sync_node(&trans);
1043   if (favicon_sync_node.InitByClientTagLookup(
1044           syncer::EXPERIMENTS,
1045           syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
1046     experiments->favicon_sync_limit =
1047         favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
1048             favicon_sync_limit();
1049     found_experiment = true;
1050   }
1051
1052   ReadNode pre_commit_update_avoidance_node(&trans);
1053   if (pre_commit_update_avoidance_node.InitByClientTagLookup(
1054           syncer::EXPERIMENTS,
1055           syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
1056     session_context_->set_server_enabled_pre_commit_update_avoidance(
1057         pre_commit_update_avoidance_node.GetExperimentsSpecifics().
1058             pre_commit_update_avoidance().enabled());
1059     // We don't bother setting found_experiment.  The frontend doesn't need to
1060     // know about this.
1061   }
1062
1063   ReadNode gcm_channel_node(&trans);
1064   if (gcm_channel_node.InitByClientTagLookup(
1065           syncer::EXPERIMENTS,
1066           syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
1067       gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
1068     experiments->gcm_channel_state =
1069         (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
1070          syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
1071     found_experiment = true;
1072   }
1073
1074   ReadNode enhanced_bookmarks_node(&trans);
1075   if (enhanced_bookmarks_node.InitByClientTagLookup(
1076           syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
1077           BaseNode::INIT_OK &&
1078       enhanced_bookmarks_node.GetExperimentsSpecifics()
1079           .has_enhanced_bookmarks()) {
1080     const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
1081         enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
1082     if (enhanced_bookmarks.has_enabled())
1083       experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
1084     if (enhanced_bookmarks.has_extension_id()) {
1085       experiments->enhanced_bookmarks_ext_id =
1086           enhanced_bookmarks.extension_id();
1087     }
1088     found_experiment = true;
1089   }
1090
1091   ReadNode gcm_invalidations_node(&trans);
1092   if (gcm_invalidations_node.InitByClientTagLookup(
1093           syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1094       BaseNode::INIT_OK) {
1095     const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1096         gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1097     if (gcm_invalidations.has_enabled()) {
1098       experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1099       found_experiment = true;
1100     }
1101   }
1102
1103   return found_experiment;
1104 }
1105
1106 bool SyncManagerImpl::HasUnsyncedItems() {
1107   ReadTransaction trans(FROM_HERE, GetUserShare());
1108   return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1109 }
1110
1111 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1112   return sync_encryption_handler_.get();
1113 }
1114
1115 ScopedVector<syncer::ProtocolEvent>
1116     SyncManagerImpl::GetBufferedProtocolEvents() {
1117   return protocol_event_buffer_.GetBufferedProtocolEvents();
1118 }
1119
1120 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1121     syncer::TypeDebugInfoObserver* observer) {
1122   model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1123 }
1124
1125 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1126     syncer::TypeDebugInfoObserver* observer) {
1127   model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1128 }
1129
1130 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1131     syncer::TypeDebugInfoObserver* observer) {
1132   return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1133 }
1134
1135 void SyncManagerImpl::RequestEmitDebugInfo() {
1136   model_type_registry_->RequestEmitDebugInfo();
1137 }
1138
1139 // static.
1140 int SyncManagerImpl::GetDefaultNudgeDelay() {
1141   return kDefaultNudgeDelayMilliseconds;
1142 }
1143
1144 // static.
1145 int SyncManagerImpl::GetPreferencesNudgeDelay() {
1146   return kPreferencesNudgeDelayMilliseconds;
1147 }
1148
1149 }  // namespace syncer