1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/rpc/rpc_handler.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "components/copresence/copresence_switches.h"
16 #include "components/copresence/handlers/directive_handler.h"
17 #include "components/copresence/proto/codes.pb.h"
18 #include "components/copresence/proto/data.pb.h"
19 #include "components/copresence/proto/rpcs.pb.h"
20 #include "components/copresence/public/copresence_client_delegate.h"
21 #include "net/http/http_status_code.h"
23 // TODO(ckehoe): Return error messages for bad requests.
25 namespace copresence {
27 using google::protobuf::MessageLite;
28 using google::protobuf::RepeatedPtrField;
30 const char RpcHandler::kReportRequestRpcName[] = "report";
34 // UrlSafe is defined as:
35 // '/' represented by a '_' and '+' represented by a '-'
36 // TODO(rkc): Move this to the wrapper.
37 std::string ToUrlSafe(std::string token) {
38 base::ReplaceChars(token, "+", "-", &token);
39 base::ReplaceChars(token, "/", "_", &token);
43 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
44 const int kMaxInvalidTokens = 10000;
45 const char kRegisterDeviceRpcName[] = "registerdevice";
46 const char kDefaultCopresenceServer[] =
47 "https://www.googleapis.com/copresence/v2/copresence";
51 // Checks for a copresence error. If there is one, logs it and returns true.
52 bool CopresenceErrorLogged(const Status& status) {
53 if (status.code() != OK) {
54 LOG(ERROR) << "Copresence error code " << status.code()
55 << (status.message().empty() ? std::string() :
56 ": " + status.message());
58 return status.code() != OK;
61 void LogIfErrorStatus(const util::error::Code& code,
62 const std::string& context) {
63 LOG_IF(ERROR, code != util::error::OK)
64 << context << " error " << code << ". See "
65 << "cs/google3/util/task/codes.proto for more info.";
68 // If any errors occurred, logs them and returns true.
69 bool ReportErrorLogged(const ReportResponse& response) {
70 bool result = CopresenceErrorLogged(response.header().status());
72 // The Report fails or succeeds as a unit. If any responses had errors,
73 // the header will too. Thus we don't need to propagate individual errors.
74 if (response.has_update_signals_response())
75 LogIfErrorStatus(response.update_signals_response().status(), "Update");
76 if (response.has_manage_messages_response())
77 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
78 if (response.has_manage_subscriptions_response()) {
79 LogIfErrorStatus(response.manage_subscriptions_response().status(),
86 // Request construction
87 // TODO(ckehoe): Move these into a separate file?
90 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
91 if (msg.has_token_exchange_strategy() &&
92 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
93 return msg.token_exchange_strategy().broadcast_scan_configuration();
95 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
98 // This method will extract token exchange strategies
99 // from the publishes and subscribes in a report request.
100 // TODO(ckehoe): Delete this when the server supports
101 // BroadcastScanConfiguration.
102 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
103 const ReportRequest& request) {
104 bool broadcast_only = false;
105 bool scan_only = false;
107 // Strategies for publishes.
108 if (request.has_manage_messages_request()) {
109 const RepeatedPtrField<PublishedMessage>& messages =
110 request.manage_messages_request().message_to_publish();
111 for (int i = 0; i < messages.size(); ++i) {
112 BroadcastScanConfiguration config =
113 GetBroadcastScanConfig(messages.Get(i));
114 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
115 scan_only = scan_only || config == SCAN_ONLY;
116 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
117 return BROADCAST_AND_SCAN;
121 // Strategies for subscriptions.
122 if (request.has_manage_subscriptions_request()) {
123 const RepeatedPtrField<Subscription> subscriptions =
124 request.manage_subscriptions_request().subscription();
125 for (int i = 0; i < subscriptions.size(); ++i) {
126 BroadcastScanConfiguration config =
127 GetBroadcastScanConfig(subscriptions.Get(i));
128 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
129 scan_only = scan_only || config == SCAN_ONLY;
130 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
131 return BROADCAST_AND_SCAN;
136 return BROADCAST_ONLY;
140 // If nothing else is specified, default to both broadcast and scan.
141 return BROADCAST_AND_SCAN;
144 // TODO(rkc): Fix this hack once the server supports setting strategies per
146 bool ExtractIsAudibleStrategy(const ReportRequest& request) {
147 if (request.has_manage_messages_request()) {
148 const RepeatedPtrField<PublishedMessage> messages =
149 request.manage_messages_request().message_to_publish();
150 for (int i = 0; i < messages.size(); ++i) {
151 const PublishedMessage& msg = messages.Get(i);
152 if (msg.has_token_exchange_strategy() &&
153 msg.token_exchange_strategy().has_use_audible() &&
154 msg.token_exchange_strategy().use_audible()) {
162 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
163 scoped_ptr<DeviceState> state(new DeviceState);
165 // TODO(ckehoe): Currently this code causes a linker error on Windows.
167 TokenTechnology* token_technology =
168 state->mutable_capabilities()->add_token_technology();
169 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
170 if (ExtractIsAudibleStrategy(request))
171 token_technology->set_medium(AUDIO_AUDIBLE_DTMF);
173 BroadcastScanConfiguration config =
174 ExtractTokenExchangeStrategy(request);
175 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
176 token_technology->add_instruction_type(TRANSMIT);
177 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
178 token_technology->add_instruction_type(RECEIVE);
184 // TODO(ckehoe): We're keeping this code in a separate function for now
185 // because we get a version string from Chrome, but the proto expects
186 // an int64 version. We should probably change the version proto
187 // to handle a more detailed version.
188 ClientVersion* CreateVersion(const std::string& client,
189 const std::string& version_name) {
190 ClientVersion* version = new ClientVersion;
192 version->set_client(client);
193 version->set_version_name(version_name);
198 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
199 TokenObservation* token_observation =
200 request->mutable_update_signals_request()->add_token_observation();
201 token_observation->set_token_id(ToUrlSafe(token.token));
203 TokenSignals* signals = token_observation->add_signals();
204 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
205 : AUDIO_ULTRASOUND_PASSBAND);
206 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
209 OptInStateFilter* CreateOptedInOrOutFilter() {
210 OptInStateFilter* filter = new OptInStateFilter;
211 filter->add_allowed_opt_in_state(copresence::OPTED_IN);
212 filter->add_allowed_opt_in_state(copresence::OPTED_OUT);
216 void AllowOptedOutMessages(ReportRequest* request) {
217 // TODO(ckehoe): Collapse this pattern into ProcessPublish()
218 // and ProcessSubscribe() methods.
220 if (request->has_manage_messages_request()) {
221 RepeatedPtrField<PublishedMessage>* messages = request
222 ->mutable_manage_messages_request()->mutable_message_to_publish();
223 for (int i = 0; i < messages->size(); ++i) {
224 PublishedMessage* message = messages->Mutable(i);
225 if (!message->has_opt_in_state_filter())
226 message->set_allocated_opt_in_state_filter(CreateOptedInOrOutFilter());
230 if (request->has_manage_subscriptions_request()) {
231 RepeatedPtrField<Subscription>* subscriptions =
232 request->mutable_manage_subscriptions_request()->mutable_subscription();
233 for (int i = 0; i < subscriptions->size(); ++i) {
234 Subscription* subscription = subscriptions->Mutable(i);
235 if (!subscription->has_opt_in_state_filter()) {
236 subscription->set_allocated_opt_in_state_filter(
237 CreateOptedInOrOutFilter());
247 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
248 : delegate_(delegate),
249 invalid_audio_token_cache_(
250 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
252 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
253 base::Unretained(this))) {}
255 RpcHandler::~RpcHandler() {
256 for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
257 post != pending_posts_.end(); ++post) {
261 if (delegate_ && delegate_->GetWhispernetClient()) {
262 delegate_->GetWhispernetClient()->RegisterTokensCallback(
263 WhispernetClient::TokensCallback());
267 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
268 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
269 DCHECK(device_id_.empty());
271 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
273 request->mutable_device_identifiers()->mutable_registrant();
274 identity->set_type(CHROME);
275 identity->set_chrome_id(base::GenerateGUID());
277 kRegisterDeviceRpcName,
280 base::Bind(&RpcHandler::RegisterResponseHandler,
281 // On destruction, this request will be cancelled.
282 base::Unretained(this),
283 init_done_callback));
286 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
287 SendReportRequest(request.Pass(), std::string(), StatusCallback());
290 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
291 const std::string& app_id,
292 const StatusCallback& status_callback) {
293 DCHECK(request.get());
294 DCHECK(!device_id_.empty())
295 << "RpcHandler::Initialize() must complete successfully "
296 << "before other RpcHandler methods are called.";
298 DVLOG(3) << "Sending report request to server.";
300 // If we are unpublishing or unsubscribing, we need to stop those publish or
301 // subscribes right away, we don't need to wait for the server to tell us.
302 ProcessRemovedOperations(*request);
304 request->mutable_update_signals_request()->set_allocated_state(
305 GetDeviceCapabilities(*request).release());
307 AddPlayingTokens(request.get());
309 AllowOptedOutMessages(request.get());
310 SendServerRequest(kReportRequestRpcName,
313 // On destruction, this request will be cancelled.
314 base::Bind(&RpcHandler::ReportResponseHandler,
315 base::Unretained(this),
319 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
320 DCHECK(!tokens.empty());
322 scoped_ptr<ReportRequest> request(new ReportRequest);
323 for (size_t i = 0; i < tokens.size(); ++i) {
324 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
326 DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
327 AddTokenToRequest(request.get(), tokens[i]);
329 SendReportRequest(request.Pass());
332 void RpcHandler::ConnectToWhispernet() {
333 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
335 // |directive_handler_| will be destructed with us, so unretained is safe.
336 directive_handler_.reset(new DirectiveHandler);
337 directive_handler_->Initialize(
338 base::Bind(&WhispernetClient::DecodeSamples,
339 base::Unretained(whispernet_client)),
340 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
341 base::Unretained(this)));
343 whispernet_client->RegisterTokensCallback(
344 base::Bind(&RpcHandler::ReportTokens,
345 // On destruction, this callback will be disconnected.
346 base::Unretained(this)));
351 void RpcHandler::RegisterResponseHandler(
352 const SuccessCallback& init_done_callback,
353 HttpPost* completed_post,
354 int http_status_code,
355 const std::string& response_data) {
356 if (completed_post) {
357 int elements_erased = pending_posts_.erase(completed_post);
358 DCHECK(elements_erased);
359 delete completed_post;
362 if (http_status_code != net::HTTP_OK) {
363 init_done_callback.Run(false);
367 RegisterDeviceResponse response;
368 if (!response.ParseFromString(response_data)) {
369 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
370 init_done_callback.Run(false);
374 if (CopresenceErrorLogged(response.header().status()))
376 device_id_ = response.registered_device_id();
377 DCHECK(!device_id_.empty());
378 DVLOG(2) << "Device registration successful: id " << device_id_;
379 init_done_callback.Run(true);
382 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
383 HttpPost* completed_post,
384 int http_status_code,
385 const std::string& response_data) {
386 if (completed_post) {
387 int elements_erased = pending_posts_.erase(completed_post);
388 DCHECK(elements_erased);
389 delete completed_post;
392 if (http_status_code != net::HTTP_OK) {
393 if (!status_callback.is_null())
394 status_callback.Run(FAIL);
398 DVLOG(3) << "Received ReportResponse.";
399 ReportResponse response;
400 if (!response.ParseFromString(response_data)) {
401 LOG(ERROR) << "Invalid ReportResponse";
402 if (!status_callback.is_null())
403 status_callback.Run(FAIL);
407 if (ReportErrorLogged(response)) {
408 if (!status_callback.is_null())
409 status_callback.Run(FAIL);
413 const RepeatedPtrField<MessageResult>& message_results =
414 response.manage_messages_response().published_message_result();
415 for (int i = 0; i < message_results.size(); ++i) {
416 DVLOG(2) << "Published message with id "
417 << message_results.Get(i).published_message_id();
420 const RepeatedPtrField<SubscriptionResult>& subscription_results =
421 response.manage_subscriptions_response().subscription_result();
422 for (int i = 0; i < subscription_results.size(); ++i) {
423 DVLOG(2) << "Created subscription with id "
424 << subscription_results.Get(i).subscription_id();
427 if (response.has_update_signals_response()) {
428 const UpdateSignalsResponse& update_response =
429 response.update_signals_response();
430 DispatchMessages(update_response.message());
432 if (directive_handler_.get()) {
433 for (int i = 0; i < update_response.directive_size(); ++i)
434 directive_handler_->AddDirective(update_response.directive(i));
436 DVLOG(1) << "No directive handler.";
439 const RepeatedPtrField<Token>& tokens = update_response.token();
440 for (int i = 0; i < tokens.size(); ++i) {
441 switch (tokens.Get(i).status()) {
443 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
444 // short TTL (like 10s) and send it up with every report request.
445 // Then we'll still get messages while we're waiting to hear it again.
446 VLOG(1) << "Got valid token " << tokens.Get(i).id();
449 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
450 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
453 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
454 << tokens.Get(i).status();
459 // TODO(ckehoe): Return a more detailed status response.
460 if (!status_callback.is_null())
461 status_callback.Run(SUCCESS);
464 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
465 // Remove unpublishes.
466 if (request.has_manage_messages_request()) {
467 const RepeatedPtrField<std::string>& unpublishes =
468 request.manage_messages_request().id_to_unpublish();
469 for (int i = 0; i < unpublishes.size(); ++i)
470 directive_handler_->RemoveDirectives(unpublishes.Get(i));
473 // Remove unsubscribes.
474 if (request.has_manage_subscriptions_request()) {
475 const RepeatedPtrField<std::string>& unsubscribes =
476 request.manage_subscriptions_request().id_to_unsubscribe();
477 for (int i = 0; i < unsubscribes.size(); ++i)
478 directive_handler_->RemoveDirectives(unsubscribes.Get(i));
482 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
483 if (!directive_handler_)
486 const std::string& audible_token = directive_handler_->CurrentAudibleToken();
487 const std::string& inaudible_token =
488 directive_handler_->CurrentInaudibleToken();
490 if (!audible_token.empty())
491 AddTokenToRequest(request, AudioToken(audible_token, true));
492 if (!inaudible_token.empty())
493 AddTokenToRequest(request, AudioToken(inaudible_token, false));
496 void RpcHandler::DispatchMessages(
497 const RepeatedPtrField<SubscribedMessage>& messages) {
498 if (messages.size() == 0)
501 // Index the messages by subscription id.
502 std::map<std::string, std::vector<Message> > messages_by_subscription;
503 DVLOG(3) << "Dispatching " << messages.size() << " messages";
504 for (int m = 0; m < messages.size(); ++m) {
505 const RepeatedPtrField<std::string>& subscription_ids =
506 messages.Get(m).subscription_id();
507 for (int s = 0; s < subscription_ids.size(); ++s) {
508 messages_by_subscription[subscription_ids.Get(s)].push_back(
509 messages.Get(m).published_message());
513 // Send the messages for each subscription.
514 for (std::map<std::string, std::vector<Message> >::const_iterator
515 subscription = messages_by_subscription.begin();
516 subscription != messages_by_subscription.end();
518 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
519 // it in here and get rid of the app id registry from the main API class.
520 delegate_->HandleMessages("", subscription->first, subscription->second);
524 RequestHeader* RpcHandler::CreateRequestHeader(
525 const std::string& client_name) const {
526 RequestHeader* header = new RequestHeader;
528 header->set_allocated_framework_version(CreateVersion(
529 "Chrome", delegate_->GetPlatformVersionString()));
530 if (!client_name.empty()) {
531 header->set_allocated_client_version(
532 CreateVersion(client_name, std::string()));
534 header->set_current_time_millis(base::Time::Now().ToJsTime());
535 header->set_registered_device_id(device_id_);
537 DeviceFingerprint* fingerprint = new DeviceFingerprint;
538 fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
539 fingerprint->set_type(CHROME_PLATFORM_TYPE);
540 header->set_allocated_device_fingerprint(fingerprint);
546 void RpcHandler::SendServerRequest(
547 const std::string& rpc_name,
548 const std::string& app_id,
549 scoped_ptr<T> request,
550 const PostCleanupCallback& response_handler) {
551 request->set_allocated_header(CreateRequestHeader(app_id));
552 server_post_callback_.Run(delegate_->GetRequestContext(),
554 make_scoped_ptr<MessageLite>(request.release()),
558 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
559 const std::string& rpc_name,
560 scoped_ptr<MessageLite> request_proto,
561 const PostCleanupCallback& callback) {
562 // Create the base URL to call.
563 CommandLine* command_line = CommandLine::ForCurrentProcess();
564 const std::string copresence_server_host =
565 command_line->HasSwitch(switches::kCopresenceServer) ?
566 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
567 kDefaultCopresenceServer;
569 // Create the request and keep a pointer until it completes.
570 HttpPost* http_post = new HttpPost(
572 copresence_server_host,
574 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
575 delegate_->GetAPIKey(),
578 http_post->Start(base::Bind(callback, http_post));
579 pending_posts_.insert(http_post);
582 void RpcHandler::AudioDirectiveListToWhispernetConnector(
583 const std::string& token,
585 const WhispernetClient::SamplesCallback& samples_callback) {
586 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
587 if (whispernet_client) {
588 whispernet_client->RegisterSamplesCallback(samples_callback);
589 whispernet_client->EncodeToken(token, audible);
593 } // namespace copresence