Upstream version 11.40.277.0
[platform/framework/web/crosswalk.git] / src / google_apis / gcm / engine / mcs_client.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/engine/mcs_client.h"
6
7 #include <set>
8
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/metrics/histogram.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/time/clock.h"
15 #include "base/time/time.h"
16 #include "base/timer/timer.h"
17 #include "google_apis/gcm/base/mcs_util.h"
18 #include "google_apis/gcm/base/socket_stream.h"
19 #include "google_apis/gcm/engine/connection_factory.h"
20 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
21
22 using namespace google::protobuf::io;
23
24 namespace gcm {
25
26 namespace {
27
// Scoped pointer to a generic protobuf message, as stored in queued packets.
typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;

// The category of messages intended for the GCM client itself from MCS.
// Incoming data messages with this category are handled inside this file
// instead of being passed to the message-received callback.
const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";

// The from field for messages originating in the GCM client.
const char kGCMFromField[] = "gcm@android.com";

// MCS status message types.
// TODO(zea): handle these at the GCMClient layer.
const char kIdleNotification[] = "IdleNotification";
// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
// const char kPowerNotification[] = "PowerNotification";
// const char kDataActiveNotification[] = "DataActiveNotification";

// The number of unacked messages to allow before sending a stream ack.
// Applies to both incoming and outgoing messages.
// TODO(zea): make this server configurable.
const int kUnackedMessageBeforeStreamAck = 10;

// The global maximum number of pending messages to have in the send queue.
// SendMessage() rejects new messages once the queue has grown past this.
const size_t kMaxSendQueueSize = 10 * 1024;

// The maximum message size that can be sent to the server.
const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
53
54 // Helper for converting a proto persistent id list to a vector of strings.
55 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
56                                     std::vector<std::string>* id_list) {
57   mcs_proto::SelectiveAck selective_ack;
58   if (!selective_ack.ParseFromString(bytes))
59     return false;
60   std::vector<std::string> new_list;
61   for (int i = 0; i < selective_ack.id_size(); ++i) {
62     DCHECK(!selective_ack.id(i).empty());
63     new_list.push_back(selective_ack.id(i));
64   }
65   id_list->swap(new_list);
66   return true;
67 }
68
69 }  // namespace
70
71 class CollapseKey {
72  public:
73   explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
74   ~CollapseKey();
75
76   // Comparison operator for use in maps.
77   bool operator<(const CollapseKey& right) const;
78
79   // Whether the message had a valid collapse key.
80   bool IsValid() const;
81
82   std::string token() const { return token_; }
83   std::string app_id() const { return app_id_; }
84   int64 device_user_id() const { return device_user_id_; }
85
86  private:
87   const std::string token_;
88   const std::string app_id_;
89   const int64 device_user_id_;
90 };
91
92 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
93     : token_(message.token()),
94       app_id_(message.category()),
95       device_user_id_(message.device_user_id()) {}
96
97 CollapseKey::~CollapseKey() {}
98
99 bool CollapseKey::IsValid() const {
100   // Device user id is optional, but the application id and token are not.
101   return !token_.empty() && !app_id_.empty();
102 }
103
104 bool CollapseKey::operator<(const CollapseKey& right) const {
105   if (device_user_id_ != right.device_user_id())
106     return device_user_id_ < right.device_user_id();
107   if (app_id_ != right.app_id())
108     return app_id_ < right.app_id();
109   return token_ < right.token();
110 }
111
112 struct ReliablePacketInfo {
113   ReliablePacketInfo();
114   ~ReliablePacketInfo();
115
116   // The stream id with which the message was sent.
117   uint32 stream_id;
118
119   // If reliable delivery was requested, the persistent id of the message.
120   std::string persistent_id;
121
122   // The type of message itself (for easier lookup).
123   uint8 tag;
124
125   // The protobuf of the message itself.
126   MCSProto protobuf;
127 };
128
129 ReliablePacketInfo::ReliablePacketInfo()
130   : stream_id(0), tag(0) {
131 }
132 ReliablePacketInfo::~ReliablePacketInfo() {}
133
134 int MCSClient::GetSendQueueSize() const {
135   return to_send_.size();
136 }
137
138 int MCSClient::GetResendQueueSize() const {
139   return to_resend_.size();
140 }
141
142 std::string MCSClient::GetStateString() const {
143   switch(state_) {
144     case UNINITIALIZED:
145       return "UNINITIALIZED";
146     case LOADED:
147       return "LOADED";
148     case CONNECTING:
149       return "CONNECTING";
150     case CONNECTED:
151       return "CONNECTED";
152     default:
153       NOTREACHED();
154       return std::string();
155   }
156 }
157
158 MCSClient::MCSClient(const std::string& version_string,
159                      base::Clock* clock,
160                      ConnectionFactory* connection_factory,
161                      GCMStore* gcm_store,
162                      GCMStatsRecorder* recorder,
163                      scoped_ptr<base::Timer> heartbeat_timer)
164     : version_string_(version_string),
165       clock_(clock),
166       state_(UNINITIALIZED),
167       android_id_(0),
168       security_token_(0),
169       connection_factory_(connection_factory),
170       connection_handler_(NULL),
171       last_device_to_server_stream_id_received_(0),
172       last_server_to_device_stream_id_received_(0),
173       stream_id_out_(0),
174       stream_id_in_(0),
175       gcm_store_(gcm_store),
176       heartbeat_manager_(heartbeat_timer.Pass()),
177       recorder_(recorder),
178       weak_ptr_factory_(this) {
179 }
180
181 MCSClient::~MCSClient() {
182 }
183
184 void MCSClient::Initialize(
185     const ErrorCallback& error_callback,
186     const OnMessageReceivedCallback& message_received_callback,
187     const OnMessageSentCallback& message_sent_callback,
188     scoped_ptr<GCMStore::LoadResult> load_result) {
189   DCHECK_EQ(state_, UNINITIALIZED);
190
191   state_ = LOADED;
192   mcs_error_callback_ = error_callback;
193   message_received_callback_ = message_received_callback;
194   message_sent_callback_ = message_sent_callback;
195
196   connection_factory_->Initialize(
197       base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
198                  weak_ptr_factory_.GetWeakPtr()),
199       base::Bind(&MCSClient::HandlePacketFromWire,
200                  weak_ptr_factory_.GetWeakPtr()),
201       base::Bind(&MCSClient::MaybeSendMessage,
202                  weak_ptr_factory_.GetWeakPtr()));
203   connection_handler_ = connection_factory_->GetConnectionHandler();
204
205   stream_id_out_ = 1;  // Login request is hardcoded to id 1.
206
207   android_id_ = load_result->device_android_id;
208   security_token_ = load_result->device_security_token;
209
210   if (android_id_ == 0) {
211     DVLOG(1) << "No device credentials found, assuming new client.";
212     // No need to try and load RMQ data in that case.
213     return;
214   }
215
216   // |android_id_| is non-zero, so should |security_token_|.
217   DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
218                                  << " is non-zero.";
219
220   DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
221            << " incoming acks pending and "
222            << load_result->outgoing_messages.size()
223            << " outgoing messages pending.";
224
225   restored_unackeds_server_ids_ = load_result->incoming_messages;
226
227   // First go through and order the outgoing messages by recency.
228   std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
229   std::vector<PersistentId> expired_ttl_ids;
230   for (GCMStore::OutgoingMessageMap::iterator iter =
231            load_result->outgoing_messages.begin();
232        iter != load_result->outgoing_messages.end(); ++iter) {
233     uint64 timestamp = 0;
234     if (!base::StringToUint64(iter->first, &timestamp)) {
235       LOG(ERROR) << "Invalid restored message.";
236       // TODO(fgorski): Error: data unreadable
237       mcs_error_callback_.Run();
238       return;
239     }
240
241     // Check if the TTL has expired for this message.
242     if (HasTTLExpired(*iter->second, clock_)) {
243       expired_ttl_ids.push_back(iter->first);
244       NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
245       continue;
246     }
247
248     ordered_messages[timestamp] = iter->second.release();
249   }
250
251   if (!expired_ttl_ids.empty()) {
252     gcm_store_->RemoveOutgoingMessages(
253         expired_ttl_ids,
254         base::Bind(&MCSClient::OnGCMUpdateFinished,
255                    weak_ptr_factory_.GetWeakPtr()));
256   }
257
258   // Now go through and add the outgoing messages to the send queue in their
259   // appropriate order (oldest at front, most recent at back).
260   for (std::map<uint64, google::protobuf::MessageLite*>::iterator
261            iter = ordered_messages.begin();
262        iter != ordered_messages.end(); ++iter) {
263     ReliablePacketInfo* packet_info = new ReliablePacketInfo();
264     packet_info->protobuf.reset(iter->second);
265     packet_info->tag = GetMCSProtoTag(*iter->second);
266     packet_info->persistent_id = base::Uint64ToString(iter->first);
267     to_send_.push_back(make_linked_ptr(packet_info));
268
269     if (packet_info->tag == kDataMessageStanzaTag) {
270       mcs_proto::DataMessageStanza* data_message =
271           reinterpret_cast<mcs_proto::DataMessageStanza*>(
272               packet_info->protobuf.get());
273       CollapseKey collapse_key(*data_message);
274       if (collapse_key.IsValid())
275         collapse_key_map_[collapse_key] = packet_info;
276     }
277   }
278 }
279
280 void MCSClient::Login(uint64 android_id, uint64 security_token) {
281   DCHECK_EQ(state_, LOADED);
282   DCHECK(android_id_ == 0 || android_id_ == android_id);
283   DCHECK(security_token_ == 0 || security_token_ == security_token);
284
285   if (android_id != android_id_ && security_token != security_token_) {
286     DCHECK(android_id);
287     DCHECK(security_token);
288     android_id_ = android_id;
289     security_token_ = security_token;
290   }
291
292   DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
293
294   state_ = CONNECTING;
295   connection_factory_->Connect();
296 }
297
298 void MCSClient::SendMessage(const MCSMessage& message) {
299   int ttl = GetTTL(message.GetProtobuf());
300   DCHECK_GE(ttl, 0);
301   if (to_send_.size() > kMaxSendQueueSize) {
302     NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
303     return;
304   }
305   if (message.size() > kMaxMessageBytes) {
306     NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
307     return;
308   }
309
310   scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
311   packet_info->tag = message.tag();
312   packet_info->protobuf = message.CloneProtobuf();
313
314   if (ttl > 0) {
315     DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
316
317     // First check if this message should replace a pending message with the
318     // same collapse key.
319     mcs_proto::DataMessageStanza* data_message =
320         reinterpret_cast<mcs_proto::DataMessageStanza*>(
321             packet_info->protobuf.get());
322     CollapseKey collapse_key(*data_message);
323     if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
324       ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
325       DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
326                << original_packet->persistent_id;
327       original_packet->protobuf = packet_info->protobuf.Pass();
328       SetPersistentId(original_packet->persistent_id,
329                       original_packet->protobuf.get());
330       gcm_store_->OverwriteOutgoingMessage(
331           original_packet->persistent_id,
332           message,
333           base::Bind(&MCSClient::OnGCMUpdateFinished,
334                      weak_ptr_factory_.GetWeakPtr()));
335
336       // The message is already queued, return.
337       return;
338     } else {
339       PersistentId persistent_id = GetNextPersistentId();
340       DVLOG(1) << "Setting persistent id to " << persistent_id;
341       packet_info->persistent_id = persistent_id;
342       SetPersistentId(persistent_id, packet_info->protobuf.get());
343       if (!gcm_store_->AddOutgoingMessage(
344                persistent_id,
345                MCSMessage(message.tag(), *(packet_info->protobuf)),
346                base::Bind(&MCSClient::OnGCMUpdateFinished,
347                           weak_ptr_factory_.GetWeakPtr()))) {
348         NotifyMessageSendStatus(message.GetProtobuf(),
349                                 APP_QUEUE_SIZE_LIMIT_REACHED);
350         return;
351       }
352     }
353
354     if (collapse_key.IsValid())
355       collapse_key_map_[collapse_key] = packet_info.get();
356   } else if (!connection_factory_->IsEndpointReachable()) {
357     DVLOG(1) << "No active connection, dropping message.";
358     NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
359     return;
360   }
361
362   to_send_.push_back(make_linked_ptr(packet_info.release()));
363
364   // Notify that the messages has been succsfully queued for sending.
365   // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
366   NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
367
368   MaybeSendMessage();
369 }
370
371 void MCSClient::ResetStateAndBuildLoginRequest(
372     mcs_proto::LoginRequest* request) {
373   DCHECK(android_id_);
374   DCHECK(security_token_);
375   stream_id_in_ = 0;
376   stream_id_out_ = 1;
377   last_device_to_server_stream_id_received_ = 0;
378   last_server_to_device_stream_id_received_ = 0;
379
380   heartbeat_manager_.Stop();
381
382   // Add any pending acknowledgments to the list of ids.
383   for (StreamIdToPersistentIdMap::const_iterator iter =
384            unacked_server_ids_.begin();
385        iter != unacked_server_ids_.end(); ++iter) {
386     restored_unackeds_server_ids_.push_back(iter->second);
387   }
388   unacked_server_ids_.clear();
389
390   // Any acknowledged server ids which have not been confirmed by the server
391   // are treated like unacknowledged ids.
392   for (std::map<StreamId, PersistentIdList>::const_iterator iter =
393            acked_server_ids_.begin();
394        iter != acked_server_ids_.end(); ++iter) {
395     restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
396                                          iter->second.begin(),
397                                          iter->second.end());
398   }
399   acked_server_ids_.clear();
400
401   // Then build the request, consuming all pending acknowledgments.
402   request->Swap(BuildLoginRequest(android_id_,
403                                   security_token_,
404                                   version_string_).get());
405   for (PersistentIdList::const_iterator iter =
406            restored_unackeds_server_ids_.begin();
407        iter != restored_unackeds_server_ids_.end(); ++iter) {
408     request->add_received_persistent_id(*iter);
409   }
410   acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
411   restored_unackeds_server_ids_.clear();
412
413   // Push all unacknowledged messages to front of send queue. No need to save
414   // to RMQ, as all messages that reach this point should already have been
415   // saved as necessary.
416   while (!to_resend_.empty()) {
417     to_send_.push_front(to_resend_.back());
418     to_resend_.pop_back();
419   }
420
421   // Drop all TTL == 0 or expired TTL messages from the queue.
422   std::deque<MCSPacketInternal> new_to_send;
423   std::vector<PersistentId> expired_ttl_ids;
424   while (!to_send_.empty()) {
425     MCSPacketInternal packet = PopMessageForSend();
426     if (GetTTL(*packet->protobuf) > 0 &&
427         !HasTTLExpired(*packet->protobuf, clock_)) {
428       new_to_send.push_back(packet);
429     } else {
430       // If the TTL was 0 there is no persistent id, so no need to remove the
431       // message from the persistent store.
432       if (!packet->persistent_id.empty())
433         expired_ttl_ids.push_back(packet->persistent_id);
434       NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
435     }
436   }
437
438   if (!expired_ttl_ids.empty()) {
439     DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
440              << " messages expired.";
441     gcm_store_->RemoveOutgoingMessages(
442         expired_ttl_ids,
443         base::Bind(&MCSClient::OnGCMUpdateFinished,
444                    weak_ptr_factory_.GetWeakPtr()));
445   }
446
447   to_send_.swap(new_to_send);
448
449   DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
450            << " incoming acks pending, and " << to_send_.size()
451            << " pending outgoing messages.";
452
453   state_ = CONNECTING;
454 }
455
456 void MCSClient::SendHeartbeat() {
457   SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
458 }
459
460 void MCSClient::OnGCMUpdateFinished(bool success) {
461   LOG_IF(ERROR, !success) << "GCM Update failed!";
462   UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
463   // TODO(zea): Rebuild the store from scratch in case of persistence failure?
464 }
465
466 void MCSClient::MaybeSendMessage() {
467   if (to_send_.empty())
468     return;
469
470   // If the connection has been reset, do nothing. On reconnection
471   // MaybeSendMessage will be automatically invoked again.
472   // TODO(zea): consider doing TTL expiration at connection reset time, rather
473   // than reconnect time.
474   if (!connection_factory_->IsEndpointReachable())
475     return;
476
477   MCSPacketInternal packet = PopMessageForSend();
478   if (HasTTLExpired(*packet->protobuf, clock_)) {
479     DCHECK(!packet->persistent_id.empty());
480     DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
481     NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
482     gcm_store_->RemoveOutgoingMessage(
483         packet->persistent_id,
484         base::Bind(&MCSClient::OnGCMUpdateFinished,
485                    weak_ptr_factory_.GetWeakPtr()));
486     base::MessageLoop::current()->PostTask(
487             FROM_HERE,
488             base::Bind(&MCSClient::MaybeSendMessage,
489                        weak_ptr_factory_.GetWeakPtr()));
490     return;
491   }
492   DVLOG(1) << "Pending output message found, sending.";
493   if (!packet->persistent_id.empty())
494     to_resend_.push_back(packet);
495   SendPacketToWire(packet.get());
496 }
497
498 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
499   packet_info->stream_id = ++stream_id_out_;
500   DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
501
502   // Set the queued time as necessary.
503   if (packet_info->tag == kDataMessageStanzaTag) {
504     mcs_proto::DataMessageStanza* data_message =
505         reinterpret_cast<mcs_proto::DataMessageStanza*>(
506             packet_info->protobuf.get());
507     uint64 sent = data_message->sent();
508     DCHECK_GT(sent, 0U);
509     int queued = (clock_->Now().ToInternalValue() /
510         base::Time::kMicrosecondsPerSecond) - sent;
511     DVLOG(1) << "Message was queued for " << queued << " seconds.";
512     data_message->set_queued(queued);
513     recorder_->RecordDataSentToWire(
514         data_message->category(),
515         data_message->to(),
516         data_message->id(),
517         queued);
518   }
519
520   // Set the proper last received stream id to acknowledge received server
521   // packets.
522   DVLOG(1) << "Setting last stream id received to "
523            << stream_id_in_;
524   SetLastStreamIdReceived(stream_id_in_,
525                           packet_info->protobuf.get());
526   if (stream_id_in_ != last_server_to_device_stream_id_received_) {
527     last_server_to_device_stream_id_received_ = stream_id_in_;
528     // Mark all acknowledged server messages as such. Note: they're not dropped,
529     // as it may be that they'll need to be re-acked if this message doesn't
530     // make it.
531     PersistentIdList persistent_id_list;
532     for (StreamIdToPersistentIdMap::const_iterator iter =
533              unacked_server_ids_.begin();
534          iter != unacked_server_ids_.end(); ++iter) {
535       DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
536       persistent_id_list.push_back(iter->second);
537     }
538     unacked_server_ids_.clear();
539     acked_server_ids_[stream_id_out_] = persistent_id_list;
540   }
541
542   connection_handler_->SendMessage(*packet_info->protobuf);
543 }
544
545 void MCSClient::HandleMCSDataMesssage(
546     scoped_ptr<google::protobuf::MessageLite> protobuf) {
547   mcs_proto::DataMessageStanza* data_message =
548       reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
549   // TODO(zea): implement a proper status manager rather than hardcoding these
550   // values.
551   scoped_ptr<mcs_proto::DataMessageStanza> response(
552       new mcs_proto::DataMessageStanza());
553   response->set_from(kGCMFromField);
554   response->set_sent(clock_->Now().ToInternalValue() /
555                          base::Time::kMicrosecondsPerSecond);
556   response->set_ttl(0);
557   bool send = false;
558   for (int i = 0; i < data_message->app_data_size(); ++i) {
559     const mcs_proto::AppData& app_data = data_message->app_data(i);
560     if (app_data.key() == kIdleNotification) {
561       // Tell the MCS server the client is not idle.
562       send = true;
563       mcs_proto::AppData data;
564       data.set_key(kIdleNotification);
565       data.set_value("false");
566       response->add_app_data()->CopyFrom(data);
567       response->set_category(kMCSCategory);
568     }
569   }
570
571   if (send) {
572     SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass()));
573   }
574 }
575
576 void MCSClient::HandlePacketFromWire(
577     scoped_ptr<google::protobuf::MessageLite> protobuf) {
578   if (!protobuf.get())
579     return;
580   uint8 tag = GetMCSProtoTag(*protobuf);
581   PersistentId persistent_id = GetPersistentId(*protobuf);
582   StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
583
584   if (last_stream_id_received != 0) {
585     last_device_to_server_stream_id_received_ = last_stream_id_received;
586
587     // Process device to server messages that have now been acknowledged by the
588     // server. Because messages are stored in order, just pop off all that have
589     // a stream id lower than server's last received stream id.
590     HandleStreamAck(last_stream_id_received);
591
592     // Process server_to_device_messages that the server now knows were
593     // acknowledged. Again, they're in order, so just keep going until the
594     // stream id is reached.
595     StreamIdList acked_stream_ids_to_remove;
596     for (std::map<StreamId, PersistentIdList>::iterator iter =
597              acked_server_ids_.begin();
598          iter != acked_server_ids_.end() &&
599              iter->first <= last_stream_id_received; ++iter) {
600       acked_stream_ids_to_remove.push_back(iter->first);
601     }
602     for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
603          iter != acked_stream_ids_to_remove.end(); ++iter) {
604       acked_server_ids_.erase(*iter);
605     }
606   }
607
608   ++stream_id_in_;
609   if (!persistent_id.empty()) {
610     unacked_server_ids_[stream_id_in_] = persistent_id;
611     gcm_store_->AddIncomingMessage(persistent_id,
612                                    base::Bind(&MCSClient::OnGCMUpdateFinished,
613                                               weak_ptr_factory_.GetWeakPtr()));
614   }
615
616   DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
617            << " with persistent id "
618            << (persistent_id.empty() ? "NULL" : persistent_id)
619            << ", stream id " << stream_id_in_ << " and last stream id received "
620            << last_stream_id_received;
621
622   if (unacked_server_ids_.size() > 0 &&
623       unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
624     SendMessage(MCSMessage(kIqStanzaTag, BuildStreamAck()));
625   }
626
627   // The connection is alive, treat this message as a heartbeat ack.
628   heartbeat_manager_.OnHeartbeatAcked();
629
630   switch (tag) {
631     case kLoginResponseTag: {
632       DCHECK_EQ(CONNECTING, state_);
633       mcs_proto::LoginResponse* login_response =
634           reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
635       DVLOG(1) << "Received login response:";
636       DVLOG(1) << "  Id: " << login_response->id();
637       DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
638       if (login_response->has_error() && login_response->error().code() != 0) {
639         state_ = UNINITIALIZED;
640         DVLOG(1) << "  Error code: " << login_response->error().code();
641         DVLOG(1) << "  Error message: " << login_response->error().message();
642         LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
643         connection_factory_->SignalConnectionReset(
644             ConnectionFactory::LOGIN_FAILURE);
645         mcs_error_callback_.Run();
646         return;
647       }
648
649       if (login_response->has_heartbeat_config()) {
650         heartbeat_manager_.UpdateHeartbeatConfig(
651             login_response->heartbeat_config());
652       }
653
654       state_ = CONNECTED;
655       stream_id_in_ = 1;  // To account for the login response.
656       DCHECK_EQ(1U, stream_id_out_);
657
658       // Pass the login response on up.
659       base::MessageLoop::current()->PostTask(
660           FROM_HERE,
661           base::Bind(message_received_callback_,
662                      MCSMessage(tag, protobuf.Pass())));
663
664       // If there are pending messages, attempt to send one.
665       if (!to_send_.empty()) {
666         base::MessageLoop::current()->PostTask(
667             FROM_HERE,
668             base::Bind(&MCSClient::MaybeSendMessage,
669                        weak_ptr_factory_.GetWeakPtr()));
670       }
671
672       heartbeat_manager_.Start(
673           base::Bind(&MCSClient::SendHeartbeat,
674                      weak_ptr_factory_.GetWeakPtr()),
675           base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
676                      weak_ptr_factory_.GetWeakPtr()));
677       return;
678     }
679     case kHeartbeatPingTag:
680       DCHECK_GE(stream_id_in_, 1U);
681       DVLOG(1) << "Received heartbeat ping, sending ack.";
682       SendMessage(
683           MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
684       return;
685     case kHeartbeatAckTag:
686       DCHECK_GE(stream_id_in_, 1U);
687       DVLOG(1) << "Received heartbeat ack.";
688       // Do nothing else, all messages act as heartbeat acks.
689       return;
690     case kCloseTag:
691       LOG(ERROR) << "Received close command, resetting connection.";
692       state_ = LOADED;
693       connection_factory_->SignalConnectionReset(
694           ConnectionFactory::CLOSE_COMMAND);
695       return;
696     case kIqStanzaTag: {
697       DCHECK_GE(stream_id_in_, 1U);
698       mcs_proto::IqStanza* iq_stanza =
699           reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
700       const mcs_proto::Extension& iq_extension = iq_stanza->extension();
701       switch (iq_extension.id()) {
702         case kSelectiveAck: {
703           PersistentIdList acked_ids;
704           if (BuildPersistentIdListFromProto(iq_extension.data(),
705                                              &acked_ids)) {
706             HandleSelectiveAck(acked_ids);
707           }
708           return;
709         }
710         case kStreamAck:
711           // Do nothing. The last received stream id is always processed if it's
712           // present.
713           return;
714         default:
715           LOG(WARNING) << "Received invalid iq stanza extension "
716                        << iq_extension.id();
717           return;
718       }
719     }
720     case kDataMessageStanzaTag: {
721       DCHECK_GE(stream_id_in_, 1U);
722       mcs_proto::DataMessageStanza* data_message =
723           reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
724       if (data_message->category() == kMCSCategory) {
725         HandleMCSDataMesssage(protobuf.Pass());
726         return;
727       }
728
729       DCHECK(protobuf.get());
730       base::MessageLoop::current()->PostTask(
731           FROM_HERE,
732           base::Bind(message_received_callback_,
733                      MCSMessage(tag, protobuf.Pass())));
734       return;
735     }
736     default:
737       LOG(ERROR) << "Received unexpected message of type "
738                  << static_cast<int>(tag);
739       return;
740   }
741 }
742
743 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
744   PersistentIdList acked_outgoing_persistent_ids;
745   StreamIdList acked_outgoing_stream_ids;
746   while (!to_resend_.empty() &&
747          to_resend_.front()->stream_id <= last_stream_id_received) {
748     const MCSPacketInternal& outgoing_packet = to_resend_.front();
749     acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
750     acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
751     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
752     to_resend_.pop_front();
753   }
754
755   DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
756            << " outgoing messages, " << to_resend_.size()
757            << " remaining unacked";
758   gcm_store_->RemoveOutgoingMessages(
759       acked_outgoing_persistent_ids,
760       base::Bind(&MCSClient::OnGCMUpdateFinished,
761                  weak_ptr_factory_.GetWeakPtr()));
762
763   HandleServerConfirmedReceipt(last_stream_id_received);
764 }
765
766 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
767   std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
768
769   StreamId last_stream_id_received = 0;
770
771   // First check the to_resend_ queue. Acknowledgments are always contiguous,
772   // so if there's a pending message that hasn't been acked, all newer messages
773   // must also be unacked.
774   while(!to_resend_.empty() && !remaining_ids.empty()) {
775     const MCSPacketInternal& outgoing_packet = to_resend_.front();
776     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
777       break;  // Newer message must be unacked too.
778     remaining_ids.erase(outgoing_packet->persistent_id);
779     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
780
781     // No need to re-acknowledge any server messages this message already
782     // acknowledged.
783     StreamId device_stream_id = outgoing_packet->stream_id;
784     if (device_stream_id > last_stream_id_received)
785       last_stream_id_received = device_stream_id;
786     to_resend_.pop_front();
787   }
788
789   // If the acknowledged ids aren't all there, they might be in the to_send_
790   // queue (typically when a SelectiveAck confirms messages as part of a login
791   // response).
792   while (!to_send_.empty() && !remaining_ids.empty()) {
793     const MCSPacketInternal& outgoing_packet = to_send_.front();
794     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
795       break;  // Newer messages must be unacked too.
796     remaining_ids.erase(outgoing_packet->persistent_id);
797     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
798
799     // No need to re-acknowledge any server messages this message already
800     // acknowledged.
801     StreamId device_stream_id = outgoing_packet->stream_id;
802     if (device_stream_id > last_stream_id_received)
803       last_stream_id_received = device_stream_id;
804     PopMessageForSend();
805   }
806
807   // Only handle the largest stream id value. All other stream ids are
808   // implicitly handled.
809   if (last_stream_id_received > 0)
810     HandleServerConfirmedReceipt(last_stream_id_received);
811
812   // At this point, all remaining acked ids are redundant.
813   PersistentIdList acked_ids;
814   if (remaining_ids.size() > 0) {
815     for (size_t i = 0; i < id_list.size(); ++i) {
816       if (remaining_ids.count(id_list[i]) > 0)
817         continue;
818       acked_ids.push_back(id_list[i]);
819     }
820   } else {
821     acked_ids = id_list;
822   }
823
824   DVLOG(1) << "Server acked " << acked_ids.size()
825            << " messages, " << to_resend_.size() << " remaining unacked.";
826   gcm_store_->RemoveOutgoingMessages(
827       acked_ids,
828       base::Bind(&MCSClient::OnGCMUpdateFinished,
829                  weak_ptr_factory_.GetWeakPtr()));
830
831   // Resend any remaining outgoing messages, as they were not received by the
832   // server.
833   DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
834   while (!to_resend_.empty()) {
835     to_send_.push_front(to_resend_.back());
836     to_resend_.pop_back();
837   }
838   base::MessageLoop::current()->PostTask(
839       FROM_HERE,
840       base::Bind(&MCSClient::MaybeSendMessage,
841                  weak_ptr_factory_.GetWeakPtr()));
842 }
843
844 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
845   PersistentIdList acked_incoming_ids;
846   for (std::map<StreamId, PersistentIdList>::iterator iter =
847            acked_server_ids_.begin();
848        iter != acked_server_ids_.end() &&
849            iter->first <= device_stream_id;) {
850     acked_incoming_ids.insert(acked_incoming_ids.end(),
851                               iter->second.begin(),
852                               iter->second.end());
853     acked_server_ids_.erase(iter++);
854   }
855
856   DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
857            << " acknowledged server messages.";
858   gcm_store_->RemoveIncomingMessages(
859       acked_incoming_ids,
860       base::Bind(&MCSClient::OnGCMUpdateFinished,
861                  weak_ptr_factory_.GetWeakPtr()));
862 }
863
864 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
865   return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
866 }
867
868 void MCSClient::OnConnectionResetByHeartbeat() {
869   connection_factory_->SignalConnectionReset(
870       ConnectionFactory::HEARTBEAT_FAILURE);
871 }
872
873 void MCSClient::NotifyMessageSendStatus(
874     const google::protobuf::MessageLite& protobuf,
875     MessageSendStatus status) {
876   if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
877     return;
878
879   const mcs_proto::DataMessageStanza* data_message_stanza =
880       reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
881   recorder_->RecordNotifySendStatus(
882       data_message_stanza->category(),
883       data_message_stanza->to(),
884       data_message_stanza->id(),
885       status,
886       protobuf.ByteSize(),
887       data_message_stanza->ttl());
888   message_sent_callback_.Run(
889       data_message_stanza->device_user_id(),
890       data_message_stanza->category(),
891       data_message_stanza->id(),
892       status);
893 }
894
895 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
896   MCSPacketInternal packet = to_send_.front();
897   to_send_.pop_front();
898
899   if (packet->tag == kDataMessageStanzaTag) {
900     mcs_proto::DataMessageStanza* data_message =
901         reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
902     CollapseKey collapse_key(*data_message);
903     if (collapse_key.IsValid())
904       collapse_key_map_.erase(collapse_key);
905   }
906
907   return packet;
908 }
909
910 } // namespace gcm