1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/time/clock.h"
12 #include "base/time/time.h"
13 #include "google_apis/gcm/base/mcs_util.h"
14 #include "google_apis/gcm/base/socket_stream.h"
15 #include "google_apis/gcm/engine/connection_factory.h"
17 using namespace google::protobuf::io;
// Owning pointer to a generic (lite) protobuf message.
23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
25 // The category of messages intended for the GCM client itself from MCS.
26 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
28 // The from field for messages originating in the GCM client.
29 const char kGCMFromField[] = "gcm@android.com";
31 // MCS status message types.
32 // TODO(zea): handle these at the GCMClient layer.
33 const char kIdleNotification[] = "IdleNotification";
// The remaining status types are not handled yet and stay commented out.
34 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
35 // const char kPowerNotification[] = "PowerNotification";
36 // const char kDataActiveNotification[] = "DataActiveNotification";
38 // The number of unacked messages to allow before sending a stream ack.
39 // Applies to both incoming and outgoing messages.
40 // TODO(zea): make this server configurable.
41 const int kUnackedMessageBeforeStreamAck = 10;
43 // The global maximum number of pending messages to have in the send queue.
// Compared against to_send_.size() in MCSClient::SendMessage.
44 const size_t kMaxSendQueueSize = 10 * 1024;
46 // The maximum message size that can be sent to the server.
// Compared against message.size() in MCSClient::SendMessage.
47 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
49 // Helper for converting a proto persistent id list to a vector of strings.
50 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
51 std::vector<std::string>* id_list) {
52 mcs_proto::SelectiveAck selective_ack;
53 if (!selective_ack.ParseFromString(bytes))
55 std::vector<std::string> new_list;
56 for (int i = 0; i < selective_ack.id_size(); ++i) {
57 DCHECK(!selective_ack.id(i).empty());
58 new_list.push_back(selective_ack.id(i));
60 id_list->swap(new_list);
// Builds a key from |message|'s token, category and device user id.
68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
71 // Comparison operator for use in maps.
72 bool operator<(const CollapseKey& right) const;
74 // Whether the message had a valid collapse key.
// Accessors. The two string accessors return by value, so each call copies.
77 std::string token() const { return token_; }
78 std::string app_id() const { return app_id_; }
79 int64 device_user_id() const { return device_user_id_; }
// Key components. All are const, so a key never changes after construction.
82 const std::string token_;
83 const std::string app_id_;
84 const int64 device_user_id_;
// Captures the collapse-relevant fields of |message|. Note that |app_id_| is
// taken from the message's category field.
87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
88 : token_(message.token()),
89 app_id_(message.category()),
90 device_user_id_(message.device_user_id()) {}
// Nothing to release; all members clean up after themselves.
92 CollapseKey::~CollapseKey() {}
94 bool CollapseKey::IsValid() const {
95 // Device user id is optional, but the application id and token are not.
96 return !token_.empty() && !app_id_.empty();
99 bool CollapseKey::operator<(const CollapseKey& right) const {
100 if (device_user_id_ != right.device_user_id())
101 return device_user_id_ < right.device_user_id();
102 if (app_id_ != right.app_id())
103 return app_id_ < right.app_id();
104 return token_ < right.token();
// Bookkeeping record for one outgoing packet held in the send/resend queues.
107 struct ReliablePacketInfo {
108 ReliablePacketInfo();
109 ~ReliablePacketInfo();
111 // The stream id with which the message was sent.
// (Assigned in MCSClient::SendPacketToWire; the constructor sets it to 0.)
114 // If reliable delivery was requested, the persistent id of the message.
// Left empty otherwise.
115 std::string persistent_id;
117 // The type of message itself (for easier lookup).
// (The constructor sets the tag to 0.)
120 // The protobuf of the message itself.
124 ReliablePacketInfo::ReliablePacketInfo()
125 : stream_id(0), tag(0) {
127 ReliablePacketInfo::~ReliablePacketInfo() {}
// Stores the collaborators and zero-initializes the stream bookkeeping. The
// client starts in the UNINITIALIZED state without a connection handler;
// Initialize() obtains one from |connection_factory|.
129 MCSClient::MCSClient(const std::string& version_string,
131 ConnectionFactory* connection_factory,
133 : version_string_(version_string),
135 state_(UNINITIALIZED),
138 connection_factory_(connection_factory),
139 connection_handler_(NULL),
140 last_device_to_server_stream_id_received_(0),
141 last_server_to_device_stream_id_received_(0),
144 gcm_store_(gcm_store),
// Weak pointers from this factory are bound into the asynchronous callbacks.
145 weak_ptr_factory_(this) {
148 MCSClient::~MCSClient() {
151 void MCSClient::Initialize(
152 const ErrorCallback& error_callback,
153 const OnMessageReceivedCallback& message_received_callback,
154 const OnMessageSentCallback& message_sent_callback,
155 scoped_ptr<GCMStore::LoadResult> load_result) {
156 DCHECK_EQ(state_, UNINITIALIZED);
159 mcs_error_callback_ = error_callback;
160 message_received_callback_ = message_received_callback;
161 message_sent_callback_ = message_sent_callback;
163 connection_factory_->Initialize(
164 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
165 weak_ptr_factory_.GetWeakPtr()),
166 base::Bind(&MCSClient::HandlePacketFromWire,
167 weak_ptr_factory_.GetWeakPtr()),
168 base::Bind(&MCSClient::MaybeSendMessage,
169 weak_ptr_factory_.GetWeakPtr()));
170 connection_handler_ = connection_factory_->GetConnectionHandler();
172 stream_id_out_ = 1; // Login request is hardcoded to id 1.
174 android_id_ = load_result->device_android_id;
175 security_token_ = load_result->device_security_token;
177 if (android_id_ == 0) {
178 DVLOG(1) << "No device credentials found, assuming new client.";
179 // No need to try and load RMQ data in that case.
183 // |android_id_| is non-zero, so should |security_token_|.
184 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
187 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
188 << " incoming acks pending and "
189 << load_result->outgoing_messages.size()
190 << " outgoing messages pending.";
192 restored_unackeds_server_ids_ = load_result->incoming_messages;
194 // First go through and order the outgoing messages by recency.
195 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
196 std::vector<PersistentId> expired_ttl_ids;
197 for (GCMStore::OutgoingMessageMap::iterator iter =
198 load_result->outgoing_messages.begin();
199 iter != load_result->outgoing_messages.end(); ++iter) {
200 uint64 timestamp = 0;
201 if (!base::StringToUint64(iter->first, ×tamp)) {
202 LOG(ERROR) << "Invalid restored message.";
203 // TODO(fgorski): Error: data unreadable
204 mcs_error_callback_.Run();
208 // Check if the TTL has expired for this message.
209 if (HasTTLExpired(*iter->second, clock_)) {
210 expired_ttl_ids.push_back(iter->first);
211 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
215 ordered_messages[timestamp] = iter->second.release();
218 if (!expired_ttl_ids.empty()) {
219 gcm_store_->RemoveOutgoingMessages(
221 base::Bind(&MCSClient::OnGCMUpdateFinished,
222 weak_ptr_factory_.GetWeakPtr()));
225 // Now go through and add the outgoing messages to the send queue in their
226 // appropriate order (oldest at front, most recent at back).
227 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
228 iter = ordered_messages.begin();
229 iter != ordered_messages.end(); ++iter) {
230 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
231 packet_info->protobuf.reset(iter->second);
232 packet_info->tag = GetMCSProtoTag(*iter->second);
233 packet_info->persistent_id = base::Uint64ToString(iter->first);
234 to_send_.push_back(make_linked_ptr(packet_info));
236 if (packet_info->tag == kDataMessageStanzaTag) {
237 mcs_proto::DataMessageStanza* data_message =
238 reinterpret_cast<mcs_proto::DataMessageStanza*>(
239 packet_info->protobuf.get());
240 CollapseKey collapse_key(*data_message);
241 if (collapse_key.IsValid())
242 collapse_key_map_[collapse_key] = packet_info;
// Starts connecting with the given credentials. Must be called in the LOADED
// state. If credentials were already stored they must match the arguments
// (DCHECKed).
247 void MCSClient::Login(uint64 android_id, uint64 security_token) {
248 DCHECK_EQ(state_, LOADED);
249 DCHECK(android_id_ == 0 || android_id_ == android_id);
250 DCHECK(security_token_ == 0 || security_token_ == security_token);
// The arguments are adopted only when BOTH differ from the stored values.
252 if (android_id != android_id_ && security_token != security_token_) {
254 DCHECK(security_token);
255 android_id_ = android_id;
256 security_token_ = security_token;
// Restored incoming acks are only expected together with a device id.
259 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
262 connection_factory_->Connect();
// Queues |message| for sending and reports the outcome through
// NotifyMessageSendStatus. Messages over the queue-size or message-size
// limits are rejected. A data message whose collapse key matches an already
// pending message replaces that message's payload (reusing its persistent id)
// instead of being queued separately.
265 void MCSClient::SendMessage(const MCSMessage& message) {
266 int ttl = GetTTL(message.GetProtobuf());
// Enforce the global queue limit, then the per-message size limit.
268 if (to_send_.size() > kMaxSendQueueSize) {
269 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
272 if (message.size() > kMaxMessageBytes) {
273 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
// Work on a private clone of the protobuf; it is modified below
// (SetPersistentId) and ends up owned by the queue.
277 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
278 packet_info->tag = message.tag();
279 packet_info->protobuf = message.CloneProtobuf();
282 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
284 // First check if this message should replace a pending message with the
285 // same collapse key.
286 mcs_proto::DataMessageStanza* data_message =
287 reinterpret_cast<mcs_proto::DataMessageStanza*>(
288 packet_info->protobuf.get());
289 CollapseKey collapse_key(*data_message);
290 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
291 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
292 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
293 << original_packet->persistent_id;
// Swap the new payload into the pending packet and persist the overwrite.
294 original_packet->protobuf = packet_info->protobuf.Pass();
295 SetPersistentId(original_packet->persistent_id,
296 original_packet->protobuf.get());
297 gcm_store_->OverwriteOutgoingMessage(
298 original_packet->persistent_id,
300 base::Bind(&MCSClient::OnGCMUpdateFinished,
301 weak_ptr_factory_.GetWeakPtr()));
303 // The message is already queued, return.
// No collapse match: assign a fresh persistent id and persist the message.
306 PersistentId persistent_id = GetNextPersistentId();
307 DVLOG(1) << "Setting persistent id to " << persistent_id;
308 packet_info->persistent_id = persistent_id;
309 SetPersistentId(persistent_id, packet_info->protobuf.get());
// A false return from the store is reported as APP_QUEUE_SIZE_LIMIT_REACHED.
310 if (!gcm_store_->AddOutgoingMessage(
312 MCSMessage(message.tag(), *(packet_info->protobuf)),
313 base::Bind(&MCSClient::OnGCMUpdateFinished,
314 weak_ptr_factory_.GetWeakPtr()))) {
315 NotifyMessageSendStatus(message.GetProtobuf(),
316 APP_QUEUE_SIZE_LIMIT_REACHED);
// Remember the pending packet so a later message with the same key can
// replace it. The map holds a non-owning pointer.
321 if (collapse_key.IsValid())
322 collapse_key_map_[collapse_key] = packet_info.get();
323 } else if (!connection_factory_->IsEndpointReachable()) {
324 DVLOG(1) << "No active connection, dropping message.";
325 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
// Ownership of the packet passes to the send queue.
329 to_send_.push_back(make_linked_ptr(packet_info.release()));
331 // Notify that the message has been successfully queued for sending.
332 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
333 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
338 void MCSClient::Destroy() {
339 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished,
340 weak_ptr_factory_.GetWeakPtr()));
// Bound as a callback in Initialize(). Resets the per-connection stream
// bookkeeping and fills |request| with a login request that carries every
// persistent id still awaiting acknowledgment. Also moves unacknowledged
// outgoing messages back to the front of the send queue and drops queued
// messages whose TTL is zero or has expired.
343 void MCSClient::ResetStateAndBuildLoginRequest(
344 mcs_proto::LoginRequest* request) {
346 DCHECK(security_token_);
349 last_device_to_server_stream_id_received_ = 0;
350 last_server_to_device_stream_id_received_ = 0;
352 heartbeat_manager_.Stop();
354 // Add any pending acknowledgments to the list of ids.
355 for (StreamIdToPersistentIdMap::const_iterator iter =
356 unacked_server_ids_.begin();
357 iter != unacked_server_ids_.end(); ++iter) {
358 restored_unackeds_server_ids_.push_back(iter->second);
360 unacked_server_ids_.clear();
362 // Any acknowledged server ids which have not been confirmed by the server
363 // are treated like unacknowledged ids.
364 for (std::map<StreamId, PersistentIdList>::const_iterator iter =
365 acked_server_ids_.begin();
366 iter != acked_server_ids_.end(); ++iter) {
367 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
368 iter->second.begin(),
371 acked_server_ids_.clear();
373 // Then build the request, consuming all pending acknowledgments.
374 request->Swap(BuildLoginRequest(android_id_,
376 version_string_).get());
377 for (PersistentIdList::const_iterator iter =
378 restored_unackeds_server_ids_.begin();
379 iter != restored_unackeds_server_ids_.end(); ++iter) {
380 request->add_received_persistent_id(*iter);
// Record the ids as acked under the login request's stream id so they can be
// dropped once the server confirms receipt of that stream id.
382 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
383 restored_unackeds_server_ids_.clear();
385 // Push all unacknowledged messages to front of send queue. No need to save
386 // to RMQ, as all messages that reach this point should already have been
387 // saved as necessary.
// Walking |to_resend_| from the back keeps the original send order.
388 while (!to_resend_.empty()) {
389 to_send_.push_front(to_resend_.back());
390 to_resend_.pop_back();
393 // Drop all TTL == 0 or expired TTL messages from the queue.
// Popping through PopMessageForSend also clears collapse-key map entries.
394 std::deque<MCSPacketInternal> new_to_send;
395 std::vector<PersistentId> expired_ttl_ids;
396 while (!to_send_.empty()) {
397 MCSPacketInternal packet = PopMessageForSend();
398 if (GetTTL(*packet->protobuf) > 0 &&
399 !HasTTLExpired(*packet->protobuf, clock_)) {
400 new_to_send.push_back(packet);
402 // If the TTL was 0 there is no persistent id, so no need to remove the
403 // message from the persistent store.
404 if (!packet->persistent_id.empty())
405 expired_ttl_ids.push_back(packet->persistent_id);
406 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
410 if (!expired_ttl_ids.empty()) {
411 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
412 << " messages expired.";
413 gcm_store_->RemoveOutgoingMessages(
415 base::Bind(&MCSClient::OnGCMUpdateFinished,
416 weak_ptr_factory_.GetWeakPtr()));
// Install the filtered queue.
419 to_send_.swap(new_to_send);
421 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
422 << " incoming acks pending, and " << to_send_.size()
423 << " pending outgoing messages.";
428 void MCSClient::SendHeartbeat() {
429 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
432 void MCSClient::OnGCMUpdateFinished(bool success) {
433 LOG_IF(ERROR, !success) << "GCM Update failed!";
434 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
435 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
438 void MCSClient::MaybeSendMessage() {
439 if (to_send_.empty())
442 // If the connection has been reset, do nothing. On reconnection
443 // MaybeSendMessage will be automatically invoked again.
444 // TODO(zea): consider doing TTL expiration at connection reset time, rather
445 // than reconnect time.
446 if (!connection_factory_->IsEndpointReachable())
449 MCSPacketInternal packet = PopMessageForSend();
450 if (HasTTLExpired(*packet->protobuf, clock_)) {
451 DCHECK(!packet->persistent_id.empty());
452 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
453 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
454 gcm_store_->RemoveOutgoingMessage(
455 packet->persistent_id,
456 base::Bind(&MCSClient::OnGCMUpdateFinished,
457 weak_ptr_factory_.GetWeakPtr()));
458 base::MessageLoop::current()->PostTask(
460 base::Bind(&MCSClient::MaybeSendMessage,
461 weak_ptr_factory_.GetWeakPtr()));
464 DVLOG(1) << "Pending output message found, sending.";
465 if (!packet->persistent_id.empty())
466 to_resend_.push_back(packet);
467 SendPacketToWire(packet.get());
// Assigns the next outgoing stream id to |packet_info|, stamps the protobuf
// with the last received stream id (|stream_id_in_|) and hands it to the
// connection handler. For data messages the time spent queued is recorded in
// the message.
470 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
471 packet_info->stream_id = ++stream_id_out_;
472 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
474 // Set the queued time as necessary.
475 if (packet_info->tag == kDataMessageStanzaTag) {
476 mcs_proto::DataMessageStanza* data_message =
477 reinterpret_cast<mcs_proto::DataMessageStanza*>(
478 packet_info->protobuf.get());
479 uint64 sent = data_message->sent();
// Queued time in whole seconds: current time (converted from microseconds)
// minus the message's |sent| value.
481 int queued = (clock_->Now().ToInternalValue() /
482 base::Time::kMicrosecondsPerSecond) - sent;
483 DVLOG(1) << "Message was queued for " << queued << " seconds.";
484 data_message->set_queued(queued);
487 // Set the proper last received stream id to acknowledge received server
489 DVLOG(1) << "Setting last stream id received to "
491 SetLastStreamIdReceived(stream_id_in_,
492 packet_info->protobuf.get());
// Only when this packet acknowledges a newer stream id than the previous one
// do the pending incoming ids move from "unacked" to "acked".
493 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
494 last_server_to_device_stream_id_received_ = stream_id_in_;
495 // Mark all acknowledged server messages as such. Note: they're not dropped,
496 // as it may be that they'll need to be re-acked if this message doesn't
498 PersistentIdList persistent_id_list;
499 for (StreamIdToPersistentIdMap::const_iterator iter =
500 unacked_server_ids_.begin();
501 iter != unacked_server_ids_.end(); ++iter) {
502 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
503 persistent_id_list.push_back(iter->second);
505 unacked_server_ids_.clear();
// Keyed by this packet's outgoing stream id (it was incremented above).
506 acked_server_ids_[stream_id_out_] = persistent_id_list;
509 connection_handler_->SendMessage(*packet_info->protobuf);
// Handles a data message addressed to the GCM client itself (category
// kMCSCategory; see the dispatch in HandlePacketFromWire). Prepares a
// zero-TTL response and, for an idle notification, answers that the client
// is not idle.
512 void MCSClient::HandleMCSDataMesssage(
513 scoped_ptr<google::protobuf::MessageLite> protobuf) {
514 mcs_proto::DataMessageStanza* data_message =
515 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
516 // TODO(zea): implement a proper status manager rather than hardcoding these
518 scoped_ptr<mcs_proto::DataMessageStanza> response(
519 new mcs_proto::DataMessageStanza());
520 response->set_from(kGCMFromField);
// |sent| is the current time in seconds (converted from microseconds).
521 response->set_sent(clock_->Now().ToInternalValue() /
522 base::Time::kMicrosecondsPerSecond);
523 response->set_ttl(0);
// Scan the app data entries for status notifications this client answers.
525 for (int i = 0; i < data_message->app_data_size(); ++i) {
526 const mcs_proto::AppData& app_data = data_message->app_data(i);
527 if (app_data.key() == kIdleNotification) {
528 // Tell the MCS server the client is not idle.
530 mcs_proto::AppData data;
531 data.set_key(kIdleNotification);
532 data.set_value("false");
533 response->add_app_data()->CopyFrom(data);
534 response->set_category(kMCSCategory);
// The response is wrapped in an MCSMessage with the data message tag.
540 MCSMessage(kDataMessageStanzaTag,
541 response.PassAs<const google::protobuf::MessageLite>()));
// Bound as the read callback in Initialize(). Processes one message received
// from the server: applies the stream acknowledgment it carries, records its
// persistent id for later acknowledgment, treats it as a heartbeat ack, and
// then dispatches on the message tag.
545 void MCSClient::HandlePacketFromWire(
546 scoped_ptr<google::protobuf::MessageLite> protobuf) {
549 uint8 tag = GetMCSProtoTag(*protobuf);
550 PersistentId persistent_id = GetPersistentId(*protobuf);
551 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
// A non-zero value means the message carries a stream acknowledgment.
553 if (last_stream_id_received != 0) {
554 last_device_to_server_stream_id_received_ = last_stream_id_received;
556 // Process device to server messages that have now been acknowledged by the
557 // server. Because messages are stored in order, just pop off all that have
558 // a stream id lower than server's last received stream id.
559 HandleStreamAck(last_stream_id_received);
561 // Process server_to_device_messages that the server now knows were
562 // acknowledged. Again, they're in order, so just keep going until the
563 // stream id is reached.
// Keys are collected first and erased in a second pass, so the map is never
// modified while it is being iterated.
564 StreamIdList acked_stream_ids_to_remove;
565 for (std::map<StreamId, PersistentIdList>::iterator iter =
566 acked_server_ids_.begin();
567 iter != acked_server_ids_.end() &&
568 iter->first <= last_stream_id_received; ++iter) {
569 acked_stream_ids_to_remove.push_back(iter->first);
571 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
572 iter != acked_stream_ids_to_remove.end(); ++iter) {
573 acked_server_ids_.erase(*iter);
// Remember (and persist) the persistent id until it has been acknowledged.
578 if (!persistent_id.empty()) {
579 unacked_server_ids_[stream_id_in_] = persistent_id;
580 gcm_store_->AddIncomingMessage(persistent_id,
581 base::Bind(&MCSClient::OnGCMUpdateFinished,
582 weak_ptr_factory_.GetWeakPtr()));
585 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
586 << " with persistent id "
587 << (persistent_id.empty() ? "NULL" : persistent_id)
588 << ", stream id " << stream_id_in_ << " and last stream id received "
589 << last_stream_id_received;
// Send an explicit stream ack whenever the number of unacked messages is a
// non-zero multiple of kUnackedMessageBeforeStreamAck.
591 if (unacked_server_ids_.size() > 0 &&
592 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
593 SendMessage(MCSMessage(kIqStanzaTag,
595 PassAs<const google::protobuf::MessageLite>()));
598 // The connection is alive, treat this message as a heartbeat ack.
599 heartbeat_manager_.OnHeartbeatAcked();
// Login response: on error the client returns to UNINITIALIZED, the
// connection is reset and the error callback is run.
602 case kLoginResponseTag: {
603 DCHECK_EQ(CONNECTING, state_);
604 mcs_proto::LoginResponse* login_response =
605 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
606 DVLOG(1) << "Received login response:";
607 DVLOG(1) << " Id: " << login_response->id();
608 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
609 if (login_response->has_error() && login_response->error().code() != 0) {
610 state_ = UNINITIALIZED;
611 DVLOG(1) << " Error code: " << login_response->error().code();
612 DVLOG(1) << " Error message: " << login_response->error().message();
613 LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
614 connection_factory_->SignalConnectionReset(
615 ConnectionFactory::LOGIN_FAILURE);
616 mcs_error_callback_.Run();
620 if (login_response->has_heartbeat_config()) {
621 heartbeat_manager_.UpdateHeartbeatConfig(
622 login_response->heartbeat_config());
626 stream_id_in_ = 1; // To account for the login response.
627 DCHECK_EQ(1U, stream_id_out_);
629 // Pass the login response on up.
630 base::MessageLoop::current()->PostTask(
632 base::Bind(message_received_callback_,
635 const google::protobuf::MessageLite>())));
637 // If there are pending messages, attempt to send one.
638 if (!to_send_.empty()) {
639 base::MessageLoop::current()->PostTask(
641 base::Bind(&MCSClient::MaybeSendMessage,
642 weak_ptr_factory_.GetWeakPtr()));
// Start heartbeats now that login succeeded.
645 heartbeat_manager_.Start(
646 base::Bind(&MCSClient::SendHeartbeat,
647 weak_ptr_factory_.GetWeakPtr()),
648 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
649 weak_ptr_factory_.GetWeakPtr()));
// Heartbeat ping from the server: answered with a heartbeat ack.
652 case kHeartbeatPingTag:
653 DCHECK_GE(stream_id_in_, 1U);
654 DVLOG(1) << "Received heartbeat ping, sending ack.";
656 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
658 case kHeartbeatAckTag:
659 DCHECK_GE(stream_id_in_, 1U);
660 DVLOG(1) << "Received heartbeat ack.";
661 // Do nothing else, all messages act as heartbeat acks.
// Close command from the server: reset the connection.
664 LOG(ERROR) << "Received close command, resetting connection.";
666 connection_factory_->SignalConnectionReset(
667 ConnectionFactory::CLOSE_COMMAND);
// Iq stanza: the extension id selects the handling.
670 DCHECK_GE(stream_id_in_, 1U);
671 mcs_proto::IqStanza* iq_stanza =
672 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
673 const mcs_proto::Extension& iq_extension = iq_stanza->extension();
674 switch (iq_extension.id()) {
675 case kSelectiveAck: {
676 PersistentIdList acked_ids;
677 if (BuildPersistentIdListFromProto(iq_extension.data(),
679 HandleSelectiveAck(acked_ids);
684 // Do nothing. The last received stream id is always processed if it's
688 LOG(WARNING) << "Received invalid iq stanza extension "
689 << iq_extension.id();
// Data message: messages in the MCS category are handled by the client
// itself; the message is otherwise posted to |message_received_callback_|.
693 case kDataMessageStanzaTag: {
694 DCHECK_GE(stream_id_in_, 1U);
695 mcs_proto::DataMessageStanza* data_message =
696 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
697 if (data_message->category() == kMCSCategory) {
698 HandleMCSDataMesssage(protobuf.Pass());
702 DCHECK(protobuf.get());
703 base::MessageLoop::current()->PostTask(
705 base::Bind(message_received_callback_,
708 const google::protobuf::MessageLite>())));
712 LOG(ERROR) << "Received unexpected message of type "
713 << static_cast<int>(tag);
718 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
719 PersistentIdList acked_outgoing_persistent_ids;
720 StreamIdList acked_outgoing_stream_ids;
721 while (!to_resend_.empty() &&
722 to_resend_.front()->stream_id <= last_stream_id_received) {
723 const MCSPacketInternal& outgoing_packet = to_resend_.front();
724 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
725 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
726 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
727 to_resend_.pop_front();
730 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
731 << " outgoing messages, " << to_resend_.size()
732 << " remaining unacked";
733 gcm_store_->RemoveOutgoingMessages(
734 acked_outgoing_persistent_ids,
735 base::Bind(&MCSClient::OnGCMUpdateFinished,
736 weak_ptr_factory_.GetWeakPtr()));
738 HandleServerConfirmedReceipt(last_stream_id_received);
741 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
742 // First check the to_resend_ queue. Acknowledgments should always happen
743 // in the order they were sent, so if messages are present they should match
744 // the acknowledge list.
745 PersistentIdList::const_iterator iter = id_list.begin();
746 for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
747 const MCSPacketInternal& outgoing_packet = to_resend_.front();
748 DCHECK_EQ(outgoing_packet->persistent_id, *iter);
749 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
751 // No need to re-acknowledge any server messages this message already
753 StreamId device_stream_id = outgoing_packet->stream_id;
754 HandleServerConfirmedReceipt(device_stream_id);
756 to_resend_.pop_front();
759 // If the acknowledged ids aren't all there, they might be in the to_send_
760 // queue (typically when a StreamAck confirms messages as part of a login
762 for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
763 const MCSPacketInternal& outgoing_packet = PopMessageForSend();
764 DCHECK_EQ(outgoing_packet->persistent_id, *iter);
765 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
767 // No need to re-acknowledge any server messages this message already
769 StreamId device_stream_id = outgoing_packet->stream_id;
770 HandleServerConfirmedReceipt(device_stream_id);
773 DCHECK(iter == id_list.end());
775 DVLOG(1) << "Server acked " << id_list.size()
776 << " messages, " << to_resend_.size() << " remaining unacked.";
777 gcm_store_->RemoveOutgoingMessages(
779 base::Bind(&MCSClient::OnGCMUpdateFinished,
780 weak_ptr_factory_.GetWeakPtr()));
782 // Resend any remaining outgoing messages, as they were not received by the
784 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
785 while (!to_resend_.empty()) {
786 to_send_.push_front(to_resend_.back());
787 to_resend_.pop_back();
791 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
792 PersistentIdList acked_incoming_ids;
793 for (std::map<StreamId, PersistentIdList>::iterator iter =
794 acked_server_ids_.begin();
795 iter != acked_server_ids_.end() &&
796 iter->first <= device_stream_id;) {
797 acked_incoming_ids.insert(acked_incoming_ids.end(),
798 iter->second.begin(),
800 acked_server_ids_.erase(iter++);
803 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
804 << " acknowledged server messages.";
805 gcm_store_->RemoveIncomingMessages(
807 base::Bind(&MCSClient::OnGCMUpdateFinished,
808 weak_ptr_factory_.GetWeakPtr()));
811 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
812 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
815 void MCSClient::OnConnectionResetByHeartbeat() {
816 connection_factory_->SignalConnectionReset(
817 ConnectionFactory::HEARTBEAT_FAILURE);
820 void MCSClient::NotifyMessageSendStatus(
821 const google::protobuf::MessageLite& protobuf,
822 MessageSendStatus status) {
823 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
826 const mcs_proto::DataMessageStanza* data_message_stanza =
827 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
828 message_sent_callback_.Run(
829 data_message_stanza->device_user_id(),
830 data_message_stanza->category(),
831 data_message_stanza->id(),
835 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
836 gcm_store_ = gcm_store;
// Removes the packet at the front of |to_send_| (the queue must not be empty;
// front() is called unconditionally). For data messages with a valid collapse
// key the matching |collapse_key_map_| entry is erased, since the packet is
// no longer pending in the send queue.
839 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
840 MCSPacketInternal packet = to_send_.front();
841 to_send_.pop_front();
843 if (packet->tag == kDataMessageStanzaTag) {
844 mcs_proto::DataMessageStanza* data_message =
845 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
846 CollapseKey collapse_key(*data_message);
847 if (collapse_key.IsValid())
848 collapse_key_map_.erase(collapse_key);