X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=src%2Fcomponents%2Fcopresence%2Frpc%2Frpc_handler.cc;h=78bd95835f1989bb4b3c09dba344cbd407cbdb18;hb=1afa4dd80ef85af7c90efaea6959db1d92330844;hp=5b59a6431860d96a799a6721678cb7802e263a28;hpb=90762837333c13ccf56f2ad88e4481fc71e8d281;p=platform%2Fframework%2Fweb%2Fcrosswalk.git diff --git a/src/components/copresence/rpc/rpc_handler.cc b/src/components/copresence/rpc/rpc_handler.cc index 5b59a64..78bd958 100644 --- a/src/components/copresence/rpc/rpc_handler.cc +++ b/src/components/copresence/rpc/rpc_handler.cc @@ -4,13 +4,12 @@ #include "components/copresence/rpc/rpc_handler.h" -#include - #include "base/bind.h" #include "base/command_line.h" #include "base/guid.h" #include "base/logging.h" #include "base/strings/string_util.h" +#include "base/strings/stringprintf.h" // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now, @@ -22,10 +21,13 @@ #include "components/copresence/copresence_switches.h" #include "components/copresence/handlers/directive_handler.h" +#include "components/copresence/handlers/gcm_handler.h" #include "components/copresence/proto/codes.pb.h" #include "components/copresence/proto/data.pb.h" #include "components/copresence/proto/rpcs.pb.h" +#include "components/copresence/public/copresence_constants.h" #include "components/copresence/public/copresence_delegate.h" +#include "components/copresence/rpc/http_post.h" #include "net/http/http_status_code.h" // TODO(ckehoe): Return error messages for bad requests. @@ -39,6 +41,13 @@ const char RpcHandler::kReportRequestRpcName[] = "report"; namespace { +const int kTokenLoggingSuffix = 5; +const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. +const int kMaxInvalidTokens = 10000; +const char kRegisterDeviceRpcName[] = "registerdevice"; +const char kDefaultCopresenceServer[] = + "https://www.googleapis.com/copresence/v2/copresence"; + // UrlSafe is defined as: // '/' represented by a '_' and '+' represented by a '-' // TODO(rkc): Move this to the wrapper. @@ -48,20 +57,14 @@ std::string ToUrlSafe(std::string token) { return token; } -const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. -const int kMaxInvalidTokens = 10000; -const char kRegisterDeviceRpcName[] = "registerdevice"; -const char kDefaultCopresenceServer[] = - "https://www.googleapis.com/copresence/v2/copresence"; // Logging // Checks for a copresence error. If there is one, logs it and returns true. -bool CopresenceErrorLogged(const Status& status) { +bool IsErrorStatus(const Status& status) { if (status.code() != OK) { LOG(ERROR) << "Copresence error code " << status.code() - << (status.message().empty() ? std::string() : - ": " + status.message()); + << (status.message().empty() ? "" : ": " + status.message()); } return status.code() != OK; } @@ -75,7 +78,7 @@ void LogIfErrorStatus(const util::error::Code& code, // If any errors occurred, logs them and returns true. bool ReportErrorLogged(const ReportResponse& response) { - bool result = CopresenceErrorLogged(response.header().status()); + bool result = IsErrorStatus(response.header().status()); // The Report fails or succeeds as a unit. If any responses had errors, // the header will too. Thus we don't need to propagate individual errors. @@ -91,6 +94,16 @@ bool ReportErrorLogged(const ReportResponse& response) { return result; } +const std::string LoggingStrForToken(const std::string& auth_token) { + if (auth_token.empty()) + return "anonymous"; + + std::string token_suffix = auth_token.substr( + auth_token.length() - kTokenLoggingSuffix, kTokenLoggingSuffix); + return base::StringPrintf("token ...%s", token_suffix.c_str()); +} + + // Request construction // TODO(ckehoe): Move these into a separate file? @@ -135,7 +148,7 @@ ClientVersion* CreateVersion(const std::string& client, return version; } -void AddTokenToRequest(ReportRequest* request, const AudioToken& token) { +void AddTokenToRequest(const AudioToken& token, ReportRequest* request) { TokenObservation* token_observation = request->mutable_update_signals_request()->add_token_observation(); token_observation->set_token_id(ToUrlSafe(token.token)); @@ -148,60 +161,67 @@ void AddTokenToRequest(ReportRequest* request, const AudioToken& token) { } // namespace -// Public methods -RpcHandler::RpcHandler(CopresenceDelegate* delegate) +// Public functions. + +RpcHandler::RpcHandler(CopresenceDelegate* delegate, + DirectiveHandler* directive_handler, + GCMHandler* gcm_handler, + const PostCallback& server_post_callback) : delegate_(delegate), + directive_handler_(directive_handler), + gcm_handler_(gcm_handler), + server_post_callback_(server_post_callback), invalid_audio_token_cache_( base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), - kMaxInvalidTokens), - server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, - base::Unretained(this))) {} - -RpcHandler::~RpcHandler() { - for (std::set::iterator post = pending_posts_.begin(); - post != pending_posts_.end(); ++post) { - delete *post; - } + kMaxInvalidTokens) { + DCHECK(delegate_); + DCHECK(directive_handler_); + // |gcm_handler_| is optional. + + if (server_post_callback_.is_null()) { + server_post_callback_ = + base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this)); + } - if (delegate_ && delegate_->GetWhispernetClient()) { - delegate_->GetWhispernetClient()->RegisterTokensCallback( - WhispernetClient::TokensCallback()); + if (gcm_handler_) { + gcm_handler_->GetGcmId( + base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this))); + } } -} -void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { - scoped_ptr request(new RegisterDeviceRequest); - DCHECK(device_id_.empty()); - - request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); - Identity* identity = - request->mutable_device_identifiers()->mutable_registrant(); - identity->set_type(CHROME); - identity->set_chrome_id(base::GenerateGUID()); - SendServerRequest( - kRegisterDeviceRpcName, - std::string(), - request.Pass(), - base::Bind(&RpcHandler::RegisterResponseHandler, - // On destruction, this request will be cancelled. - base::Unretained(this), - init_done_callback)); -} - -void RpcHandler::SendReportRequest(scoped_ptr request) { - SendReportRequest(request.Pass(), std::string(), StatusCallback()); +RpcHandler::~RpcHandler() { + // Do not use |directive_handler_| or |gcm_handler_| here. + // They will already have been destructed. + for (HttpPost* post : pending_posts_) + delete post; } void RpcHandler::SendReportRequest(scoped_ptr request, const std::string& app_id, + const std::string& auth_token, const StatusCallback& status_callback) { DCHECK(request.get()); - DCHECK(!device_id_.empty()) - << "RpcHandler::Initialize() must complete successfully " - << "before other RpcHandler methods are called."; - DVLOG(3) << "Sending report request to server."; + // Check that we have a "device" registered for this auth token. + bool queue_request = true; + const auto& registration = device_id_by_auth_token_.find(auth_token); + if (registration == device_id_by_auth_token_.end()) { + // Not registered. + RegisterForToken(auth_token); + } else if (!registration->second.empty()) { + // Registration complete. + queue_request = false; + } + + // We're not registered, or registration is in progress. + if (queue_request) { + pending_requests_queue_.push_back(new PendingRequest( + request.Pass(), app_id, auth_token, status_callback)); + return; + } + + DVLOG(3) << "Sending ReportRequest to server."; // If we are unpublishing or unsubscribing, we need to stop those publish or // subscribes right away, we don't need to wait for the server to tell us. @@ -213,7 +233,9 @@ void RpcHandler::SendReportRequest(scoped_ptr request, AddPlayingTokens(request.get()); SendServerRequest(kReportRequestRpcName, + registration->second, app_id, + auth_token, request.Pass(), // On destruction, this request will be cancelled. base::Bind(&RpcHandler::ReportResponseHandler, @@ -224,64 +246,199 @@ void RpcHandler::SendReportRequest(scoped_ptr request, void RpcHandler::ReportTokens(const std::vector& tokens) { DCHECK(!tokens.empty()); - scoped_ptr request(new ReportRequest); - for (size_t i = 0; i < tokens.size(); ++i) { - if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token))) + if (device_id_by_auth_token_.empty()) { + VLOG(2) << "Skipping token reporting because no device IDs are registered"; + return; + } + + // Construct the ReportRequest. + ReportRequest request; + for (const AudioToken& token : tokens) { + if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token))) continue; - DVLOG(3) << "Sending token " << tokens[i].token << " to server."; - AddTokenToRequest(request.get(), tokens[i]); + DVLOG(3) << "Sending token " << token.token << " to server under " + << device_id_by_auth_token_.size() << " device ID(s)"; + AddTokenToRequest(token, &request); + } + + // Report under all active tokens. + for (const auto& registration : device_id_by_auth_token_) { + SendReportRequest(make_scoped_ptr(new ReportRequest(request)), + registration.first); } - SendReportRequest(request.Pass()); } -void RpcHandler::ConnectToWhispernet() { - WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); - - // |directive_handler_| will be destructed with us, so unretained is safe. - directive_handler_.reset(new DirectiveHandler); - directive_handler_->Initialize( - base::Bind(&WhispernetClient::DecodeSamples, - base::Unretained(whispernet_client)), - base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, - base::Unretained(this))); - - whispernet_client->RegisterTokensCallback( - base::Bind(&RpcHandler::ReportTokens, - // On destruction, this callback will be disconnected. - base::Unretained(this))); + +// Private functions. + +RpcHandler::PendingRequest::PendingRequest(scoped_ptr report, + const std::string& app_id, + const std::string& auth_token, + const StatusCallback& callback) + : report(report.Pass()), + app_id(app_id), + auth_token(auth_token), + callback(callback) {} + +RpcHandler::PendingRequest::~PendingRequest() {} + +void RpcHandler::RegisterForToken(const std::string& auth_token) { + DVLOG(2) << "Sending " << LoggingStrForToken(auth_token) + << " registration to server."; + + scoped_ptr request(new RegisterDeviceRequest); + + // Add a GCM ID for authenticated registration, if we have one. + if (auth_token.empty() || gcm_id_.empty()) { + request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); + } else { + DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token); + request->mutable_push_service()->set_service(GCM); + request->mutable_push_service()->mutable_gcm_registration() + ->set_device_token(gcm_id_); + } + + // Only identify as a Chrome device if we're in anonymous mode. + // Authenticated calls come from a "GAIA device". + if (auth_token.empty()) { + Identity* identity = + request->mutable_device_identifiers()->mutable_registrant(); + identity->set_type(CHROME); + identity->set_chrome_id(base::GenerateGUID()); + + // Since we're generating a new "Chrome ID" here, + // we need to make sure this isn't a duplicate registration. + DCHECK_EQ(0u, device_id_by_auth_token_.count(std::string())) + << "Attempted anonymous re-registration"; + } + + bool gcm_pending = !auth_token.empty() && gcm_handler_ && gcm_id_.empty(); + SendServerRequest( + kRegisterDeviceRpcName, + // This will have the side effect of populating an empty device ID + // for this auth token in the map. This is what we want, + // to mark registration as being in progress. + device_id_by_auth_token_[auth_token], + std::string(), // app ID + auth_token, + request.Pass(), + base::Bind(&RpcHandler::RegisterResponseHandler, + // On destruction, this request will be cancelled. + base::Unretained(this), + auth_token, + gcm_pending)); } -// Private methods +void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) { + // Track requests that are not on this auth token. + ScopedVector still_pending_requests; + + // If there is no device ID for this auth token, registration failed. + bool registration_failed = + (device_id_by_auth_token_.count(auth_token) == 0); + + // We momentarily take ownership of all the pointers in the queue. + // They are either deleted here or passed on to a new queue. + for (PendingRequest* request : pending_requests_queue_) { + if (request->auth_token == auth_token) { + if (registration_failed) { + request->callback.Run(FAIL); + } else { + SendReportRequest(request->report.Pass(), + request->app_id, + request->auth_token, + request->callback); + } + delete request; + } else { + // The request is on a different auth token. + still_pending_requests.push_back(request); + } + } + + // Only keep the requests that weren't processed. + // All the pointers in the queue are now spoken for. + pending_requests_queue_.weak_clear(); + pending_requests_queue_ = still_pending_requests.Pass(); +} + +void RpcHandler::SendReportRequest(scoped_ptr request, + const std::string& auth_token) { + SendReportRequest(request.Pass(), + std::string(), + auth_token, + StatusCallback()); +} + +// Store a GCM ID and send it to the server if needed. The constructor passes +// this callback to the GCMHandler to receive the ID whenever it's ready. +// It may be returned immediately, if the ID is cached, or require a server +// round-trip. This ID must then be passed along to the copresence server. +// There are a few ways this can happen for each auth token: +// +// 1. The GCM ID is available when we first register, and is passed along +// with the RegisterDeviceRequest. +// +// 2. The GCM ID becomes available after the RegisterDeviceRequest has +// completed. Then the loop in this function will invoke RegisterForToken() +// again to pass on the ID. +// +// 3. The GCM ID becomes available after the RegisterDeviceRequest is sent, +// but before it completes. In this case, the gcm_pending flag is passed +// through to the RegisterResponseHandler, which invokes RegisterForToken() +// again to pass on the ID. The loop here must skip pending registrations, +// as the device ID will be empty. +// +// TODO(ckehoe): Add tests for these scenarios. +void RpcHandler::RegisterGcmId(const std::string& gcm_id) { + gcm_id_ = gcm_id; + if (!gcm_id.empty()) { + for (const auto& registration : device_id_by_auth_token_) { + const std::string& auth_token = registration.first; + const std::string& device_id = registration.second; + if (!auth_token.empty() && !device_id.empty()) + RegisterForToken(auth_token); + } + } +} void RpcHandler::RegisterResponseHandler( - const SuccessCallback& init_done_callback, + const std::string& auth_token, + bool gcm_pending, HttpPost* completed_post, int http_status_code, const std::string& response_data) { if (completed_post) { int elements_erased = pending_posts_.erase(completed_post); - DCHECK(elements_erased); + DCHECK_GT(elements_erased, 0); delete completed_post; } - if (http_status_code != net::HTTP_OK) { - init_done_callback.Run(false); - return; - } + // Registration is no longer in progress. + // If it was successful, we'll update below. + device_id_by_auth_token_.erase(auth_token); RegisterDeviceResponse response; - if (!response.ParseFromString(response_data)) { + if (http_status_code != net::HTTP_OK) { + // TODO(ckehoe): Retry registration if appropriate. + LOG(ERROR) << LoggingStrForToken(auth_token) + << " device registration failed"; + } else if (!response.ParseFromString(response_data)) { LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; - init_done_callback.Run(false); - return; + } else if (!IsErrorStatus(response.header().status())) { + const std::string& device_id = response.registered_device_id(); + DCHECK(!device_id.empty()); + device_id_by_auth_token_[auth_token] = device_id; + DVLOG(2) << LoggingStrForToken(auth_token) + << " device registration successful. Id: " << device_id; + + // If we have a GCM ID now, and didn't before, pass it on to the server. + if (gcm_pending && !gcm_id_.empty()) + RegisterForToken(auth_token); } - if (CopresenceErrorLogged(response.header().status())) - return; - device_id_ = response.registered_device_id(); - DCHECK(!device_id_.empty()); - DVLOG(2) << "Device registration successful: id " << device_id_; - init_done_callback.Run(true); + // Send or fail requests on this auth token. + ProcessQueuedRequests(auth_token); } void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, @@ -315,18 +472,14 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, return; } - const RepeatedPtrField& message_results = - response.manage_messages_response().published_message_result(); - for (int i = 0; i < message_results.size(); ++i) { - DVLOG(2) << "Published message with id " - << message_results.Get(i).published_message_id(); + for (const MessageResult& result : + response.manage_messages_response().published_message_result()) { + DVLOG(2) << "Published message with id " << result.published_message_id(); } - const RepeatedPtrField& subscription_results = - response.manage_subscriptions_response().subscription_result(); - for (int i = 0; i < subscription_results.size(); ++i) { - DVLOG(2) << "Created subscription with id " - << subscription_results.Get(i).subscription_id(); + for (const SubscriptionResult& result : + response.manage_subscriptions_response().subscription_result()) { + DVLOG(2) << "Created subscription with id " << result.subscription_id(); } if (response.has_update_signals_response()) { @@ -334,29 +487,24 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, response.update_signals_response(); DispatchMessages(update_response.message()); - if (directive_handler_.get()) { - for (int i = 0; i < update_response.directive_size(); ++i) - directive_handler_->AddDirective(update_response.directive(i)); - } else { - DVLOG(1) << "No directive handler."; - } + for (const Directive& directive : update_response.directive()) + directive_handler_->AddDirective(directive); - const RepeatedPtrField& tokens = update_response.token(); - for (int i = 0; i < tokens.size(); ++i) { - switch (tokens.Get(i).status()) { + for (const Token& token : update_response.token()) { + switch (token.status()) { case VALID: // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a // short TTL (like 10s) and send it up with every report request. // Then we'll still get messages while we're waiting to hear it again. - VLOG(1) << "Got valid token " << tokens.Get(i).id(); + VLOG(1) << "Got valid token " << token.id(); break; case INVALID: - DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); - invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); + DVLOG(3) << "Discarding invalid token " << token.id(); + invalid_audio_token_cache_.Add(token.id(), true); break; default: - DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " - << tokens.Get(i).status(); + DVLOG(2) << "Token " << token.id() << " has status code " + << token.status(); } } } @@ -369,33 +517,31 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) { // Remove unpublishes. if (request.has_manage_messages_request()) { - const RepeatedPtrField& unpublishes = - request.manage_messages_request().id_to_unpublish(); - for (int i = 0; i < unpublishes.size(); ++i) - directive_handler_->RemoveDirectives(unpublishes.Get(i)); + for (const std::string& unpublish : + request.manage_messages_request().id_to_unpublish()) { + directive_handler_->RemoveDirectives(unpublish); + } } // Remove unsubscribes. if (request.has_manage_subscriptions_request()) { - const RepeatedPtrField& unsubscribes = - request.manage_subscriptions_request().id_to_unsubscribe(); - for (int i = 0; i < unsubscribes.size(); ++i) - directive_handler_->RemoveDirectives(unsubscribes.Get(i)); + for (const std::string& unsubscribe : + request.manage_subscriptions_request().id_to_unsubscribe()) { + directive_handler_->RemoveDirectives(unsubscribe); + } } } void RpcHandler::AddPlayingTokens(ReportRequest* request) { - if (!directive_handler_) - return; - - const std::string& audible_token = directive_handler_->CurrentAudibleToken(); + const std::string& audible_token = + directive_handler_->GetCurrentAudioToken(AUDIBLE); const std::string& inaudible_token = - directive_handler_->CurrentInaudibleToken(); + directive_handler_->GetCurrentAudioToken(INAUDIBLE); if (!audible_token.empty()) - AddTokenToRequest(request, AudioToken(audible_token, true)); + AddTokenToRequest(AudioToken(audible_token, true), request); if (!inaudible_token.empty()) - AddTokenToRequest(request, AudioToken(inaudible_token, false)); + AddTokenToRequest(AudioToken(inaudible_token, false), request); } void RpcHandler::DispatchMessages( @@ -406,28 +552,28 @@ void RpcHandler::DispatchMessages( // Index the messages by subscription id. std::map> messages_by_subscription; DVLOG(3) << "Dispatching " << messages.size() << " messages"; - for (int m = 0; m < messages.size(); ++m) { - const RepeatedPtrField& subscription_ids = - messages.Get(m).subscription_id(); - for (int s = 0; s < subscription_ids.size(); ++s) { - messages_by_subscription[subscription_ids.Get(s)].push_back( - messages.Get(m).published_message()); + for (const SubscribedMessage& message : messages) { + for (const std::string& subscription_id : message.subscription_id()) { + messages_by_subscription[subscription_id].push_back( + message.published_message()); } } // Send the messages for each subscription. - for (std::map>::const_iterator - subscription = messages_by_subscription.begin(); - subscription != messages_by_subscription.end(); - ++subscription) { + for (const auto& map_entry : messages_by_subscription) { // TODO(ckehoe): Once we have the app ID from the server, we need to pass // it in here and get rid of the app id registry from the main API class. - delegate_->HandleMessages("", subscription->first, subscription->second); + const std::string& subscription = map_entry.first; + const std::vector& messages = map_entry.second; + delegate_->HandleMessages(std::string(), subscription, messages); } } +// TODO(ckehoe): Pass in the version string and +// group this with the local functions up top. RequestHeader* RpcHandler::CreateRequestHeader( - const std::string& client_name) const { + const std::string& client_name, + const std::string& device_id) const { RequestHeader* header = new RequestHeader; header->set_allocated_framework_version(CreateVersion( @@ -437,7 +583,8 @@ RequestHeader* RpcHandler::CreateRequestHeader( CreateVersion(client_name, std::string())); } header->set_current_time_millis(base::Time::Now().ToJsTime()); - header->set_registered_device_id(device_id_); + if (!device_id.empty()) + header->set_registered_device_id(device_id); DeviceFingerprint* fingerprint = new DeviceFingerprint; fingerprint->set_platform_version(delegate_->GetPlatformVersionString()); @@ -450,18 +597,24 @@ RequestHeader* RpcHandler::CreateRequestHeader( template void RpcHandler::SendServerRequest( const std::string& rpc_name, + const std::string& device_id, const std::string& app_id, + const std::string& auth_token, scoped_ptr request, const PostCleanupCallback& response_handler) { - request->set_allocated_header(CreateRequestHeader(app_id)); + request->set_allocated_header(CreateRequestHeader(app_id, device_id)); server_post_callback_.Run(delegate_->GetRequestContext(), rpc_name, + delegate_->GetAPIKey(app_id), + auth_token, make_scoped_ptr(request.release()), response_handler); } void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, const std::string& rpc_name, + const std::string& api_key, + const std::string& auth_token, scoped_ptr request_proto, const PostCleanupCallback& callback) { // Create the base URL to call. @@ -476,23 +629,13 @@ void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, url_context_getter, copresence_server_host, rpc_name, + api_key, + auth_token, command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken), - delegate_->GetAPIKey(), *request_proto); http_post->Start(base::Bind(callback, http_post)); pending_posts_.insert(http_post); } -void RpcHandler::AudioDirectiveListToWhispernetConnector( - const std::string& token, - bool audible, - const WhispernetClient::SamplesCallback& samples_callback) { - WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); - if (whispernet_client) { - whispernet_client->RegisterSamplesCallback(samples_callback); - whispernet_client->EncodeToken(token, audible); - } -} - } // namespace copresence