Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / mojo / spy / spy.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/spy/spy.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/string_split.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/worker_pool.h"
19 #include "base/time/time.h"
20 #include "mojo/application_manager/application_manager.h"
21 #include "mojo/public/cpp/system/core.h"
22 #include "mojo/spy/common.h"
23 #include "mojo/spy/public/spy.mojom.h"
24 #include "mojo/spy/spy_server_impl.h"
25 #include "mojo/spy/websocket_server.h"
26 #include "url/gurl.h"
27
28 namespace {
29
30 mojo::WebSocketServer* ws_server = NULL;
31
32 const size_t kMessageBufSize = 2 * 1024;
33 const size_t kHandleBufSize = 64;
34 const int kDefaultWebSocketPort = 42424;
35
36 void CloseHandles(MojoHandle* handles, size_t count) {
37   for (size_t ix = 0; ix != count; ++count)
38     MojoClose(handles[ix]);
39 }
40
41 // In charge of processing messages that flow over a
42 // single message pipe.
43 class MessageProcessor :
44     public base::RefCountedThreadSafe<MessageProcessor> {
45  public:
46   MessageProcessor(base::MessageLoopProxy* control_loop_proxy)
47       : last_result_(MOJO_RESULT_OK),
48         bytes_transfered_(0),
49         control_loop_proxy_(control_loop_proxy) {
50     message_count_[0] = 0;
51     message_count_[1] = 0;
52     handle_count_[0] = 0;
53     handle_count_[1] = 0;
54   }
55
56   void Start(mojo::ScopedMessagePipeHandle client,
57              mojo::ScopedMessagePipeHandle interceptor,
58              const GURL& url) {
59     std::vector<mojo::MessagePipeHandle> pipes;
60     pipes.push_back(client.get());
61     pipes.push_back(interceptor.get());
62     std::vector<MojoHandleSignals> handle_signals;
63     handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
64     handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
65
66     scoped_ptr<char[]> mbuf(new char[kMessageBufSize]);
67     scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]);
68
69     // Main processing loop:
70     // 1- Wait for an endpoint to have a message.
71     // 2- Read the message
72     // 3- Log data
73     // 4- Wait until the opposite port is ready for writting
74     // 4- Write the message to opposite port.
75
76     for (;;) {
77       int r = WaitMany(pipes, handle_signals, MOJO_DEADLINE_INDEFINITE);
78       if ((r < 0) || (r > 1)) {
79         last_result_ = r;
80         break;
81       }
82
83       uint32_t bytes_read = kMessageBufSize;
84       uint32_t handles_read = kHandleBufSize;
85
86       if (!CheckResult(ReadMessageRaw(pipes[r],
87                                       mbuf.get(), &bytes_read,
88                                       hbuf.get(), &handles_read,
89                                       MOJO_READ_MESSAGE_FLAG_NONE)))
90         break;
91
92       if (!bytes_read && !handles_read)
93         continue;
94
95       if (handles_read) {
96         handle_count_[r] += handles_read;
97
98         // Intercept message pipes which are returned via the ReadMessageRaw
99         // call
100         for (uint32_t i = 0; i < handles_read; i++) {
101           // Hack to determine if a handle is a message pipe.
102           // TODO(ananta)
103           // We should have an API which given a handle returns additional
104           // information about the handle which includes its type, etc.
105           if (MojoReadMessage(hbuf[i], NULL, NULL, NULL, NULL,
106                               MOJO_READ_MESSAGE_FLAG_NONE) !=
107                   MOJO_RESULT_INVALID_ARGUMENT) {
108             mojo::ScopedMessagePipeHandle message_pipe_handle;
109             message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
110
111             mojo::ScopedMessagePipeHandle faux_client;
112             mojo::ScopedMessagePipeHandle interceptor;
113             CreateMessagePipe(NULL, &faux_client, &interceptor);
114
115             base::WorkerPool::PostTask(
116                 FROM_HERE,
117                 base::Bind(&MessageProcessor::Start,
118                             this,
119                             base::Passed(&message_pipe_handle),
120                               base::Passed(&interceptor),
121                             url),
122                 true);
123             hbuf.get()[i] = faux_client.release().value();
124           }
125         }
126       }
127       ++message_count_[r];
128       bytes_transfered_ += bytes_read;
129
130       LogMessageInfo(mbuf.get(), url);
131
132       mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
133       if (!CheckResult(Wait(write_handle,
134                             MOJO_HANDLE_SIGNAL_WRITABLE,
135                             MOJO_DEADLINE_INDEFINITE)))
136         break;
137
138       if (!CheckResult(WriteMessageRaw(write_handle,
139                                        mbuf.get(), bytes_read,
140                                        hbuf.get(), handles_read,
141                                        MOJO_WRITE_MESSAGE_FLAG_NONE))) {
142         // On failure we own the handles. For now just close them.
143         if (handles_read)
144           CloseHandles(hbuf.get(), handles_read);
145         break;
146       }
147     }
148   }
149
150  private:
151   friend class base::RefCountedThreadSafe<MessageProcessor>;
152   virtual ~MessageProcessor() {}
153
154   bool CheckResult(MojoResult mr) {
155     if (mr == MOJO_RESULT_OK)
156       return true;
157     last_result_ = mr;
158     return false;
159   }
160
161   void LogInvalidMessage(const mojo::MojoMessageHeader& header) {
162     LOG(ERROR) << "Invalid message: Number of Fields: "
163                << header.num_fields
164                << " Number of bytes: "
165                << header.num_bytes
166                << " Flags: "
167                << header.flags;
168   }
169
170   // Validates the message as per the mojo spec.
171   bool IsValidMessage(const mojo::MojoMessageHeader& header) {
172     if (header.num_fields == 2) {
173       if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) {
174         LogInvalidMessage(header);
175         return false;
176       }
177     } else if (header.num_fields == 3) {
178       if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) {
179         LogInvalidMessage(header);
180       }
181     } else if (header.num_fields > 3) {
182       if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) {
183         LogInvalidMessage(header);
184         return false;
185       }
186     }
187     // These flags should be specified in request or response messages.
188     if (header.num_fields < 3 &&
189           ((header.flags & mojo::kMessageExpectsResponse) ||
190            (header.flags & mojo::kMessageIsResponse))) {
191       LOG(ERROR) << "Invalid request message.";
192       LogInvalidMessage(header);
193       return false;
194     }
195     // These flags are mutually exclusive.
196     if ((header.flags & mojo::kMessageExpectsResponse) &&
197         (header.flags & mojo::kMessageIsResponse)) {
198       LOG(ERROR) << "Invalid flags combination in request message.";
199       LogInvalidMessage(header);
200       return false;
201     }
202     return true;
203   }
204
205   void LogMessageInfo(void* data, const GURL& url) {
206     mojo::MojoMessageData* message_data =
207         reinterpret_cast<mojo::MojoMessageData*>(data);
208     if (IsValidMessage(message_data->header)) {
209       control_loop_proxy_->PostTask(
210           FROM_HERE,
211           base::Bind(&mojo::WebSocketServer::LogMessageInfo,
212                       base::Unretained(ws_server),
213                       message_data->header, url, base::Time::Now()));
214     }
215   }
216
217   MojoResult last_result_;
218   uint32_t bytes_transfered_;
219   uint32_t message_count_[2];
220   uint32_t handle_count_[2];
221   scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
222 };
223
224 // In charge of intercepting access to the service manager.
225 class SpyInterceptor : public mojo::ApplicationManager::Interceptor {
226  public:
227   explicit SpyInterceptor(scoped_refptr<mojo::SpyServerImpl> spy_server,
228                           base::MessageLoopProxy* control_loop_proxy)
229       : spy_server_(spy_server),
230         proxy_(base::MessageLoopProxy::current()),
231         control_loop_proxy_(control_loop_proxy){
232   }
233
234  private:
235   virtual mojo::ServiceProviderPtr OnConnectToClient(
236     const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
237       if (!MustIntercept(url))
238         return real_client.Pass();
239
240       // You can get an invalid handle if the app (or service) is
241       // created by unconventional means, for example the command line.
242       if (!real_client)
243         return real_client.Pass();
244
245       mojo::ScopedMessagePipeHandle faux_client;
246       mojo::ScopedMessagePipeHandle interceptor;
247       CreateMessagePipe(NULL, &faux_client, &interceptor);
248
249       scoped_refptr<MessageProcessor> processor = new MessageProcessor(
250           control_loop_proxy_);
251       mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
252       base::WorkerPool::PostTask(
253           FROM_HERE,
254           base::Bind(&MessageProcessor::Start,
255                      processor,
256                      base::Passed(&real_handle), base::Passed(&interceptor),
257                      url),
258           true);
259
260       mojo::ServiceProviderPtr faux_provider;
261       faux_provider.Bind(faux_client.Pass());
262       return faux_provider.Pass();
263   }
264
265   bool MustIntercept(const GURL& url) {
266     // TODO(cpu): manage who and when to intercept.
267     proxy_->PostTask(
268         FROM_HERE,
269         base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url));
270     return true;
271   }
272
273   scoped_refptr<mojo::SpyServerImpl> spy_server_;
274   scoped_refptr<base::MessageLoopProxy> proxy_;
275   scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
276 };
277
278 void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) {
279   // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
280   ws_server = new mojo::WebSocketServer(port, pipe.Pass());
281   ws_server->Start();
282 }
283
284 struct SpyOptions {
285   int websocket_port;
286
287   SpyOptions()
288       : websocket_port(kDefaultWebSocketPort) {
289   }
290 };
291
292 SpyOptions ProcessOptions(const std::string& options) {
293   SpyOptions spy_options;
294   if (options.empty())
295     return spy_options;
296   base::StringPairs kv_pairs;
297   base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs);
298   base::StringPairs::iterator it = kv_pairs.begin();
299   for (; it != kv_pairs.end(); ++it) {
300     if (it->first == "port") {
301       int port;
302       if (base::StringToInt(it->second, &port))
303         spy_options.websocket_port = port;
304     }
305   }
306   return spy_options;
307 }
308
309 }  // namespace
310
311 namespace mojo {
312
313 Spy::Spy(mojo::ApplicationManager* application_manager,
314          const std::string& options) {
315   SpyOptions spy_options = ProcessOptions(options);
316
317   spy_server_ = new SpyServerImpl();
318
319   // Start the tread what will accept commands from the frontend.
320   control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
321   base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
322   control_thread_->StartWithOptions(thread_options);
323   control_thread_->message_loop_proxy()->PostTask(
324       FROM_HERE, base::Bind(&StartWebServer,
325                             spy_options.websocket_port,
326                             base::Passed(spy_server_->ServerPipe())));
327
328   // Start intercepting mojo services.
329   application_manager->SetInterceptor(
330       new SpyInterceptor(spy_server_, control_thread_->message_loop_proxy()));
331 }
332
333 Spy::~Spy() {
334   // TODO(cpu): Do not leak the interceptor. Lifetime between the
335   // application_manager and the spy is still unclear hence the leak.
336 }
337
338 }  // namespace mojo