Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / google_apis / gcm / engine / mcs_client.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/engine/mcs_client.h"
6
7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/time/clock.h"
12 #include "base/time/time.h"
13 #include "google_apis/gcm/base/mcs_util.h"
14 #include "google_apis/gcm/base/socket_stream.h"
15 #include "google_apis/gcm/engine/connection_factory.h"
16
17 using namespace google::protobuf::io;
18
19 namespace gcm {
20
21 namespace {
22
23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
24
25 // The category of messages intended for the GCM client itself from MCS.
26 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
27
28 // The from field for messages originating in the GCM client.
29 const char kGCMFromField[] = "gcm@android.com";
30
31 // MCS status message types.
32 // TODO(zea): handle these at the GCMClient layer.
33 const char kIdleNotification[] = "IdleNotification";
34 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
35 // const char kPowerNotification[] = "PowerNotification";
36 // const char kDataActiveNotification[] = "DataActiveNotification";
37
38 // The number of unacked messages to allow before sending a stream ack.
39 // Applies to both incoming and outgoing messages.
40 // TODO(zea): make this server configurable.
41 const int kUnackedMessageBeforeStreamAck = 10;
42
43 // The global maximum number of pending messages to have in the send queue.
44 const size_t kMaxSendQueueSize = 10 * 1024;
45
46 // The maximum message size that can be sent to the server.
47 const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
48
49 // Helper for converting a proto persistent id list to a vector of strings.
50 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
51                                     std::vector<std::string>* id_list) {
52   mcs_proto::SelectiveAck selective_ack;
53   if (!selective_ack.ParseFromString(bytes))
54     return false;
55   std::vector<std::string> new_list;
56   for (int i = 0; i < selective_ack.id_size(); ++i) {
57     DCHECK(!selective_ack.id(i).empty());
58     new_list.push_back(selective_ack.id(i));
59   }
60   id_list->swap(new_list);
61   return true;
62 }
63
64 }  // namespace
65
66 class CollapseKey {
67  public:
68   explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
69   ~CollapseKey();
70
71   // Comparison operator for use in maps.
72   bool operator<(const CollapseKey& right) const;
73
74   // Whether the message had a valid collapse key.
75   bool IsValid() const;
76
77   std::string token() const { return token_; }
78   std::string app_id() const { return app_id_; }
79   int64 device_user_id() const { return device_user_id_; }
80
81  private:
82   const std::string token_;
83   const std::string app_id_;
84   const int64 device_user_id_;
85 };
86
87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
88     : token_(message.token()),
89       app_id_(message.category()),
90       device_user_id_(message.device_user_id()) {}
91
92 CollapseKey::~CollapseKey() {}
93
94 bool CollapseKey::IsValid() const {
95   // Device user id is optional, but the application id and token are not.
96   return !token_.empty() && !app_id_.empty();
97 }
98
99 bool CollapseKey::operator<(const CollapseKey& right) const {
100   if (device_user_id_ != right.device_user_id())
101     return device_user_id_ < right.device_user_id();
102   if (app_id_ != right.app_id())
103     return app_id_ < right.app_id();
104   return token_ < right.token();
105 }
106
107 struct ReliablePacketInfo {
108   ReliablePacketInfo();
109   ~ReliablePacketInfo();
110
111   // The stream id with which the message was sent.
112   uint32 stream_id;
113
114   // If reliable delivery was requested, the persistent id of the message.
115   std::string persistent_id;
116
117   // The type of message itself (for easier lookup).
118   uint8 tag;
119
120   // The protobuf of the message itself.
121   MCSProto protobuf;
122 };
123
124 ReliablePacketInfo::ReliablePacketInfo()
125   : stream_id(0), tag(0) {
126 }
127 ReliablePacketInfo::~ReliablePacketInfo() {}
128
129 MCSClient::MCSClient(const std::string& version_string,
130                      base::Clock* clock,
131                      ConnectionFactory* connection_factory,
132                      GCMStore* gcm_store)
133     : version_string_(version_string),
134       clock_(clock),
135       state_(UNINITIALIZED),
136       android_id_(0),
137       security_token_(0),
138       connection_factory_(connection_factory),
139       connection_handler_(NULL),
140       last_device_to_server_stream_id_received_(0),
141       last_server_to_device_stream_id_received_(0),
142       stream_id_out_(0),
143       stream_id_in_(0),
144       gcm_store_(gcm_store),
145       weak_ptr_factory_(this) {
146 }
147
148 MCSClient::~MCSClient() {
149 }
150
151 void MCSClient::Initialize(
152     const ErrorCallback& error_callback,
153     const OnMessageReceivedCallback& message_received_callback,
154     const OnMessageSentCallback& message_sent_callback,
155     scoped_ptr<GCMStore::LoadResult> load_result) {
156   DCHECK_EQ(state_, UNINITIALIZED);
157
158   state_ = LOADED;
159   mcs_error_callback_ = error_callback;
160   message_received_callback_ = message_received_callback;
161   message_sent_callback_ = message_sent_callback;
162
163   connection_factory_->Initialize(
164       base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
165                  weak_ptr_factory_.GetWeakPtr()),
166       base::Bind(&MCSClient::HandlePacketFromWire,
167                  weak_ptr_factory_.GetWeakPtr()),
168       base::Bind(&MCSClient::MaybeSendMessage,
169                  weak_ptr_factory_.GetWeakPtr()));
170   connection_handler_ = connection_factory_->GetConnectionHandler();
171
172   stream_id_out_ = 1;  // Login request is hardcoded to id 1.
173
174   android_id_ = load_result->device_android_id;
175   security_token_ = load_result->device_security_token;
176
177   if (android_id_ == 0) {
178     DVLOG(1) << "No device credentials found, assuming new client.";
179     // No need to try and load RMQ data in that case.
180     return;
181   }
182
183   // |android_id_| is non-zero, so should |security_token_|.
184   DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
185                                  << " is non-zero.";
186
187   DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
188            << " incoming acks pending and "
189            << load_result->outgoing_messages.size()
190            << " outgoing messages pending.";
191
192   restored_unackeds_server_ids_ = load_result->incoming_messages;
193
194   // First go through and order the outgoing messages by recency.
195   std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
196   std::vector<PersistentId> expired_ttl_ids;
197   for (GCMStore::OutgoingMessageMap::iterator iter =
198            load_result->outgoing_messages.begin();
199        iter != load_result->outgoing_messages.end(); ++iter) {
200     uint64 timestamp = 0;
201     if (!base::StringToUint64(iter->first, &timestamp)) {
202       LOG(ERROR) << "Invalid restored message.";
203       // TODO(fgorski): Error: data unreadable
204       mcs_error_callback_.Run();
205       return;
206     }
207
208     // Check if the TTL has expired for this message.
209     if (HasTTLExpired(*iter->second, clock_)) {
210       expired_ttl_ids.push_back(iter->first);
211       NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
212       continue;
213     }
214
215     ordered_messages[timestamp] = iter->second.release();
216   }
217
218   if (!expired_ttl_ids.empty()) {
219     gcm_store_->RemoveOutgoingMessages(
220         expired_ttl_ids,
221         base::Bind(&MCSClient::OnGCMUpdateFinished,
222                    weak_ptr_factory_.GetWeakPtr()));
223   }
224
225   // Now go through and add the outgoing messages to the send queue in their
226   // appropriate order (oldest at front, most recent at back).
227   for (std::map<uint64, google::protobuf::MessageLite*>::iterator
228            iter = ordered_messages.begin();
229        iter != ordered_messages.end(); ++iter) {
230     ReliablePacketInfo* packet_info = new ReliablePacketInfo();
231     packet_info->protobuf.reset(iter->second);
232     packet_info->tag = GetMCSProtoTag(*iter->second);
233     packet_info->persistent_id = base::Uint64ToString(iter->first);
234     to_send_.push_back(make_linked_ptr(packet_info));
235
236     if (packet_info->tag == kDataMessageStanzaTag) {
237       mcs_proto::DataMessageStanza* data_message =
238           reinterpret_cast<mcs_proto::DataMessageStanza*>(
239               packet_info->protobuf.get());
240       CollapseKey collapse_key(*data_message);
241       if (collapse_key.IsValid())
242         collapse_key_map_[collapse_key] = packet_info;
243     }
244   }
245 }
246
247 void MCSClient::Login(uint64 android_id, uint64 security_token) {
248   DCHECK_EQ(state_, LOADED);
249   DCHECK(android_id_ == 0 || android_id_ == android_id);
250   DCHECK(security_token_ == 0 || security_token_ == security_token);
251
252   if (android_id != android_id_ && security_token != security_token_) {
253     DCHECK(android_id);
254     DCHECK(security_token);
255     android_id_ = android_id;
256     security_token_ = security_token;
257   }
258
259   DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
260
261   state_ = CONNECTING;
262   connection_factory_->Connect();
263 }
264
265 void MCSClient::SendMessage(const MCSMessage& message) {
266   int ttl = GetTTL(message.GetProtobuf());
267   DCHECK_GE(ttl, 0);
268   if (to_send_.size() > kMaxSendQueueSize) {
269     NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
270     return;
271   }
272   if (message.size() > kMaxMessageBytes) {
273     NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
274     return;
275   }
276
277   scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
278   packet_info->tag = message.tag();
279   packet_info->protobuf = message.CloneProtobuf();
280
281   if (ttl > 0) {
282     DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
283
284     // First check if this message should replace a pending message with the
285     // same collapse key.
286     mcs_proto::DataMessageStanza* data_message =
287         reinterpret_cast<mcs_proto::DataMessageStanza*>(
288             packet_info->protobuf.get());
289     CollapseKey collapse_key(*data_message);
290     if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
291       ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
292       DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
293                << original_packet->persistent_id;
294       original_packet->protobuf = packet_info->protobuf.Pass();
295       SetPersistentId(original_packet->persistent_id,
296                       original_packet->protobuf.get());
297       gcm_store_->OverwriteOutgoingMessage(
298           original_packet->persistent_id,
299           message,
300           base::Bind(&MCSClient::OnGCMUpdateFinished,
301                      weak_ptr_factory_.GetWeakPtr()));
302
303       // The message is already queued, return.
304       return;
305     } else {
306       PersistentId persistent_id = GetNextPersistentId();
307       DVLOG(1) << "Setting persistent id to " << persistent_id;
308       packet_info->persistent_id = persistent_id;
309       SetPersistentId(persistent_id, packet_info->protobuf.get());
310       if (!gcm_store_->AddOutgoingMessage(
311                persistent_id,
312                MCSMessage(message.tag(), *(packet_info->protobuf)),
313                base::Bind(&MCSClient::OnGCMUpdateFinished,
314                           weak_ptr_factory_.GetWeakPtr()))) {
315         NotifyMessageSendStatus(message.GetProtobuf(),
316                                 APP_QUEUE_SIZE_LIMIT_REACHED);
317         return;
318       }
319     }
320
321     if (collapse_key.IsValid())
322       collapse_key_map_[collapse_key] = packet_info.get();
323   } else if (!connection_factory_->IsEndpointReachable()) {
324     DVLOG(1) << "No active connection, dropping message.";
325     NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
326     return;
327   }
328
329   to_send_.push_back(make_linked_ptr(packet_info.release()));
330
331   // Notify that the messages has been succsfully queued for sending.
332   // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
333   NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
334
335   MaybeSendMessage();
336 }
337
338 void MCSClient::Destroy() {
339   gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished,
340                                  weak_ptr_factory_.GetWeakPtr()));
341 }
342
343 void MCSClient::ResetStateAndBuildLoginRequest(
344     mcs_proto::LoginRequest* request) {
345   DCHECK(android_id_);
346   DCHECK(security_token_);
347   stream_id_in_ = 0;
348   stream_id_out_ = 1;
349   last_device_to_server_stream_id_received_ = 0;
350   last_server_to_device_stream_id_received_ = 0;
351
352   heartbeat_manager_.Stop();
353
354   // Add any pending acknowledgments to the list of ids.
355   for (StreamIdToPersistentIdMap::const_iterator iter =
356            unacked_server_ids_.begin();
357        iter != unacked_server_ids_.end(); ++iter) {
358     restored_unackeds_server_ids_.push_back(iter->second);
359   }
360   unacked_server_ids_.clear();
361
362   // Any acknowledged server ids which have not been confirmed by the server
363   // are treated like unacknowledged ids.
364   for (std::map<StreamId, PersistentIdList>::const_iterator iter =
365            acked_server_ids_.begin();
366        iter != acked_server_ids_.end(); ++iter) {
367     restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
368                                          iter->second.begin(),
369                                          iter->second.end());
370   }
371   acked_server_ids_.clear();
372
373   // Then build the request, consuming all pending acknowledgments.
374   request->Swap(BuildLoginRequest(android_id_,
375                                   security_token_,
376                                   version_string_).get());
377   for (PersistentIdList::const_iterator iter =
378            restored_unackeds_server_ids_.begin();
379        iter != restored_unackeds_server_ids_.end(); ++iter) {
380     request->add_received_persistent_id(*iter);
381   }
382   acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
383   restored_unackeds_server_ids_.clear();
384
385   // Push all unacknowledged messages to front of send queue. No need to save
386   // to RMQ, as all messages that reach this point should already have been
387   // saved as necessary.
388   while (!to_resend_.empty()) {
389     to_send_.push_front(to_resend_.back());
390     to_resend_.pop_back();
391   }
392
393   // Drop all TTL == 0 or expired TTL messages from the queue.
394   std::deque<MCSPacketInternal> new_to_send;
395   std::vector<PersistentId> expired_ttl_ids;
396   while (!to_send_.empty()) {
397     MCSPacketInternal packet = PopMessageForSend();
398     if (GetTTL(*packet->protobuf) > 0 &&
399         !HasTTLExpired(*packet->protobuf, clock_)) {
400       new_to_send.push_back(packet);
401     } else {
402       // If the TTL was 0 there is no persistent id, so no need to remove the
403       // message from the persistent store.
404       if (!packet->persistent_id.empty())
405         expired_ttl_ids.push_back(packet->persistent_id);
406       NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
407     }
408   }
409
410   if (!expired_ttl_ids.empty()) {
411     DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
412              << " messages expired.";
413     gcm_store_->RemoveOutgoingMessages(
414         expired_ttl_ids,
415         base::Bind(&MCSClient::OnGCMUpdateFinished,
416                    weak_ptr_factory_.GetWeakPtr()));
417   }
418
419   to_send_.swap(new_to_send);
420
421   DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
422            << " incoming acks pending, and " << to_send_.size()
423            << " pending outgoing messages.";
424
425   state_ = CONNECTING;
426 }
427
428 void MCSClient::SendHeartbeat() {
429   SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
430 }
431
432 void MCSClient::OnGCMUpdateFinished(bool success) {
433   LOG_IF(ERROR, !success) << "GCM Update failed!";
434   UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
435   // TODO(zea): Rebuild the store from scratch in case of persistence failure?
436 }
437
438 void MCSClient::MaybeSendMessage() {
439   if (to_send_.empty())
440     return;
441
442   // If the connection has been reset, do nothing. On reconnection
443   // MaybeSendMessage will be automatically invoked again.
444   // TODO(zea): consider doing TTL expiration at connection reset time, rather
445   // than reconnect time.
446   if (!connection_factory_->IsEndpointReachable())
447     return;
448
449   MCSPacketInternal packet = PopMessageForSend();
450   if (HasTTLExpired(*packet->protobuf, clock_)) {
451     DCHECK(!packet->persistent_id.empty());
452     DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
453     NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
454     gcm_store_->RemoveOutgoingMessage(
455         packet->persistent_id,
456         base::Bind(&MCSClient::OnGCMUpdateFinished,
457                    weak_ptr_factory_.GetWeakPtr()));
458     base::MessageLoop::current()->PostTask(
459             FROM_HERE,
460             base::Bind(&MCSClient::MaybeSendMessage,
461                        weak_ptr_factory_.GetWeakPtr()));
462     return;
463   }
464   DVLOG(1) << "Pending output message found, sending.";
465   if (!packet->persistent_id.empty())
466     to_resend_.push_back(packet);
467   SendPacketToWire(packet.get());
468 }
469
470 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
471   packet_info->stream_id = ++stream_id_out_;
472   DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
473
474   // Set the queued time as necessary.
475   if (packet_info->tag == kDataMessageStanzaTag) {
476     mcs_proto::DataMessageStanza* data_message =
477         reinterpret_cast<mcs_proto::DataMessageStanza*>(
478             packet_info->protobuf.get());
479     uint64 sent = data_message->sent();
480     DCHECK_GT(sent, 0U);
481     int queued = (clock_->Now().ToInternalValue() /
482         base::Time::kMicrosecondsPerSecond) - sent;
483     DVLOG(1) << "Message was queued for " << queued << " seconds.";
484     data_message->set_queued(queued);
485   }
486
487   // Set the proper last received stream id to acknowledge received server
488   // packets.
489   DVLOG(1) << "Setting last stream id received to "
490            << stream_id_in_;
491   SetLastStreamIdReceived(stream_id_in_,
492                           packet_info->protobuf.get());
493   if (stream_id_in_ != last_server_to_device_stream_id_received_) {
494     last_server_to_device_stream_id_received_ = stream_id_in_;
495     // Mark all acknowledged server messages as such. Note: they're not dropped,
496     // as it may be that they'll need to be re-acked if this message doesn't
497     // make it.
498     PersistentIdList persistent_id_list;
499     for (StreamIdToPersistentIdMap::const_iterator iter =
500              unacked_server_ids_.begin();
501          iter != unacked_server_ids_.end(); ++iter) {
502       DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
503       persistent_id_list.push_back(iter->second);
504     }
505     unacked_server_ids_.clear();
506     acked_server_ids_[stream_id_out_] = persistent_id_list;
507   }
508
509   connection_handler_->SendMessage(*packet_info->protobuf);
510 }
511
512 void MCSClient::HandleMCSDataMesssage(
513     scoped_ptr<google::protobuf::MessageLite> protobuf) {
514   mcs_proto::DataMessageStanza* data_message =
515       reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
516   // TODO(zea): implement a proper status manager rather than hardcoding these
517   // values.
518   scoped_ptr<mcs_proto::DataMessageStanza> response(
519       new mcs_proto::DataMessageStanza());
520   response->set_from(kGCMFromField);
521   response->set_sent(clock_->Now().ToInternalValue() /
522                          base::Time::kMicrosecondsPerSecond);
523   response->set_ttl(0);
524   bool send = false;
525   for (int i = 0; i < data_message->app_data_size(); ++i) {
526     const mcs_proto::AppData& app_data = data_message->app_data(i);
527     if (app_data.key() == kIdleNotification) {
528       // Tell the MCS server the client is not idle.
529       send = true;
530       mcs_proto::AppData data;
531       data.set_key(kIdleNotification);
532       data.set_value("false");
533       response->add_app_data()->CopyFrom(data);
534       response->set_category(kMCSCategory);
535     }
536   }
537
538   if (send) {
539     SendMessage(
540         MCSMessage(kDataMessageStanzaTag,
541                    response.PassAs<const google::protobuf::MessageLite>()));
542   }
543 }
544
545 void MCSClient::HandlePacketFromWire(
546     scoped_ptr<google::protobuf::MessageLite> protobuf) {
547   if (!protobuf.get())
548     return;
549   uint8 tag = GetMCSProtoTag(*protobuf);
550   PersistentId persistent_id = GetPersistentId(*protobuf);
551   StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
552
553   if (last_stream_id_received != 0) {
554     last_device_to_server_stream_id_received_ = last_stream_id_received;
555
556     // Process device to server messages that have now been acknowledged by the
557     // server. Because messages are stored in order, just pop off all that have
558     // a stream id lower than server's last received stream id.
559     HandleStreamAck(last_stream_id_received);
560
561     // Process server_to_device_messages that the server now knows were
562     // acknowledged. Again, they're in order, so just keep going until the
563     // stream id is reached.
564     StreamIdList acked_stream_ids_to_remove;
565     for (std::map<StreamId, PersistentIdList>::iterator iter =
566              acked_server_ids_.begin();
567          iter != acked_server_ids_.end() &&
568              iter->first <= last_stream_id_received; ++iter) {
569       acked_stream_ids_to_remove.push_back(iter->first);
570     }
571     for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
572          iter != acked_stream_ids_to_remove.end(); ++iter) {
573       acked_server_ids_.erase(*iter);
574     }
575   }
576
577   ++stream_id_in_;
578   if (!persistent_id.empty()) {
579     unacked_server_ids_[stream_id_in_] = persistent_id;
580     gcm_store_->AddIncomingMessage(persistent_id,
581                                    base::Bind(&MCSClient::OnGCMUpdateFinished,
582                                               weak_ptr_factory_.GetWeakPtr()));
583   }
584
585   DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
586            << " with persistent id "
587            << (persistent_id.empty() ? "NULL" : persistent_id)
588            << ", stream id " << stream_id_in_ << " and last stream id received "
589            << last_stream_id_received;
590
591   if (unacked_server_ids_.size() > 0 &&
592       unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
593     SendMessage(MCSMessage(kIqStanzaTag,
594                            BuildStreamAck().
595                                PassAs<const google::protobuf::MessageLite>()));
596   }
597
598   // The connection is alive, treat this message as a heartbeat ack.
599   heartbeat_manager_.OnHeartbeatAcked();
600
601   switch (tag) {
602     case kLoginResponseTag: {
603       DCHECK_EQ(CONNECTING, state_);
604       mcs_proto::LoginResponse* login_response =
605           reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
606       DVLOG(1) << "Received login response:";
607       DVLOG(1) << "  Id: " << login_response->id();
608       DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
609       if (login_response->has_error() && login_response->error().code() != 0) {
610         state_ = UNINITIALIZED;
611         DVLOG(1) << "  Error code: " << login_response->error().code();
612         DVLOG(1) << "  Error message: " << login_response->error().message();
613         LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
614         connection_factory_->SignalConnectionReset(
615             ConnectionFactory::LOGIN_FAILURE);
616         mcs_error_callback_.Run();
617         return;
618       }
619
620       if (login_response->has_heartbeat_config()) {
621         heartbeat_manager_.UpdateHeartbeatConfig(
622             login_response->heartbeat_config());
623       }
624
625       state_ = CONNECTED;
626       stream_id_in_ = 1;  // To account for the login response.
627       DCHECK_EQ(1U, stream_id_out_);
628
629       // Pass the login response on up.
630       base::MessageLoop::current()->PostTask(
631           FROM_HERE,
632           base::Bind(message_received_callback_,
633                      MCSMessage(tag,
634                                 protobuf.PassAs<
635                                     const google::protobuf::MessageLite>())));
636
637       // If there are pending messages, attempt to send one.
638       if (!to_send_.empty()) {
639         base::MessageLoop::current()->PostTask(
640             FROM_HERE,
641             base::Bind(&MCSClient::MaybeSendMessage,
642                        weak_ptr_factory_.GetWeakPtr()));
643       }
644
645       heartbeat_manager_.Start(
646           base::Bind(&MCSClient::SendHeartbeat,
647                      weak_ptr_factory_.GetWeakPtr()),
648           base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
649                      weak_ptr_factory_.GetWeakPtr()));
650       return;
651     }
652     case kHeartbeatPingTag:
653       DCHECK_GE(stream_id_in_, 1U);
654       DVLOG(1) << "Received heartbeat ping, sending ack.";
655       SendMessage(
656           MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
657       return;
658     case kHeartbeatAckTag:
659       DCHECK_GE(stream_id_in_, 1U);
660       DVLOG(1) << "Received heartbeat ack.";
661       // Do nothing else, all messages act as heartbeat acks.
662       return;
663     case kCloseTag:
664       LOG(ERROR) << "Received close command, resetting connection.";
665       state_ = LOADED;
666       connection_factory_->SignalConnectionReset(
667           ConnectionFactory::CLOSE_COMMAND);
668       return;
669     case kIqStanzaTag: {
670       DCHECK_GE(stream_id_in_, 1U);
671       mcs_proto::IqStanza* iq_stanza =
672           reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
673       const mcs_proto::Extension& iq_extension = iq_stanza->extension();
674       switch (iq_extension.id()) {
675         case kSelectiveAck: {
676           PersistentIdList acked_ids;
677           if (BuildPersistentIdListFromProto(iq_extension.data(),
678                                              &acked_ids)) {
679             HandleSelectiveAck(acked_ids);
680           }
681           return;
682         }
683         case kStreamAck:
684           // Do nothing. The last received stream id is always processed if it's
685           // present.
686           return;
687         default:
688           LOG(WARNING) << "Received invalid iq stanza extension "
689                        << iq_extension.id();
690           return;
691       }
692     }
693     case kDataMessageStanzaTag: {
694       DCHECK_GE(stream_id_in_, 1U);
695       mcs_proto::DataMessageStanza* data_message =
696           reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
697       if (data_message->category() == kMCSCategory) {
698         HandleMCSDataMesssage(protobuf.Pass());
699         return;
700       }
701
702       DCHECK(protobuf.get());
703       base::MessageLoop::current()->PostTask(
704           FROM_HERE,
705           base::Bind(message_received_callback_,
706                      MCSMessage(tag,
707                                 protobuf.PassAs<
708                                     const google::protobuf::MessageLite>())));
709       return;
710     }
711     default:
712       LOG(ERROR) << "Received unexpected message of type "
713                  << static_cast<int>(tag);
714       return;
715   }
716 }
717
718 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
719   PersistentIdList acked_outgoing_persistent_ids;
720   StreamIdList acked_outgoing_stream_ids;
721   while (!to_resend_.empty() &&
722          to_resend_.front()->stream_id <= last_stream_id_received) {
723     const MCSPacketInternal& outgoing_packet = to_resend_.front();
724     acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
725     acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
726     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
727     to_resend_.pop_front();
728   }
729
730   DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
731            << " outgoing messages, " << to_resend_.size()
732            << " remaining unacked";
733   gcm_store_->RemoveOutgoingMessages(
734       acked_outgoing_persistent_ids,
735       base::Bind(&MCSClient::OnGCMUpdateFinished,
736                  weak_ptr_factory_.GetWeakPtr()));
737
738   HandleServerConfirmedReceipt(last_stream_id_received);
739 }
740
741 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
742   // First check the to_resend_ queue. Acknowledgments should always happen
743   // in the order they were sent, so if messages are present they should match
744   // the acknowledge list.
745   PersistentIdList::const_iterator iter = id_list.begin();
746   for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
747     const MCSPacketInternal& outgoing_packet = to_resend_.front();
748     DCHECK_EQ(outgoing_packet->persistent_id, *iter);
749     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
750
751     // No need to re-acknowledge any server messages this message already
752     // acknowledged.
753     StreamId device_stream_id = outgoing_packet->stream_id;
754     HandleServerConfirmedReceipt(device_stream_id);
755
756     to_resend_.pop_front();
757   }
758
759   // If the acknowledged ids aren't all there, they might be in the to_send_
760   // queue (typically when a StreamAck confirms messages as part of a login
761   // response).
762   for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
763     const MCSPacketInternal& outgoing_packet = PopMessageForSend();
764     DCHECK_EQ(outgoing_packet->persistent_id, *iter);
765     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
766
767     // No need to re-acknowledge any server messages this message already
768     // acknowledged.
769     StreamId device_stream_id = outgoing_packet->stream_id;
770     HandleServerConfirmedReceipt(device_stream_id);
771   }
772
773   DCHECK(iter == id_list.end());
774
775   DVLOG(1) << "Server acked " << id_list.size()
776            << " messages, " << to_resend_.size() << " remaining unacked.";
777   gcm_store_->RemoveOutgoingMessages(
778       id_list,
779       base::Bind(&MCSClient::OnGCMUpdateFinished,
780                  weak_ptr_factory_.GetWeakPtr()));
781
782   // Resend any remaining outgoing messages, as they were not received by the
783   // server.
784   DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
785   while (!to_resend_.empty()) {
786     to_send_.push_front(to_resend_.back());
787     to_resend_.pop_back();
788   }
789 }
790
791 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
792   PersistentIdList acked_incoming_ids;
793   for (std::map<StreamId, PersistentIdList>::iterator iter =
794            acked_server_ids_.begin();
795        iter != acked_server_ids_.end() &&
796            iter->first <= device_stream_id;) {
797     acked_incoming_ids.insert(acked_incoming_ids.end(),
798                               iter->second.begin(),
799                               iter->second.end());
800     acked_server_ids_.erase(iter++);
801   }
802
803   DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
804            << " acknowledged server messages.";
805   gcm_store_->RemoveIncomingMessages(
806       acked_incoming_ids,
807       base::Bind(&MCSClient::OnGCMUpdateFinished,
808                  weak_ptr_factory_.GetWeakPtr()));
809 }
810
811 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
812   return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
813 }
814
815 void MCSClient::OnConnectionResetByHeartbeat() {
816   connection_factory_->SignalConnectionReset(
817       ConnectionFactory::HEARTBEAT_FAILURE);
818 }
819
820 void MCSClient::NotifyMessageSendStatus(
821     const google::protobuf::MessageLite& protobuf,
822     MessageSendStatus status) {
823   if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
824     return;
825
826   const mcs_proto::DataMessageStanza* data_message_stanza =
827       reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
828   message_sent_callback_.Run(
829       data_message_stanza->device_user_id(),
830       data_message_stanza->category(),
831       data_message_stanza->id(),
832       status);
833 }
834
835 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
836   gcm_store_ = gcm_store;
837 }
838
839 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
840   MCSPacketInternal packet = to_send_.front();
841   to_send_.pop_front();
842
843   if (packet->tag == kDataMessageStanzaTag) {
844     mcs_proto::DataMessageStanza* data_message =
845         reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
846     CollapseKey collapse_key(*data_message);
847     if (collapse_key.IsValid())
848       collapse_key_map_.erase(collapse_key);
849   }
850
851   return packet;
852 }
853
854 } // namespace gcm