72c4f906bc4a42d7789fbd9321a699c296a92278
[platform/framework/web/crosswalk.git] / src / components / copresence / rpc / rpc_handler.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/copresence/rpc/rpc_handler.h"
6
7 #include <map>
8
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "components/copresence/copresence_switches.h"
16 #include "components/copresence/handlers/directive_handler.h"
17 #include "components/copresence/proto/codes.pb.h"
18 #include "components/copresence/proto/data.pb.h"
19 #include "components/copresence/proto/rpcs.pb.h"
20 #include "components/copresence/public/copresence_client_delegate.h"
21 #include "net/http/http_status_code.h"
22
23 // TODO(ckehoe): Return error messages for bad requests.
24
25 namespace copresence {
26
27 using google::protobuf::MessageLite;
28 using google::protobuf::RepeatedPtrField;
29
30 const char RpcHandler::kReportRequestRpcName[] = "report";
31
32 namespace {
33
34 // UrlSafe is defined as:
35 // '/' represented by a '_' and '+' represented by a '-'
36 // TODO(rkc): Move this to the wrapper.
37 std::string ToUrlSafe(std::string token) {
38   base::ReplaceChars(token, "+", "-", &token);
39   base::ReplaceChars(token, "/", "_", &token);
40   return token;
41 }
42
43 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
44 const int kMaxInvalidTokens = 10000;
45 const char kRegisterDeviceRpcName[] = "registerdevice";
46 const char kDefaultCopresenceServer[] =
47     "https://www.googleapis.com/copresence/v2/copresence";
48
49 // Logging
50
51 // Checks for a copresence error. If there is one, logs it and returns true.
52 bool CopresenceErrorLogged(const Status& status) {
53   if (status.code() != OK) {
54     LOG(ERROR) << "Copresence error code " << status.code()
55                << (status.message().empty() ? std::string() :
56                   ": " + status.message());
57   }
58   return status.code() != OK;
59 }
60
61 void LogIfErrorStatus(const util::error::Code& code,
62                       const std::string& context) {
63   LOG_IF(ERROR, code != util::error::OK)
64       << context << " error " << code << ". See "
65       << "cs/google3/util/task/codes.proto for more info.";
66 }
67
68 // If any errors occurred, logs them and returns true.
69 bool ReportErrorLogged(const ReportResponse& response) {
70   bool result = CopresenceErrorLogged(response.header().status());
71
72   // The Report fails or succeeds as a unit. If any responses had errors,
73   // the header will too. Thus we don't need to propagate individual errors.
74   if (response.has_update_signals_response())
75     LogIfErrorStatus(response.update_signals_response().status(), "Update");
76   if (response.has_manage_messages_response())
77     LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
78   if (response.has_manage_subscriptions_response()) {
79     LogIfErrorStatus(response.manage_subscriptions_response().status(),
80                      "Subscribe");
81   }
82
83   return result;
84 }
85
86 // Request construction
87 // TODO(ckehoe): Move these into a separate file?
88
89 template <typename T>
90 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
91   if (msg.has_token_exchange_strategy() &&
92       msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
93     return msg.token_exchange_strategy().broadcast_scan_configuration();
94   }
95   return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
96 }
97
98 // This method will extract token exchange strategies
99 // from the publishes and subscribes in a report request.
100 // TODO(ckehoe): Delete this when the server supports
101 // BroadcastScanConfiguration.
102 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
103     const ReportRequest& request) {
104   bool broadcast_only = false;
105   bool scan_only = false;
106
107   // Strategies for publishes.
108   if (request.has_manage_messages_request()) {
109     const RepeatedPtrField<PublishedMessage>& messages =
110         request.manage_messages_request().message_to_publish();
111     for (int i = 0; i < messages.size(); ++i) {
112       BroadcastScanConfiguration config =
113           GetBroadcastScanConfig(messages.Get(i));
114       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
115       scan_only = scan_only || config == SCAN_ONLY;
116       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
117         return BROADCAST_AND_SCAN;
118     }
119   }
120
121   // Strategies for subscriptions.
122   if (request.has_manage_subscriptions_request()) {
123     const RepeatedPtrField<Subscription> subscriptions =
124         request.manage_subscriptions_request().subscription();
125     for (int i = 0; i < subscriptions.size(); ++i) {
126       BroadcastScanConfiguration config =
127           GetBroadcastScanConfig(subscriptions.Get(i));
128       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
129       scan_only = scan_only || config == SCAN_ONLY;
130       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
131         return BROADCAST_AND_SCAN;
132     }
133   }
134
135   if (broadcast_only)
136     return BROADCAST_ONLY;
137   if (scan_only)
138     return SCAN_ONLY;
139
140   // If nothing else is specified, default to both broadcast and scan.
141   return BROADCAST_AND_SCAN;
142 }
143
144 // TODO(rkc): Fix this hack once the server supports setting strategies per
145 // operation.
146 bool ExtractIsAudibleStrategy(const ReportRequest& request) {
147   if (request.has_manage_messages_request()) {
148     const RepeatedPtrField<PublishedMessage> messages =
149         request.manage_messages_request().message_to_publish();
150     for (int i = 0; i < messages.size(); ++i) {
151       const PublishedMessage& msg = messages.Get(i);
152       if (msg.has_token_exchange_strategy() &&
153           msg.token_exchange_strategy().has_use_audible() &&
154           msg.token_exchange_strategy().use_audible()) {
155         return true;
156       }
157     }
158   }
159   return false;
160 }
161
162 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
163   scoped_ptr<DeviceState> state(new DeviceState);
164
165 // TODO(ckehoe): Currently this code causes a linker error on Windows.
166 #ifndef OS_WIN
167   TokenTechnology* token_technology =
168       state->mutable_capabilities()->add_token_technology();
169   token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
170   if (ExtractIsAudibleStrategy(request))
171     token_technology->set_medium(AUDIO_AUDIBLE_DTMF);
172
173   BroadcastScanConfiguration config =
174       ExtractTokenExchangeStrategy(request);
175   if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
176     token_technology->add_instruction_type(TRANSMIT);
177   if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
178     token_technology->add_instruction_type(RECEIVE);
179 #endif
180
181   return state.Pass();
182 }
183
184 // TODO(ckehoe): We're keeping this code in a separate function for now
185 // because we get a version string from Chrome, but the proto expects
186 // an int64 version. We should probably change the version proto
187 // to handle a more detailed version.
188 ClientVersion* CreateVersion(const std::string& client,
189                              const std::string& version_name) {
190   ClientVersion* version = new ClientVersion;
191
192   version->set_client(client);
193   version->set_version_name(version_name);
194
195   return version;
196 }
197
198 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
199   TokenObservation* token_observation =
200       request->mutable_update_signals_request()->add_token_observation();
201   token_observation->set_token_id(ToUrlSafe(token.token));
202
203   TokenSignals* signals = token_observation->add_signals();
204   signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
205                                     : AUDIO_ULTRASOUND_PASSBAND);
206   signals->set_observed_time_millis(base::Time::Now().ToJsTime());
207 }
208
209 OptInStateFilter* CreateOptedInOrOutFilter() {
210   OptInStateFilter* filter = new OptInStateFilter;
211   filter->add_allowed_opt_in_state(copresence::OPTED_IN);
212   filter->add_allowed_opt_in_state(copresence::OPTED_OUT);
213   return filter;
214 }
215
216 void AllowOptedOutMessages(ReportRequest* request) {
217   // TODO(ckehoe): Collapse this pattern into ProcessPublish()
218   // and ProcessSubscribe() methods.
219
220   if (request->has_manage_messages_request()) {
221     RepeatedPtrField<PublishedMessage>* messages = request
222         ->mutable_manage_messages_request()->mutable_message_to_publish();
223     for (int i = 0; i < messages->size(); ++i) {
224       PublishedMessage* message = messages->Mutable(i);
225       if (!message->has_opt_in_state_filter())
226         message->set_allocated_opt_in_state_filter(CreateOptedInOrOutFilter());
227     }
228   }
229
230   if (request->has_manage_subscriptions_request()) {
231     RepeatedPtrField<Subscription>* subscriptions =
232         request->mutable_manage_subscriptions_request()->mutable_subscription();
233     for (int i = 0; i < subscriptions->size(); ++i) {
234       Subscription* subscription = subscriptions->Mutable(i);
235       if (!subscription->has_opt_in_state_filter()) {
236         subscription->set_allocated_opt_in_state_filter(
237             CreateOptedInOrOutFilter());
238       }
239     }
240   }
241 }
242
243 }  // namespace
244
245 // Public methods
246
247 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
248     : delegate_(delegate),
249       invalid_audio_token_cache_(
250           base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
251           kMaxInvalidTokens),
252       server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
253                                        base::Unretained(this))) {}
254
255 RpcHandler::~RpcHandler() {
256   for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
257        post != pending_posts_.end(); ++post) {
258     delete *post;
259   }
260
261   if (delegate_ && delegate_->GetWhispernetClient()) {
262     delegate_->GetWhispernetClient()->RegisterTokensCallback(
263         WhispernetClient::TokensCallback());
264   }
265 }
266
267 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
268   scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
269   DCHECK(device_id_.empty());
270
271   request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
272   Identity* identity =
273       request->mutable_device_identifiers()->mutable_registrant();
274   identity->set_type(CHROME);
275   identity->set_chrome_id(base::GenerateGUID());
276   SendServerRequest(
277       kRegisterDeviceRpcName,
278       std::string(),
279       request.Pass(),
280       base::Bind(&RpcHandler::RegisterResponseHandler,
281                  // On destruction, this request will be cancelled.
282                  base::Unretained(this),
283                  init_done_callback));
284 }
285
286 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
287   SendReportRequest(request.Pass(), std::string(), StatusCallback());
288 }
289
290 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
291                                    const std::string& app_id,
292                                    const StatusCallback& status_callback) {
293   DCHECK(request.get());
294   DCHECK(!device_id_.empty())
295       << "RpcHandler::Initialize() must complete successfully "
296       << "before other RpcHandler methods are called.";
297
298   DVLOG(3) << "Sending report request to server.";
299
300   // If we are unpublishing or unsubscribing, we need to stop those publish or
301   // subscribes right away, we don't need to wait for the server to tell us.
302   ProcessRemovedOperations(*request);
303
304   request->mutable_update_signals_request()->set_allocated_state(
305       GetDeviceCapabilities(*request).release());
306
307   AddPlayingTokens(request.get());
308
309   AllowOptedOutMessages(request.get());
310   SendServerRequest(kReportRequestRpcName,
311                     app_id,
312                     request.Pass(),
313                     // On destruction, this request will be cancelled.
314                     base::Bind(&RpcHandler::ReportResponseHandler,
315                                base::Unretained(this),
316                                status_callback));
317 }
318
319 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
320   DCHECK(!tokens.empty());
321
322   scoped_ptr<ReportRequest> request(new ReportRequest);
323   for (size_t i = 0; i < tokens.size(); ++i) {
324     if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
325       continue;
326     DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
327     AddTokenToRequest(request.get(), tokens[i]);
328   }
329   SendReportRequest(request.Pass());
330 }
331
332 void RpcHandler::ConnectToWhispernet() {
333   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
334
335   // |directive_handler_| will be destructed with us, so unretained is safe.
336   directive_handler_.reset(new DirectiveHandler);
337   directive_handler_->Initialize(
338       base::Bind(&WhispernetClient::DecodeSamples,
339                  base::Unretained(whispernet_client)),
340       base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
341                  base::Unretained(this)));
342
343   whispernet_client->RegisterTokensCallback(
344       base::Bind(&RpcHandler::ReportTokens,
345                  // On destruction, this callback will be disconnected.
346                  base::Unretained(this)));
347 }
348
349 // Private methods
350
351 void RpcHandler::RegisterResponseHandler(
352     const SuccessCallback& init_done_callback,
353     HttpPost* completed_post,
354     int http_status_code,
355     const std::string& response_data) {
356   if (completed_post) {
357     int elements_erased = pending_posts_.erase(completed_post);
358     DCHECK(elements_erased);
359     delete completed_post;
360   }
361
362   if (http_status_code != net::HTTP_OK) {
363     init_done_callback.Run(false);
364     return;
365   }
366
367   RegisterDeviceResponse response;
368   if (!response.ParseFromString(response_data)) {
369     LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
370     init_done_callback.Run(false);
371     return;
372   }
373
374   if (CopresenceErrorLogged(response.header().status()))
375     return;
376   device_id_ = response.registered_device_id();
377   DCHECK(!device_id_.empty());
378   DVLOG(2) << "Device registration successful: id " << device_id_;
379   init_done_callback.Run(true);
380 }
381
382 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
383                                        HttpPost* completed_post,
384                                        int http_status_code,
385                                        const std::string& response_data) {
386   if (completed_post) {
387     int elements_erased = pending_posts_.erase(completed_post);
388     DCHECK(elements_erased);
389     delete completed_post;
390   }
391
392   if (http_status_code != net::HTTP_OK) {
393     if (!status_callback.is_null())
394       status_callback.Run(FAIL);
395     return;
396   }
397
398   DVLOG(3) << "Received ReportResponse.";
399   ReportResponse response;
400   if (!response.ParseFromString(response_data)) {
401     LOG(ERROR) << "Invalid ReportResponse";
402     if (!status_callback.is_null())
403       status_callback.Run(FAIL);
404     return;
405   }
406
407   if (ReportErrorLogged(response)) {
408     if (!status_callback.is_null())
409       status_callback.Run(FAIL);
410     return;
411   }
412
413   const RepeatedPtrField<MessageResult>& message_results =
414       response.manage_messages_response().published_message_result();
415   for (int i = 0; i < message_results.size(); ++i) {
416     DVLOG(2) << "Published message with id "
417              << message_results.Get(i).published_message_id();
418   }
419
420   const RepeatedPtrField<SubscriptionResult>& subscription_results =
421       response.manage_subscriptions_response().subscription_result();
422   for (int i = 0; i < subscription_results.size(); ++i) {
423     DVLOG(2) << "Created subscription with id "
424              << subscription_results.Get(i).subscription_id();
425   }
426
427   if (response.has_update_signals_response()) {
428     const UpdateSignalsResponse& update_response =
429         response.update_signals_response();
430     DispatchMessages(update_response.message());
431
432     if (directive_handler_.get()) {
433       for (int i = 0; i < update_response.directive_size(); ++i)
434         directive_handler_->AddDirective(update_response.directive(i));
435     } else {
436       DVLOG(1) << "No directive handler.";
437     }
438
439     const RepeatedPtrField<Token>& tokens = update_response.token();
440     for (int i = 0; i < tokens.size(); ++i) {
441       switch (tokens.Get(i).status()) {
442         case VALID:
443           // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
444           // short TTL (like 10s) and send it up with every report request.
445           // Then we'll still get messages while we're waiting to hear it again.
446           VLOG(1) << "Got valid token " << tokens.Get(i).id();
447           break;
448         case INVALID:
449           DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
450           invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
451           break;
452         default:
453           DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
454                    << tokens.Get(i).status();
455       }
456     }
457   }
458
459   // TODO(ckehoe): Return a more detailed status response.
460   if (!status_callback.is_null())
461     status_callback.Run(SUCCESS);
462 }
463
464 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
465   // Remove unpublishes.
466   if (request.has_manage_messages_request()) {
467     const RepeatedPtrField<std::string>& unpublishes =
468         request.manage_messages_request().id_to_unpublish();
469     for (int i = 0; i < unpublishes.size(); ++i)
470       directive_handler_->RemoveDirectives(unpublishes.Get(i));
471   }
472
473   // Remove unsubscribes.
474   if (request.has_manage_subscriptions_request()) {
475     const RepeatedPtrField<std::string>& unsubscribes =
476         request.manage_subscriptions_request().id_to_unsubscribe();
477     for (int i = 0; i < unsubscribes.size(); ++i)
478       directive_handler_->RemoveDirectives(unsubscribes.Get(i));
479   }
480 }
481
482 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
483   if (!directive_handler_)
484     return;
485
486   const std::string& audible_token = directive_handler_->CurrentAudibleToken();
487   const std::string& inaudible_token =
488       directive_handler_->CurrentInaudibleToken();
489
490   if (!audible_token.empty())
491     AddTokenToRequest(request, AudioToken(audible_token, true));
492   if (!inaudible_token.empty())
493     AddTokenToRequest(request, AudioToken(inaudible_token, false));
494 }
495
496 void RpcHandler::DispatchMessages(
497     const RepeatedPtrField<SubscribedMessage>& messages) {
498   if (messages.size() == 0)
499     return;
500
501   // Index the messages by subscription id.
502   std::map<std::string, std::vector<Message> > messages_by_subscription;
503   DVLOG(3) << "Dispatching " << messages.size() << " messages";
504   for (int m = 0; m < messages.size(); ++m) {
505     const RepeatedPtrField<std::string>& subscription_ids =
506         messages.Get(m).subscription_id();
507     for (int s = 0; s < subscription_ids.size(); ++s) {
508       messages_by_subscription[subscription_ids.Get(s)].push_back(
509           messages.Get(m).published_message());
510     }
511   }
512
513   // Send the messages for each subscription.
514   for (std::map<std::string, std::vector<Message> >::const_iterator
515            subscription = messages_by_subscription.begin();
516        subscription != messages_by_subscription.end();
517        ++subscription) {
518     // TODO(ckehoe): Once we have the app ID from the server, we need to pass
519     // it in here and get rid of the app id registry from the main API class.
520     delegate_->HandleMessages("", subscription->first, subscription->second);
521   }
522 }
523
524 RequestHeader* RpcHandler::CreateRequestHeader(
525     const std::string& client_name) const {
526   RequestHeader* header = new RequestHeader;
527
528   header->set_allocated_framework_version(CreateVersion(
529       "Chrome", delegate_->GetPlatformVersionString()));
530   if (!client_name.empty()) {
531     header->set_allocated_client_version(
532         CreateVersion(client_name, std::string()));
533   }
534   header->set_current_time_millis(base::Time::Now().ToJsTime());
535   header->set_registered_device_id(device_id_);
536
537   DeviceFingerprint* fingerprint = new DeviceFingerprint;
538   fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
539   fingerprint->set_type(CHROME_PLATFORM_TYPE);
540   header->set_allocated_device_fingerprint(fingerprint);
541
542   return header;
543 }
544
545 template <class T>
546 void RpcHandler::SendServerRequest(
547     const std::string& rpc_name,
548     const std::string& app_id,
549     scoped_ptr<T> request,
550     const PostCleanupCallback& response_handler) {
551   request->set_allocated_header(CreateRequestHeader(app_id));
552   server_post_callback_.Run(delegate_->GetRequestContext(),
553                             rpc_name,
554                             make_scoped_ptr<MessageLite>(request.release()),
555                             response_handler);
556 }
557
558 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
559                               const std::string& rpc_name,
560                               scoped_ptr<MessageLite> request_proto,
561                               const PostCleanupCallback& callback) {
562   // Create the base URL to call.
563   CommandLine* command_line = CommandLine::ForCurrentProcess();
564   const std::string copresence_server_host =
565       command_line->HasSwitch(switches::kCopresenceServer) ?
566       command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
567       kDefaultCopresenceServer;
568
569   // Create the request and keep a pointer until it completes.
570   HttpPost* http_post = new HttpPost(
571       url_context_getter,
572       copresence_server_host,
573       rpc_name,
574       command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
575       delegate_->GetAPIKey(),
576       *request_proto);
577
578   http_post->Start(base::Bind(callback, http_post));
579   pending_posts_.insert(http_post);
580 }
581
582 void RpcHandler::AudioDirectiveListToWhispernetConnector(
583     const std::string& token,
584     bool audible,
585     const WhispernetClient::SamplesCallback& samples_callback) {
586   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
587   if (whispernet_client) {
588     whispernet_client->RegisterSamplesCallback(samples_callback);
589     whispernet_client->EncodeToken(token, audible);
590   }
591 }
592
593 }  // namespace copresence