0b60a77fa92fb53fe5e61287ec9bcd0c383d21f6
[platform/framework/web/crosswalk.git] / src / components / copresence / rpc / rpc_handler.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/copresence/rpc/rpc_handler.h"
6
7 #include <map>
8
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "components/copresence/copresence_switches.h"
16 #include "components/copresence/handlers/directive_handler.h"
17 #include "components/copresence/proto/codes.pb.h"
18 #include "components/copresence/proto/data.pb.h"
19 #include "components/copresence/proto/rpcs.pb.h"
20 #include "components/copresence/public/copresence_client_delegate.h"
21 #include "net/http/http_status_code.h"
22
23 // TODO(ckehoe): Return error messages for bad requests.
24
25 namespace copresence {
26
27 using google::protobuf::MessageLite;
28 using google::protobuf::RepeatedPtrField;
29
30 const char RpcHandler::kReportRequestRpcName[] = "report";
31
32 namespace {
33
34 // UrlSafe is defined as:
35 // '/' represented by a '_' and '+' represented by a '-'
36 // TODO(rkc): Move this to the wrapper.
37 std::string ToUrlSafe(std::string token) {
38   base::ReplaceChars(token, "+", "-", &token);
39   base::ReplaceChars(token, "/", "_", &token);
40   return token;
41 }
42
// Server endpoint used unless overridden on the command line, and the name
// of the device registration RPC.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
const char kRegisterDeviceRpcName[] = "registerdevice";

// Parameters passed to the invalid audio token cache when it is constructed.
const int kMaxInvalidTokens = 10000;
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
48
49 // Logging
50
51 // Checks for a copresence error. If there is one, logs it and returns true.
52 bool CopresenceErrorLogged(const Status& status) {
53   if (status.code() != OK) {
54     LOG(ERROR) << "Copresence error code " << status.code()
55                << (status.message().empty() ? std::string() :
56                   ": " + status.message());
57   }
58   return status.code() != OK;
59 }
60
61 void LogIfErrorStatus(const util::error::Code& code,
62                       const std::string& context) {
63   LOG_IF(ERROR, code != util::error::OK)
64       << context << " error " << code << ". See "
65       << "cs/google3/util/task/codes.proto for more info.";
66 }
67
68 // If any errors occurred, logs them and returns true.
69 bool ReportErrorLogged(const ReportResponse& response) {
70   bool result = CopresenceErrorLogged(response.header().status());
71
72   // The Report fails or succeeds as a unit. If any responses had errors,
73   // the header will too. Thus we don't need to propagate individual errors.
74   if (response.has_update_signals_response())
75     LogIfErrorStatus(response.update_signals_response().status(), "Update");
76   if (response.has_manage_messages_response())
77     LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
78   if (response.has_manage_subscriptions_response()) {
79     LogIfErrorStatus(response.manage_subscriptions_response().status(),
80                      "Subscribe");
81   }
82
83   return result;
84 }
85
86 // Request construction
87 // TODO(ckehoe): Move these into a separate file?
88
89 template <typename T>
90 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
91   if (msg.has_token_exchange_strategy() &&
92       msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
93     return msg.token_exchange_strategy().broadcast_scan_configuration();
94   }
95   return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
96 }
97
98 // This method will extract token exchange strategies
99 // from the publishes and subscribes in a report request.
100 // TODO(ckehoe): Delete this when the server supports
101 // BroadcastScanConfiguration.
102 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
103     const ReportRequest& request) {
104   bool broadcast_only = false;
105   bool scan_only = false;
106
107   // Strategies for publishes.
108   if (request.has_manage_messages_request()) {
109     const RepeatedPtrField<PublishedMessage>& messages =
110         request.manage_messages_request().message_to_publish();
111     for (int i = 0; i < messages.size(); ++i) {
112       BroadcastScanConfiguration config =
113           GetBroadcastScanConfig(messages.Get(i));
114       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
115       scan_only = scan_only || config == SCAN_ONLY;
116       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
117         return BROADCAST_AND_SCAN;
118     }
119   }
120
121   // Strategies for subscriptions.
122   if (request.has_manage_subscriptions_request()) {
123     const RepeatedPtrField<Subscription> subscriptions =
124         request.manage_subscriptions_request().subscription();
125     for (int i = 0; i < subscriptions.size(); ++i) {
126       BroadcastScanConfiguration config =
127           GetBroadcastScanConfig(subscriptions.Get(i));
128       broadcast_only = broadcast_only || config == BROADCAST_ONLY;
129       scan_only = scan_only || config == SCAN_ONLY;
130       if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
131         return BROADCAST_AND_SCAN;
132     }
133   }
134
135   if (broadcast_only)
136     return BROADCAST_ONLY;
137   if (scan_only)
138     return SCAN_ONLY;
139
140   // If nothing else is specified, default to both broadcast and scan.
141   return BROADCAST_AND_SCAN;
142 }
143
144 // TODO(rkc): Fix this hack once the server supports setting strategies per
145 // operation.
146 bool ExtractIsAudibleStrategy(const ReportRequest& request) {
147   if (request.has_manage_messages_request()) {
148     const RepeatedPtrField<PublishedMessage> messages =
149         request.manage_messages_request().message_to_publish();
150     for (int i = 0; i < messages.size(); ++i) {
151       const PublishedMessage& msg = messages.Get(i);
152       if (msg.has_token_exchange_strategy() &&
153           msg.token_exchange_strategy().has_use_audible() &&
154           msg.token_exchange_strategy().use_audible()) {
155         return true;
156       }
157     }
158   }
159   return false;
160 }
161
162 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
163   scoped_ptr<DeviceState> state(new DeviceState);
164
165 // TODO(ckehoe): Currently this code causes a linker error on Windows.
166 #ifndef OS_WIN
167   TokenTechnology* token_technology =
168       state->mutable_capabilities()->add_token_technology();
169   token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
170   if (ExtractIsAudibleStrategy(request))
171     token_technology->set_medium(AUDIO_AUDIBLE_DTMF);
172
173   BroadcastScanConfiguration config =
174       ExtractTokenExchangeStrategy(request);
175   if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
176     token_technology->add_instruction_type(TRANSMIT);
177   if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
178     token_technology->add_instruction_type(RECEIVE);
179 #endif
180
181   return state.Pass();
182 }
183
184 // TODO(ckehoe): We're keeping this code in a separate function for now
185 // because we get a version string from Chrome, but the proto expects
186 // an int64 version. We should probably change the version proto
187 // to handle a more detailed version.
188 ClientVersion* CreateVersion(const std::string& client,
189                              const std::string& version_name) {
190   ClientVersion* version = new ClientVersion;
191
192   version->set_client(client);
193   version->set_version_name(version_name);
194
195   return version;
196 }
197
198 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
199   TokenObservation* token_observation =
200       request->mutable_update_signals_request()->add_token_observation();
201   token_observation->set_token_id(ToUrlSafe(token.token));
202
203   TokenSignals* signals = token_observation->add_signals();
204   signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
205                                     : AUDIO_ULTRASOUND_PASSBAND);
206   signals->set_observed_time_millis(base::Time::Now().ToJsTime());
207 }
208
209 }  // namespace
210
211 // Public methods
212
213 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
214     : delegate_(delegate),
215       invalid_audio_token_cache_(
216           base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
217           kMaxInvalidTokens),
218       server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
219                                        base::Unretained(this))) {}
220
221 RpcHandler::~RpcHandler() {
222   for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
223        post != pending_posts_.end(); ++post) {
224     delete *post;
225   }
226
227   if (delegate_ && delegate_->GetWhispernetClient()) {
228     delegate_->GetWhispernetClient()->RegisterTokensCallback(
229         WhispernetClient::TokensCallback());
230   }
231 }
232
233 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
234   scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
235   DCHECK(device_id_.empty());
236
237   request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
238   Identity* identity =
239       request->mutable_device_identifiers()->mutable_registrant();
240   identity->set_type(CHROME);
241   identity->set_chrome_id(base::GenerateGUID());
242   SendServerRequest(
243       kRegisterDeviceRpcName,
244       std::string(),
245       request.Pass(),
246       base::Bind(&RpcHandler::RegisterResponseHandler,
247                  // On destruction, this request will be cancelled.
248                  base::Unretained(this),
249                  init_done_callback));
250 }
251
252 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
253   SendReportRequest(request.Pass(), std::string(), StatusCallback());
254 }
255
256 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
257                                    const std::string& app_id,
258                                    const StatusCallback& status_callback) {
259   DCHECK(request.get());
260   DCHECK(!device_id_.empty())
261       << "RpcHandler::Initialize() must complete successfully "
262       << "before other RpcHandler methods are called.";
263
264   DVLOG(3) << "Sending report request to server.";
265
266   // If we are unpublishing or unsubscribing, we need to stop those publish or
267   // subscribes right away, we don't need to wait for the server to tell us.
268   ProcessRemovedOperations(*request);
269
270   request->mutable_update_signals_request()->set_allocated_state(
271       GetDeviceCapabilities(*request).release());
272
273   AddPlayingTokens(request.get());
274
275   // TODO(ckehoe): Currently the server supports only BROADCAST_AND_SCAN.
276   // Remove this once b/16715253 is fixed.
277   if (request->has_manage_messages_request()) {
278     RepeatedPtrField<PublishedMessage>* messages = request
279         ->mutable_manage_messages_request()->mutable_message_to_publish();
280     for (int i = 0; i < messages->size(); ++i) {
281       messages->Mutable(i)->mutable_token_exchange_strategy()
282           ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
283     }
284   }
285   if (request->has_manage_subscriptions_request()) {
286     RepeatedPtrField<Subscription>* subscriptions =
287         request->mutable_manage_subscriptions_request()->mutable_subscription();
288     for (int i = 0; i < subscriptions->size(); ++i) {
289       subscriptions->Mutable(i)->mutable_token_exchange_strategy()
290           ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
291     }
292   }
293
294   SendServerRequest(kReportRequestRpcName,
295                     app_id,
296                     request.Pass(),
297                     // On destruction, this request will be cancelled.
298                     base::Bind(&RpcHandler::ReportResponseHandler,
299                                base::Unretained(this),
300                                status_callback));
301 }
302
303 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
304   DCHECK(!tokens.empty());
305
306   scoped_ptr<ReportRequest> request(new ReportRequest);
307   for (size_t i = 0; i < tokens.size(); ++i) {
308     if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
309       continue;
310     DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
311     AddTokenToRequest(request.get(), tokens[i]);
312   }
313   SendReportRequest(request.Pass());
314 }
315
316 void RpcHandler::ConnectToWhispernet() {
317   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
318
319   // |directive_handler_| will be destructed with us, so unretained is safe.
320   directive_handler_.reset(new DirectiveHandler);
321   directive_handler_->Initialize(
322       base::Bind(&WhispernetClient::DecodeSamples,
323                  base::Unretained(whispernet_client)),
324       base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
325                  base::Unretained(this)));
326
327   whispernet_client->RegisterTokensCallback(
328       base::Bind(&RpcHandler::ReportTokens,
329                  // On destruction, this callback will be disconnected.
330                  base::Unretained(this)));
331 }
332
333 // Private methods
334
335 void RpcHandler::RegisterResponseHandler(
336     const SuccessCallback& init_done_callback,
337     HttpPost* completed_post,
338     int http_status_code,
339     const std::string& response_data) {
340   if (completed_post) {
341     int elements_erased = pending_posts_.erase(completed_post);
342     DCHECK(elements_erased);
343     delete completed_post;
344   }
345
346   if (http_status_code != net::HTTP_OK) {
347     init_done_callback.Run(false);
348     return;
349   }
350
351   RegisterDeviceResponse response;
352   if (!response.ParseFromString(response_data)) {
353     LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
354     init_done_callback.Run(false);
355     return;
356   }
357
358   if (CopresenceErrorLogged(response.header().status()))
359     return;
360   device_id_ = response.registered_device_id();
361   DCHECK(!device_id_.empty());
362   DVLOG(2) << "Device registration successful: id " << device_id_;
363   init_done_callback.Run(true);
364 }
365
366 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
367                                        HttpPost* completed_post,
368                                        int http_status_code,
369                                        const std::string& response_data) {
370   if (completed_post) {
371     int elements_erased = pending_posts_.erase(completed_post);
372     DCHECK(elements_erased);
373     delete completed_post;
374   }
375
376   if (http_status_code != net::HTTP_OK) {
377     if (!status_callback.is_null())
378       status_callback.Run(FAIL);
379     return;
380   }
381
382   DVLOG(3) << "Received ReportResponse.";
383   ReportResponse response;
384   if (!response.ParseFromString(response_data)) {
385     LOG(ERROR) << "Invalid ReportResponse";
386     if (!status_callback.is_null())
387       status_callback.Run(FAIL);
388     return;
389   }
390
391   if (ReportErrorLogged(response)) {
392     if (!status_callback.is_null())
393       status_callback.Run(FAIL);
394     return;
395   }
396
397   const RepeatedPtrField<MessageResult>& message_results =
398       response.manage_messages_response().published_message_result();
399   for (int i = 0; i < message_results.size(); ++i) {
400     DVLOG(2) << "Published message with id "
401              << message_results.Get(i).published_message_id();
402   }
403
404   const RepeatedPtrField<SubscriptionResult>& subscription_results =
405       response.manage_subscriptions_response().subscription_result();
406   for (int i = 0; i < subscription_results.size(); ++i) {
407     DVLOG(2) << "Created subscription with id "
408              << subscription_results.Get(i).subscription_id();
409   }
410
411   if (response.has_update_signals_response()) {
412     const UpdateSignalsResponse& update_response =
413         response.update_signals_response();
414     DispatchMessages(update_response.message());
415
416     if (directive_handler_.get()) {
417       for (int i = 0; i < update_response.directive_size(); ++i)
418         directive_handler_->AddDirective(update_response.directive(i));
419     } else {
420       DVLOG(1) << "No directive handler.";
421     }
422
423     const RepeatedPtrField<Token>& tokens = update_response.token();
424     for (int i = 0; i < tokens.size(); ++i) {
425       switch (tokens.Get(i).status()) {
426         case VALID:
427           // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
428           // short TTL (like 10s) and send it up with every report request.
429           // Then we'll still get messages while we're waiting to hear it again.
430           VLOG(1) << "Got valid token " << tokens.Get(i).id();
431           break;
432         case INVALID:
433           DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
434           invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
435           break;
436         default:
437           DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
438                    << tokens.Get(i).status();
439       }
440     }
441   }
442
443   // TODO(ckehoe): Return a more detailed status response.
444   if (!status_callback.is_null())
445     status_callback.Run(SUCCESS);
446 }
447
448 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
449   // Remove unpublishes.
450   if (request.has_manage_messages_request()) {
451     const RepeatedPtrField<std::string>& unpublishes =
452         request.manage_messages_request().id_to_unpublish();
453     for (int i = 0; i < unpublishes.size(); ++i)
454       directive_handler_->RemoveDirectives(unpublishes.Get(i));
455   }
456
457   // Remove unsubscribes.
458   if (request.has_manage_subscriptions_request()) {
459     const RepeatedPtrField<std::string>& unsubscribes =
460         request.manage_subscriptions_request().id_to_unsubscribe();
461     for (int i = 0; i < unsubscribes.size(); ++i)
462       directive_handler_->RemoveDirectives(unsubscribes.Get(i));
463   }
464 }
465
466 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
467   if (!directive_handler_)
468     return;
469
470   const std::string& audible_token = directive_handler_->CurrentAudibleToken();
471   const std::string& inaudible_token =
472       directive_handler_->CurrentInaudibleToken();
473
474   if (!audible_token.empty())
475     AddTokenToRequest(request, AudioToken(audible_token, true));
476   if (!inaudible_token.empty())
477     AddTokenToRequest(request, AudioToken(inaudible_token, false));
478 }
479
480 void RpcHandler::DispatchMessages(
481     const RepeatedPtrField<SubscribedMessage>& messages) {
482   if (messages.size() == 0)
483     return;
484
485   // Index the messages by subscription id.
486   std::map<std::string, std::vector<Message> > messages_by_subscription;
487   DVLOG(3) << "Dispatching " << messages.size() << " messages";
488   for (int m = 0; m < messages.size(); ++m) {
489     const RepeatedPtrField<std::string>& subscription_ids =
490         messages.Get(m).subscription_id();
491     for (int s = 0; s < subscription_ids.size(); ++s) {
492       messages_by_subscription[subscription_ids.Get(s)].push_back(
493           messages.Get(m).published_message());
494     }
495   }
496
497   // Send the messages for each subscription.
498   for (std::map<std::string, std::vector<Message> >::const_iterator
499            subscription = messages_by_subscription.begin();
500        subscription != messages_by_subscription.end();
501        ++subscription) {
502     // TODO(ckehoe): Once we have the app ID from the server, we need to pass
503     // it in here and get rid of the app id registry from the main API class.
504     delegate_->HandleMessages("", subscription->first, subscription->second);
505   }
506 }
507
508 RequestHeader* RpcHandler::CreateRequestHeader(
509     const std::string& client_name) const {
510   RequestHeader* header = new RequestHeader;
511
512   header->set_allocated_framework_version(CreateVersion(
513       "Chrome", delegate_->GetPlatformVersionString()));
514   if (!client_name.empty()) {
515     header->set_allocated_client_version(
516         CreateVersion(client_name, std::string()));
517   }
518   header->set_current_time_millis(base::Time::Now().ToJsTime());
519   header->set_registered_device_id(device_id_);
520
521   DeviceFingerprint* fingerprint = new DeviceFingerprint;
522   fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
523   fingerprint->set_type(CHROME_PLATFORM_TYPE);
524   header->set_allocated_device_fingerprint(fingerprint);
525
526   return header;
527 }
528
529 template <class T>
530 void RpcHandler::SendServerRequest(
531     const std::string& rpc_name,
532     const std::string& app_id,
533     scoped_ptr<T> request,
534     const PostCleanupCallback& response_handler) {
535   request->set_allocated_header(CreateRequestHeader(app_id));
536   server_post_callback_.Run(delegate_->GetRequestContext(),
537                             rpc_name,
538                             make_scoped_ptr<MessageLite>(request.release()),
539                             response_handler);
540 }
541
542 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
543                               const std::string& rpc_name,
544                               scoped_ptr<MessageLite> request_proto,
545                               const PostCleanupCallback& callback) {
546   // Create the base URL to call.
547   CommandLine* command_line = CommandLine::ForCurrentProcess();
548   const std::string copresence_server_host =
549       command_line->HasSwitch(switches::kCopresenceServer) ?
550       command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
551       kDefaultCopresenceServer;
552
553   // Create the request and keep a pointer until it completes.
554   HttpPost* http_post = new HttpPost(
555       url_context_getter,
556       copresence_server_host,
557       rpc_name,
558       command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
559       delegate_->GetAPIKey(),
560       *request_proto);
561
562   http_post->Start(base::Bind(callback, http_post));
563   pending_posts_.insert(http_post);
564 }
565
566 void RpcHandler::AudioDirectiveListToWhispernetConnector(
567     const std::string& token,
568     bool audible,
569     const WhispernetClient::SamplesCallback& samples_callback) {
570   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
571   if (whispernet_client) {
572     whispernet_client->RegisterSamplesCallback(samples_callback);
573     whispernet_client->EncodeToken(token, audible);
574   }
575 }
576
577 }  // namespace copresence