Upstream version 10.38.220.0
[platform/framework/web/crosswalk.git] / src / components / copresence / rpc / rpc_handler.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/copresence/rpc/rpc_handler.h"
6
7 #include <map>
8
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14
15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17 // we fix this with an #undef.
18 #include "base/time/time.h"
19 #if defined(OS_WIN)
20 #undef DeviceCapabilities
21 #endif
22
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_client_delegate.h"
29 #include "net/http/http_status_code.h"
30
31 // TODO(ckehoe): Return error messages for bad requests.
32
33 namespace copresence {
34
35 using google::protobuf::MessageLite;
36 using google::protobuf::RepeatedPtrField;
37
38 const char RpcHandler::kReportRequestRpcName[] = "report";
39
40 namespace {
41
42 // UrlSafe is defined as:
43 // '/' represented by a '_' and '+' represented by a '-'
44 // TODO(rkc): Move this to the wrapper.
45 std::string ToUrlSafe(std::string token) {
46   base::ReplaceChars(token, "+", "-", &token);
47   base::ReplaceChars(token, "/", "_", &token);
48   return token;
49 }
50
51 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
52 const int kMaxInvalidTokens = 10000;
53 const char kRegisterDeviceRpcName[] = "registerdevice";
54 const char kDefaultCopresenceServer[] =
55     "https://www.googleapis.com/copresence/v2/copresence";
56
57 // Logging
58
59 // Checks for a copresence error. If there is one, logs it and returns true.
60 bool CopresenceErrorLogged(const Status& status) {
61   if (status.code() != OK) {
62     LOG(ERROR) << "Copresence error code " << status.code()
63                << (status.message().empty() ? std::string() :
64                   ": " + status.message());
65   }
66   return status.code() != OK;
67 }
68
69 void LogIfErrorStatus(const util::error::Code& code,
70                       const std::string& context) {
71   LOG_IF(ERROR, code != util::error::OK)
72       << context << " error " << code << ". See "
73       << "cs/google3/util/task/codes.proto for more info.";
74 }
75
76 // If any errors occurred, logs them and returns true.
77 bool ReportErrorLogged(const ReportResponse& response) {
78   bool result = CopresenceErrorLogged(response.header().status());
79
80   // The Report fails or succeeds as a unit. If any responses had errors,
81   // the header will too. Thus we don't need to propagate individual errors.
82   if (response.has_update_signals_response())
83     LogIfErrorStatus(response.update_signals_response().status(), "Update");
84   if (response.has_manage_messages_response())
85     LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
86   if (response.has_manage_subscriptions_response()) {
87     LogIfErrorStatus(response.manage_subscriptions_response().status(),
88                      "Subscribe");
89   }
90
91   return result;
92 }
93
94 // Request construction
95 // TODO(ckehoe): Move these into a separate file?
96
97 template <typename T>
98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
99   if (msg.has_token_exchange_strategy() &&
100       msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
101     return msg.token_exchange_strategy().broadcast_scan_configuration();
102   }
103   return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
104 }
105
106 // This method will extract token exchange strategies
107 // from the publishes and subscribes in a report request.
108 // TODO(ckehoe): Delete this when the server supports
109 // BroadcastScanConfiguration.
110 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
111     const ReportRequest& request) {
112   bool broadcast_only = false;
113   bool scan_only = false;
114
115   // Strategies for publishes.
116   if (request.has_manage_messages_request()) {
117     const RepeatedPtrField<PublishedMessage>& messages =
118         request.manage_messages_request().message_to_publish();
119     for (int i = 0; i < messages.size(); ++i) {
120       BroadcastScanConfiguration config =
121           GetBroadcastScanConfig(messages.Get(i));
122       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
123       scan_only = scan_only || config == SCAN_ONLY;
124       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
125         return BROADCAST_AND_SCAN;
126     }
127   }
128
129   // Strategies for subscriptions.
130   if (request.has_manage_subscriptions_request()) {
131     const RepeatedPtrField<Subscription> subscriptions =
132         request.manage_subscriptions_request().subscription();
133     for (int i = 0; i < subscriptions.size(); ++i) {
134       BroadcastScanConfiguration config =
135           GetBroadcastScanConfig(subscriptions.Get(i));
136       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
137       scan_only = scan_only || config == SCAN_ONLY;
138       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
139         return BROADCAST_AND_SCAN;
140     }
141   }
142
143   if (broadcast_only)
144     return BROADCAST_ONLY;
145   if (scan_only)
146     return SCAN_ONLY;
147
148   // If nothing else is specified, default to both broadcast and scan.
149   return BROADCAST_AND_SCAN;
150 }
151
152 // TODO(rkc): Fix this hack once the server supports setting strategies per
153 // operation.
154 bool ExtractIsAudibleStrategy(const ReportRequest& request) {
155   if (request.has_manage_messages_request()) {
156     const RepeatedPtrField<PublishedMessage> messages =
157         request.manage_messages_request().message_to_publish();
158     for (int i = 0; i < messages.size(); ++i) {
159       const PublishedMessage& msg = messages.Get(i);
160       if (msg.has_token_exchange_strategy() &&
161           msg.token_exchange_strategy().has_use_audible() &&
162           msg.token_exchange_strategy().use_audible()) {
163         return true;
164       }
165     }
166   }
167   return false;
168 }
169
170 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
171   scoped_ptr<DeviceState> state(new DeviceState);
172
173   TokenTechnology* token_technology =
174       state->mutable_capabilities()->add_token_technology();
175   token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
176   if (ExtractIsAudibleStrategy(request))
177     token_technology->set_medium(AUDIO_AUDIBLE_DTMF);
178
179   BroadcastScanConfiguration config =
180       ExtractTokenExchangeStrategy(request);
181   if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
182     token_technology->add_instruction_type(TRANSMIT);
183   if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
184     token_technology->add_instruction_type(RECEIVE);
185
186   return state.Pass();
187 }
188
189 // TODO(ckehoe): We're keeping this code in a separate function for now
190 // because we get a version string from Chrome, but the proto expects
191 // an int64 version. We should probably change the version proto
192 // to handle a more detailed version.
193 ClientVersion* CreateVersion(const std::string& client,
194                              const std::string& version_name) {
195   ClientVersion* version = new ClientVersion;
196
197   version->set_client(client);
198   version->set_version_name(version_name);
199
200   return version;
201 }
202
203 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
204   TokenObservation* token_observation =
205       request->mutable_update_signals_request()->add_token_observation();
206   token_observation->set_token_id(ToUrlSafe(token.token));
207
208   TokenSignals* signals = token_observation->add_signals();
209   signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
210                                     : AUDIO_ULTRASOUND_PASSBAND);
211   signals->set_observed_time_millis(base::Time::Now().ToJsTime());
212 }
213
214 }  // namespace
215
216 // Public methods
217
218 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
219     : delegate_(delegate),
220       invalid_audio_token_cache_(
221           base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
222           kMaxInvalidTokens),
223       server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
224                                        base::Unretained(this))) {}
225
226 RpcHandler::~RpcHandler() {
227   for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
228        post != pending_posts_.end(); ++post) {
229     delete *post;
230   }
231
232   if (delegate_ && delegate_->GetWhispernetClient()) {
233     delegate_->GetWhispernetClient()->RegisterTokensCallback(
234         WhispernetClient::TokensCallback());
235   }
236 }
237
238 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
239   scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
240   DCHECK(device_id_.empty());
241
242   request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
243   Identity* identity =
244       request->mutable_device_identifiers()->mutable_registrant();
245   identity->set_type(CHROME);
246   identity->set_chrome_id(base::GenerateGUID());
247   SendServerRequest(
248       kRegisterDeviceRpcName,
249       std::string(),
250       request.Pass(),
251       base::Bind(&RpcHandler::RegisterResponseHandler,
252                  // On destruction, this request will be cancelled.
253                  base::Unretained(this),
254                  init_done_callback));
255 }
256
257 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
258   SendReportRequest(request.Pass(), std::string(), StatusCallback());
259 }
260
261 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
262                                    const std::string& app_id,
263                                    const StatusCallback& status_callback) {
264   DCHECK(request.get());
265   DCHECK(!device_id_.empty())
266       << "RpcHandler::Initialize() must complete successfully "
267       << "before other RpcHandler methods are called.";
268
269   DVLOG(3) << "Sending report request to server.";
270
271   // If we are unpublishing or unsubscribing, we need to stop those publish or
272   // subscribes right away, we don't need to wait for the server to tell us.
273   ProcessRemovedOperations(*request);
274
275   request->mutable_update_signals_request()->set_allocated_state(
276       GetDeviceCapabilities(*request).release());
277
278   AddPlayingTokens(request.get());
279
280   // TODO(ckehoe): Currently the server supports only BROADCAST_AND_SCAN.
281   // Remove this once b/16715253 is fixed.
282   if (request->has_manage_messages_request()) {
283     RepeatedPtrField<PublishedMessage>* messages = request
284         ->mutable_manage_messages_request()->mutable_message_to_publish();
285     for (int i = 0; i < messages->size(); ++i) {
286       messages->Mutable(i)->mutable_token_exchange_strategy()
287           ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
288     }
289   }
290   if (request->has_manage_subscriptions_request()) {
291     RepeatedPtrField<Subscription>* subscriptions =
292         request->mutable_manage_subscriptions_request()->mutable_subscription();
293     for (int i = 0; i < subscriptions->size(); ++i) {
294       subscriptions->Mutable(i)->mutable_token_exchange_strategy()
295           ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
296     }
297   }
298
299   SendServerRequest(kReportRequestRpcName,
300                     app_id,
301                     request.Pass(),
302                     // On destruction, this request will be cancelled.
303                     base::Bind(&RpcHandler::ReportResponseHandler,
304                                base::Unretained(this),
305                                status_callback));
306 }
307
308 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
309   DCHECK(!tokens.empty());
310
311   scoped_ptr<ReportRequest> request(new ReportRequest);
312   for (size_t i = 0; i < tokens.size(); ++i) {
313     if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
314       continue;
315     DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
316     AddTokenToRequest(request.get(), tokens[i]);
317   }
318   SendReportRequest(request.Pass());
319 }
320
321 void RpcHandler::ConnectToWhispernet() {
322   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
323
324   // |directive_handler_| will be destructed with us, so unretained is safe.
325   directive_handler_.reset(new DirectiveHandler);
326   directive_handler_->Initialize(
327       base::Bind(&WhispernetClient::DecodeSamples,
328                  base::Unretained(whispernet_client)),
329       base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
330                  base::Unretained(this)));
331
332   whispernet_client->RegisterTokensCallback(
333       base::Bind(&RpcHandler::ReportTokens,
334                  // On destruction, this callback will be disconnected.
335                  base::Unretained(this)));
336 }
337
338 // Private methods
339
340 void RpcHandler::RegisterResponseHandler(
341     const SuccessCallback& init_done_callback,
342     HttpPost* completed_post,
343     int http_status_code,
344     const std::string& response_data) {
345   if (completed_post) {
346     int elements_erased = pending_posts_.erase(completed_post);
347     DCHECK(elements_erased);
348     delete completed_post;
349   }
350
351   if (http_status_code != net::HTTP_OK) {
352     init_done_callback.Run(false);
353     return;
354   }
355
356   RegisterDeviceResponse response;
357   if (!response.ParseFromString(response_data)) {
358     LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
359     init_done_callback.Run(false);
360     return;
361   }
362
363   if (CopresenceErrorLogged(response.header().status()))
364     return;
365   device_id_ = response.registered_device_id();
366   DCHECK(!device_id_.empty());
367   DVLOG(2) << "Device registration successful: id " << device_id_;
368   init_done_callback.Run(true);
369 }
370
371 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
372                                        HttpPost* completed_post,
373                                        int http_status_code,
374                                        const std::string& response_data) {
375   if (completed_post) {
376     int elements_erased = pending_posts_.erase(completed_post);
377     DCHECK(elements_erased);
378     delete completed_post;
379   }
380
381   if (http_status_code != net::HTTP_OK) {
382     if (!status_callback.is_null())
383       status_callback.Run(FAIL);
384     return;
385   }
386
387   DVLOG(3) << "Received ReportResponse.";
388   ReportResponse response;
389   if (!response.ParseFromString(response_data)) {
390     LOG(ERROR) << "Invalid ReportResponse";
391     if (!status_callback.is_null())
392       status_callback.Run(FAIL);
393     return;
394   }
395
396   if (ReportErrorLogged(response)) {
397     if (!status_callback.is_null())
398       status_callback.Run(FAIL);
399     return;
400   }
401
402   const RepeatedPtrField<MessageResult>& message_results =
403       response.manage_messages_response().published_message_result();
404   for (int i = 0; i < message_results.size(); ++i) {
405     DVLOG(2) << "Published message with id "
406              << message_results.Get(i).published_message_id();
407   }
408
409   const RepeatedPtrField<SubscriptionResult>& subscription_results =
410       response.manage_subscriptions_response().subscription_result();
411   for (int i = 0; i < subscription_results.size(); ++i) {
412     DVLOG(2) << "Created subscription with id "
413              << subscription_results.Get(i).subscription_id();
414   }
415
416   if (response.has_update_signals_response()) {
417     const UpdateSignalsResponse& update_response =
418         response.update_signals_response();
419     DispatchMessages(update_response.message());
420
421     if (directive_handler_.get()) {
422       for (int i = 0; i < update_response.directive_size(); ++i)
423         directive_handler_->AddDirective(update_response.directive(i));
424     } else {
425       DVLOG(1) << "No directive handler.";
426     }
427
428     const RepeatedPtrField<Token>& tokens = update_response.token();
429     for (int i = 0; i < tokens.size(); ++i) {
430       switch (tokens.Get(i).status()) {
431         case VALID:
432           // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
433           // short TTL (like 10s) and send it up with every report request.
434           // Then we'll still get messages while we're waiting to hear it again.
435           VLOG(1) << "Got valid token " << tokens.Get(i).id();
436           break;
437         case INVALID:
438           DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
439           invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
440           break;
441         default:
442           DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
443                    << tokens.Get(i).status();
444       }
445     }
446   }
447
448   // TODO(ckehoe): Return a more detailed status response.
449   if (!status_callback.is_null())
450     status_callback.Run(SUCCESS);
451 }
452
453 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
454   // Remove unpublishes.
455   if (request.has_manage_messages_request()) {
456     const RepeatedPtrField<std::string>& unpublishes =
457         request.manage_messages_request().id_to_unpublish();
458     for (int i = 0; i < unpublishes.size(); ++i)
459       directive_handler_->RemoveDirectives(unpublishes.Get(i));
460   }
461
462   // Remove unsubscribes.
463   if (request.has_manage_subscriptions_request()) {
464     const RepeatedPtrField<std::string>& unsubscribes =
465         request.manage_subscriptions_request().id_to_unsubscribe();
466     for (int i = 0; i < unsubscribes.size(); ++i)
467       directive_handler_->RemoveDirectives(unsubscribes.Get(i));
468   }
469 }
470
471 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
472   if (!directive_handler_)
473     return;
474
475   const std::string& audible_token = directive_handler_->CurrentAudibleToken();
476   const std::string& inaudible_token =
477       directive_handler_->CurrentInaudibleToken();
478
479   if (!audible_token.empty())
480     AddTokenToRequest(request, AudioToken(audible_token, true));
481   if (!inaudible_token.empty())
482     AddTokenToRequest(request, AudioToken(inaudible_token, false));
483 }
484
485 void RpcHandler::DispatchMessages(
486     const RepeatedPtrField<SubscribedMessage>& messages) {
487   if (messages.size() == 0)
488     return;
489
490   // Index the messages by subscription id.
491   std::map<std::string, std::vector<Message> > messages_by_subscription;
492   DVLOG(3) << "Dispatching " << messages.size() << " messages";
493   for (int m = 0; m < messages.size(); ++m) {
494     const RepeatedPtrField<std::string>& subscription_ids =
495         messages.Get(m).subscription_id();
496     for (int s = 0; s < subscription_ids.size(); ++s) {
497       messages_by_subscription[subscription_ids.Get(s)].push_back(
498           messages.Get(m).published_message());
499     }
500   }
501
502   // Send the messages for each subscription.
503   for (std::map<std::string, std::vector<Message> >::const_iterator
504            subscription = messages_by_subscription.begin();
505        subscription != messages_by_subscription.end();
506        ++subscription) {
507     // TODO(ckehoe): Once we have the app ID from the server, we need to pass
508     // it in here and get rid of the app id registry from the main API class.
509     delegate_->HandleMessages("", subscription->first, subscription->second);
510   }
511 }
512
513 RequestHeader* RpcHandler::CreateRequestHeader(
514     const std::string& client_name) const {
515   RequestHeader* header = new RequestHeader;
516
517   header->set_allocated_framework_version(CreateVersion(
518       "Chrome", delegate_->GetPlatformVersionString()));
519   if (!client_name.empty()) {
520     header->set_allocated_client_version(
521         CreateVersion(client_name, std::string()));
522   }
523   header->set_current_time_millis(base::Time::Now().ToJsTime());
524   header->set_registered_device_id(device_id_);
525
526   DeviceFingerprint* fingerprint = new DeviceFingerprint;
527   fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
528   fingerprint->set_type(CHROME_PLATFORM_TYPE);
529   header->set_allocated_device_fingerprint(fingerprint);
530
531   return header;
532 }
533
534 template <class T>
535 void RpcHandler::SendServerRequest(
536     const std::string& rpc_name,
537     const std::string& app_id,
538     scoped_ptr<T> request,
539     const PostCleanupCallback& response_handler) {
540   request->set_allocated_header(CreateRequestHeader(app_id));
541   server_post_callback_.Run(delegate_->GetRequestContext(),
542                             rpc_name,
543                             make_scoped_ptr<MessageLite>(request.release()),
544                             response_handler);
545 }
546
547 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
548                               const std::string& rpc_name,
549                               scoped_ptr<MessageLite> request_proto,
550                               const PostCleanupCallback& callback) {
551   // Create the base URL to call.
552   CommandLine* command_line = CommandLine::ForCurrentProcess();
553   const std::string copresence_server_host =
554       command_line->HasSwitch(switches::kCopresenceServer) ?
555       command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
556       kDefaultCopresenceServer;
557
558   // Create the request and keep a pointer until it completes.
559   HttpPost* http_post = new HttpPost(
560       url_context_getter,
561       copresence_server_host,
562       rpc_name,
563       command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
564       delegate_->GetAPIKey(),
565       *request_proto);
566
567   http_post->Start(base::Bind(callback, http_post));
568   pending_posts_.insert(http_post);
569 }
570
571 void RpcHandler::AudioDirectiveListToWhispernetConnector(
572     const std::string& token,
573     bool audible,
574     const WhispernetClient::SamplesCallback& samples_callback) {
575   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
576   if (whispernet_client) {
577     whispernet_client->RegisterSamplesCallback(samples_callback);
578     whispernet_client->EncodeToken(token, audible);
579   }
580 }
581
582 }  // namespace copresence