#include "components/copresence/rpc/rpc_handler.h"
-#include <map>
-
#include "base/bind.h"
#include "base/command_line.h"
#include "base/guid.h"
#include "base/logging.h"
#include "base/strings/string_util.h"
+#include "base/strings/stringprintf.h"
// TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
// to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
#include "components/copresence/copresence_switches.h"
#include "components/copresence/handlers/directive_handler.h"
+#include "components/copresence/handlers/gcm_handler.h"
#include "components/copresence/proto/codes.pb.h"
#include "components/copresence/proto/data.pb.h"
#include "components/copresence/proto/rpcs.pb.h"
+#include "components/copresence/public/copresence_constants.h"
#include "components/copresence/public/copresence_delegate.h"
+#include "components/copresence/rpc/http_post.h"
#include "net/http/http_status_code.h"
// TODO(ckehoe): Return error messages for bad requests.
namespace {
+const int kTokenLoggingSuffix = 5;
+const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
+const int kMaxInvalidTokens = 10000;
+const char kRegisterDeviceRpcName[] = "registerdevice";
+const char kDefaultCopresenceServer[] =
+ "https://www.googleapis.com/copresence/v2/copresence";
+
// UrlSafe is defined as:
// '/' represented by a '_' and '+' represented by a '-'
// TODO(rkc): Move this to the wrapper.
return token;
}
-const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
-const int kMaxInvalidTokens = 10000;
-const char kRegisterDeviceRpcName[] = "registerdevice";
-const char kDefaultCopresenceServer[] =
- "https://www.googleapis.com/copresence/v2/copresence";
// Logging
// Checks for a copresence error. If there is one, logs it and returns true.
-bool CopresenceErrorLogged(const Status& status) {
+bool IsErrorStatus(const Status& status) {
if (status.code() != OK) {
LOG(ERROR) << "Copresence error code " << status.code()
- << (status.message().empty() ? std::string() :
- ": " + status.message());
+ << (status.message().empty() ? "" : ": " + status.message());
}
return status.code() != OK;
}
// If any errors occurred, logs them and returns true.
bool ReportErrorLogged(const ReportResponse& response) {
- bool result = CopresenceErrorLogged(response.header().status());
+ bool result = IsErrorStatus(response.header().status());
// The Report fails or succeeds as a unit. If any responses had errors,
// the header will too. Thus we don't need to propagate individual errors.
return result;
}
+const std::string LoggingStrForToken(const std::string& auth_token) {
+ if (auth_token.empty())
+ return "anonymous";
+
+ std::string token_suffix = auth_token.substr(
+ auth_token.length() - kTokenLoggingSuffix, kTokenLoggingSuffix);
+ return base::StringPrintf("token ...%s", token_suffix.c_str());
+}
+
+
// Request construction
// TODO(ckehoe): Move these into a separate file?
return version;
}
-void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
+void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
TokenObservation* token_observation =
request->mutable_update_signals_request()->add_token_observation();
token_observation->set_token_id(ToUrlSafe(token.token));
} // namespace
-// Public methods
-RpcHandler::RpcHandler(CopresenceDelegate* delegate)
+// Public functions.
+
+RpcHandler::RpcHandler(CopresenceDelegate* delegate,
+ DirectiveHandler* directive_handler,
+ GCMHandler* gcm_handler,
+ const PostCallback& server_post_callback)
: delegate_(delegate),
+ directive_handler_(directive_handler),
+ gcm_handler_(gcm_handler),
+ server_post_callback_(server_post_callback),
invalid_audio_token_cache_(
base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
- kMaxInvalidTokens),
- server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
- base::Unretained(this))) {}
-
-RpcHandler::~RpcHandler() {
- for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
- post != pending_posts_.end(); ++post) {
- delete *post;
- }
+ kMaxInvalidTokens) {
+ DCHECK(delegate_);
+ DCHECK(directive_handler_);
+ // |gcm_handler_| is optional.
+
+ if (server_post_callback_.is_null()) {
+ server_post_callback_ =
+ base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this));
+ }
- if (delegate_ && delegate_->GetWhispernetClient()) {
- delegate_->GetWhispernetClient()->RegisterTokensCallback(
- WhispernetClient::TokensCallback());
+ if (gcm_handler_) {
+ gcm_handler_->GetGcmId(
+ base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this)));
+ }
}
-}
-void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
- scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
- DCHECK(device_id_.empty());
-
- request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
- Identity* identity =
- request->mutable_device_identifiers()->mutable_registrant();
- identity->set_type(CHROME);
- identity->set_chrome_id(base::GenerateGUID());
- SendServerRequest(
- kRegisterDeviceRpcName,
- std::string(),
- request.Pass(),
- base::Bind(&RpcHandler::RegisterResponseHandler,
- // On destruction, this request will be cancelled.
- base::Unretained(this),
- init_done_callback));
-}
-
-void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
- SendReportRequest(request.Pass(), std::string(), StatusCallback());
+RpcHandler::~RpcHandler() {
+ // Do not use |directive_handler_| or |gcm_handler_| here.
+ // They will already have been destructed.
+ for (HttpPost* post : pending_posts_)
+ delete post;
}
void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
const std::string& app_id,
+ const std::string& auth_token,
const StatusCallback& status_callback) {
DCHECK(request.get());
- DCHECK(!device_id_.empty())
- << "RpcHandler::Initialize() must complete successfully "
- << "before other RpcHandler methods are called.";
- DVLOG(3) << "Sending report request to server.";
+ // Check that we have a "device" registered for this auth token.
+ bool queue_request = true;
+ const auto& registration = device_id_by_auth_token_.find(auth_token);
+ if (registration == device_id_by_auth_token_.end()) {
+ // Not registered.
+ RegisterForToken(auth_token);
+ } else if (!registration->second.empty()) {
+ // Registration complete.
+ queue_request = false;
+ }
+
+ // We're not registered, or registration is in progress.
+ if (queue_request) {
+ pending_requests_queue_.push_back(new PendingRequest(
+ request.Pass(), app_id, auth_token, status_callback));
+ return;
+ }
+
+ DVLOG(3) << "Sending ReportRequest to server.";
// If we are unpublishing or unsubscribing, we need to stop those publish or
// subscribes right away, we don't need to wait for the server to tell us.
AddPlayingTokens(request.get());
SendServerRequest(kReportRequestRpcName,
+ registration->second,
app_id,
+ auth_token,
request.Pass(),
// On destruction, this request will be cancelled.
base::Bind(&RpcHandler::ReportResponseHandler,
void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
DCHECK(!tokens.empty());
- scoped_ptr<ReportRequest> request(new ReportRequest);
- for (size_t i = 0; i < tokens.size(); ++i) {
- if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
+ if (device_id_by_auth_token_.empty()) {
+ VLOG(2) << "Skipping token reporting because no device IDs are registered";
+ return;
+ }
+
+ // Construct the ReportRequest.
+ ReportRequest request;
+ for (const AudioToken& token : tokens) {
+ if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token)))
continue;
- DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
- AddTokenToRequest(request.get(), tokens[i]);
+ DVLOG(3) << "Sending token " << token.token << " to server under "
+ << device_id_by_auth_token_.size() << " device ID(s)";
+ AddTokenToRequest(token, &request);
+ }
+
+ // Report under all active tokens.
+ for (const auto& registration : device_id_by_auth_token_) {
+ SendReportRequest(make_scoped_ptr(new ReportRequest(request)),
+ registration.first);
}
- SendReportRequest(request.Pass());
}
-void RpcHandler::ConnectToWhispernet() {
- WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
-
- // |directive_handler_| will be destructed with us, so unretained is safe.
- directive_handler_.reset(new DirectiveHandler);
- directive_handler_->Initialize(
- base::Bind(&WhispernetClient::DecodeSamples,
- base::Unretained(whispernet_client)),
- base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
- base::Unretained(this)));
-
- whispernet_client->RegisterTokensCallback(
- base::Bind(&RpcHandler::ReportTokens,
- // On destruction, this callback will be disconnected.
- base::Unretained(this)));
+
+// Private functions.
+
+RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
+ const std::string& app_id,
+ const std::string& auth_token,
+ const StatusCallback& callback)
+ : report(report.Pass()),
+ app_id(app_id),
+ auth_token(auth_token),
+ callback(callback) {}
+
+RpcHandler::PendingRequest::~PendingRequest() {}
+
+void RpcHandler::RegisterForToken(const std::string& auth_token) {
+ DVLOG(2) << "Sending " << LoggingStrForToken(auth_token)
+ << " registration to server.";
+
+ scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
+
+ // Add a GCM ID for authenticated registration, if we have one.
+ if (auth_token.empty() || gcm_id_.empty()) {
+ request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
+ } else {
+ DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token);
+ request->mutable_push_service()->set_service(GCM);
+ request->mutable_push_service()->mutable_gcm_registration()
+ ->set_device_token(gcm_id_);
+ }
+
+ // Only identify as a Chrome device if we're in anonymous mode.
+ // Authenticated calls come from a "GAIA device".
+ if (auth_token.empty()) {
+ Identity* identity =
+ request->mutable_device_identifiers()->mutable_registrant();
+ identity->set_type(CHROME);
+ identity->set_chrome_id(base::GenerateGUID());
+
+ // Since we're generating a new "Chrome ID" here,
+ // we need to make sure this isn't a duplicate registration.
+ DCHECK_EQ(0u, device_id_by_auth_token_.count(std::string()))
+ << "Attempted anonymous re-registration";
+ }
+
+ bool gcm_pending = !auth_token.empty() && gcm_handler_ && gcm_id_.empty();
+ SendServerRequest(
+ kRegisterDeviceRpcName,
+ // This will have the side effect of populating an empty device ID
+ // for this auth token in the map. This is what we want,
+ // to mark registration as being in progress.
+ device_id_by_auth_token_[auth_token],
+ std::string(), // app ID
+ auth_token,
+ request.Pass(),
+ base::Bind(&RpcHandler::RegisterResponseHandler,
+ // On destruction, this request will be cancelled.
+ base::Unretained(this),
+ auth_token,
+ gcm_pending));
}
-// Private methods
+void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) {
+ // Track requests that are not on this auth token.
+ ScopedVector<PendingRequest> still_pending_requests;
+
+ // If there is no device ID for this auth token, registration failed.
+ bool registration_failed =
+ (device_id_by_auth_token_.count(auth_token) == 0);
+
+ // We momentarily take ownership of all the pointers in the queue.
+ // They are either deleted here or passed on to a new queue.
+ for (PendingRequest* request : pending_requests_queue_) {
+ if (request->auth_token == auth_token) {
+ if (registration_failed) {
+ request->callback.Run(FAIL);
+ } else {
+ SendReportRequest(request->report.Pass(),
+ request->app_id,
+ request->auth_token,
+ request->callback);
+ }
+ delete request;
+ } else {
+ // The request is on a different auth token.
+ still_pending_requests.push_back(request);
+ }
+ }
+
+ // Only keep the requests that weren't processed.
+ // All the pointers in the queue are now spoken for.
+ pending_requests_queue_.weak_clear();
+ pending_requests_queue_ = still_pending_requests.Pass();
+}
+
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
+ const std::string& auth_token) {
+ SendReportRequest(request.Pass(),
+ std::string(),
+ auth_token,
+ StatusCallback());
+}
+
+// Store a GCM ID and send it to the server if needed. The constructor passes
+// this callback to the GCMHandler to receive the ID whenever it's ready.
+// It may be returned immediately, if the ID is cached, or require a server
+// round-trip. This ID must then be passed along to the copresence server.
+// There are a few ways this can happen for each auth token:
+//
+// 1. The GCM ID is available when we first register, and is passed along
+// with the RegisterDeviceRequest.
+//
+// 2. The GCM ID becomes available after the RegisterDeviceRequest has
+// completed. Then the loop in this function will invoke RegisterForToken()
+// again to pass on the ID.
+//
+// 3. The GCM ID becomes available after the RegisterDeviceRequest is sent,
+// but before it completes. In this case, the gcm_pending flag is passed
+// through to the RegisterResponseHandler, which invokes RegisterForToken()
+// again to pass on the ID. The loop here must skip pending registrations,
+// as the device ID will be empty.
+//
+// TODO(ckehoe): Add tests for these scenarios.
+void RpcHandler::RegisterGcmId(const std::string& gcm_id) {
+ gcm_id_ = gcm_id;
+ if (!gcm_id.empty()) {
+ for (const auto& registration : device_id_by_auth_token_) {
+ const std::string& auth_token = registration.first;
+ const std::string& device_id = registration.second;
+ if (!auth_token.empty() && !device_id.empty())
+ RegisterForToken(auth_token);
+ }
+ }
+}
void RpcHandler::RegisterResponseHandler(
- const SuccessCallback& init_done_callback,
+ const std::string& auth_token,
+ bool gcm_pending,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data) {
if (completed_post) {
int elements_erased = pending_posts_.erase(completed_post);
- DCHECK(elements_erased);
+ DCHECK_GT(elements_erased, 0);
delete completed_post;
}
- if (http_status_code != net::HTTP_OK) {
- init_done_callback.Run(false);
- return;
- }
+ // Registration is no longer in progress.
+ // If it was successful, we'll update below.
+ device_id_by_auth_token_.erase(auth_token);
RegisterDeviceResponse response;
- if (!response.ParseFromString(response_data)) {
+ if (http_status_code != net::HTTP_OK) {
+ // TODO(ckehoe): Retry registration if appropriate.
+ LOG(ERROR) << LoggingStrForToken(auth_token)
+ << " device registration failed";
+ } else if (!response.ParseFromString(response_data)) {
LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
- init_done_callback.Run(false);
- return;
+ } else if (!IsErrorStatus(response.header().status())) {
+ const std::string& device_id = response.registered_device_id();
+ DCHECK(!device_id.empty());
+ device_id_by_auth_token_[auth_token] = device_id;
+ DVLOG(2) << LoggingStrForToken(auth_token)
+ << " device registration successful. Id: " << device_id;
+
+ // If we have a GCM ID now, and didn't before, pass it on to the server.
+ if (gcm_pending && !gcm_id_.empty())
+ RegisterForToken(auth_token);
}
- if (CopresenceErrorLogged(response.header().status()))
- return;
- device_id_ = response.registered_device_id();
- DCHECK(!device_id_.empty());
- DVLOG(2) << "Device registration successful: id " << device_id_;
- init_done_callback.Run(true);
+ // Send or fail requests on this auth token.
+ ProcessQueuedRequests(auth_token);
}
void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
return;
}
- const RepeatedPtrField<MessageResult>& message_results =
- response.manage_messages_response().published_message_result();
- for (int i = 0; i < message_results.size(); ++i) {
- DVLOG(2) << "Published message with id "
- << message_results.Get(i).published_message_id();
+ for (const MessageResult& result :
+ response.manage_messages_response().published_message_result()) {
+ DVLOG(2) << "Published message with id " << result.published_message_id();
}
- const RepeatedPtrField<SubscriptionResult>& subscription_results =
- response.manage_subscriptions_response().subscription_result();
- for (int i = 0; i < subscription_results.size(); ++i) {
- DVLOG(2) << "Created subscription with id "
- << subscription_results.Get(i).subscription_id();
+ for (const SubscriptionResult& result :
+ response.manage_subscriptions_response().subscription_result()) {
+ DVLOG(2) << "Created subscription with id " << result.subscription_id();
}
if (response.has_update_signals_response()) {
response.update_signals_response();
DispatchMessages(update_response.message());
- if (directive_handler_.get()) {
- for (int i = 0; i < update_response.directive_size(); ++i)
- directive_handler_->AddDirective(update_response.directive(i));
- } else {
- DVLOG(1) << "No directive handler.";
- }
+ for (const Directive& directive : update_response.directive())
+ directive_handler_->AddDirective(directive);
- const RepeatedPtrField<Token>& tokens = update_response.token();
- for (int i = 0; i < tokens.size(); ++i) {
- switch (tokens.Get(i).status()) {
+ for (const Token& token : update_response.token()) {
+ switch (token.status()) {
case VALID:
// TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
// short TTL (like 10s) and send it up with every report request.
// Then we'll still get messages while we're waiting to hear it again.
- VLOG(1) << "Got valid token " << tokens.Get(i).id();
+ VLOG(1) << "Got valid token " << token.id();
break;
case INVALID:
- DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
- invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
+ DVLOG(3) << "Discarding invalid token " << token.id();
+ invalid_audio_token_cache_.Add(token.id(), true);
break;
default:
- DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
- << tokens.Get(i).status();
+ DVLOG(2) << "Token " << token.id() << " has status code "
+ << token.status();
}
}
}
void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
// Remove unpublishes.
if (request.has_manage_messages_request()) {
- const RepeatedPtrField<std::string>& unpublishes =
- request.manage_messages_request().id_to_unpublish();
- for (int i = 0; i < unpublishes.size(); ++i)
- directive_handler_->RemoveDirectives(unpublishes.Get(i));
+ for (const std::string& unpublish :
+ request.manage_messages_request().id_to_unpublish()) {
+ directive_handler_->RemoveDirectives(unpublish);
+ }
}
// Remove unsubscribes.
if (request.has_manage_subscriptions_request()) {
- const RepeatedPtrField<std::string>& unsubscribes =
- request.manage_subscriptions_request().id_to_unsubscribe();
- for (int i = 0; i < unsubscribes.size(); ++i)
- directive_handler_->RemoveDirectives(unsubscribes.Get(i));
+ for (const std::string& unsubscribe :
+ request.manage_subscriptions_request().id_to_unsubscribe()) {
+ directive_handler_->RemoveDirectives(unsubscribe);
+ }
}
}
void RpcHandler::AddPlayingTokens(ReportRequest* request) {
- if (!directive_handler_)
- return;
-
- const std::string& audible_token = directive_handler_->CurrentAudibleToken();
+ const std::string& audible_token =
+ directive_handler_->GetCurrentAudioToken(AUDIBLE);
const std::string& inaudible_token =
- directive_handler_->CurrentInaudibleToken();
+ directive_handler_->GetCurrentAudioToken(INAUDIBLE);
if (!audible_token.empty())
- AddTokenToRequest(request, AudioToken(audible_token, true));
+ AddTokenToRequest(AudioToken(audible_token, true), request);
if (!inaudible_token.empty())
- AddTokenToRequest(request, AudioToken(inaudible_token, false));
+ AddTokenToRequest(AudioToken(inaudible_token, false), request);
}
void RpcHandler::DispatchMessages(
// Index the messages by subscription id.
std::map<std::string, std::vector<Message>> messages_by_subscription;
DVLOG(3) << "Dispatching " << messages.size() << " messages";
- for (int m = 0; m < messages.size(); ++m) {
- const RepeatedPtrField<std::string>& subscription_ids =
- messages.Get(m).subscription_id();
- for (int s = 0; s < subscription_ids.size(); ++s) {
- messages_by_subscription[subscription_ids.Get(s)].push_back(
- messages.Get(m).published_message());
+ for (const SubscribedMessage& message : messages) {
+ for (const std::string& subscription_id : message.subscription_id()) {
+ messages_by_subscription[subscription_id].push_back(
+ message.published_message());
}
}
// Send the messages for each subscription.
- for (std::map<std::string, std::vector<Message>>::const_iterator
- subscription = messages_by_subscription.begin();
- subscription != messages_by_subscription.end();
- ++subscription) {
+ for (const auto& map_entry : messages_by_subscription) {
// TODO(ckehoe): Once we have the app ID from the server, we need to pass
// it in here and get rid of the app id registry from the main API class.
- delegate_->HandleMessages("", subscription->first, subscription->second);
+ const std::string& subscription = map_entry.first;
+ const std::vector<Message>& messages = map_entry.second;
+ delegate_->HandleMessages(std::string(), subscription, messages);
}
}
+// TODO(ckehoe): Pass in the version string and
+// group this with the local functions up top.
RequestHeader* RpcHandler::CreateRequestHeader(
- const std::string& client_name) const {
+ const std::string& client_name,
+ const std::string& device_id) const {
RequestHeader* header = new RequestHeader;
header->set_allocated_framework_version(CreateVersion(
CreateVersion(client_name, std::string()));
}
header->set_current_time_millis(base::Time::Now().ToJsTime());
- header->set_registered_device_id(device_id_);
+ if (!device_id.empty())
+ header->set_registered_device_id(device_id);
DeviceFingerprint* fingerprint = new DeviceFingerprint;
fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
template <class T>
void RpcHandler::SendServerRequest(
const std::string& rpc_name,
+ const std::string& device_id,
const std::string& app_id,
+ const std::string& auth_token,
scoped_ptr<T> request,
const PostCleanupCallback& response_handler) {
- request->set_allocated_header(CreateRequestHeader(app_id));
+ request->set_allocated_header(CreateRequestHeader(app_id, device_id));
server_post_callback_.Run(delegate_->GetRequestContext(),
rpc_name,
+ delegate_->GetAPIKey(app_id),
+ auth_token,
make_scoped_ptr<MessageLite>(request.release()),
response_handler);
}
void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
const std::string& rpc_name,
+ const std::string& api_key,
+ const std::string& auth_token,
scoped_ptr<MessageLite> request_proto,
const PostCleanupCallback& callback) {
// Create the base URL to call.
url_context_getter,
copresence_server_host,
rpc_name,
+ api_key,
+ auth_token,
command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
- delegate_->GetAPIKey(),
*request_proto);
http_post->Start(base::Bind(callback, http_post));
pending_posts_.insert(http_post);
}
-void RpcHandler::AudioDirectiveListToWhispernetConnector(
- const std::string& token,
- bool audible,
- const WhispernetClient::SamplesCallback& samples_callback) {
- WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
- if (whispernet_client) {
- whispernet_client->RegisterSamplesCallback(samples_callback);
- whispernet_client->EncodeToken(token, audible);
- }
-}
-
} // namespace copresence