Upstream version 8.37.180.0
[platform/framework/web/crosswalk.git] / src / mojo / spy / spy.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/spy/spy.h"
6
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/strings/string_number_conversions.h"
12 #include "base/strings/string_split.h"
13 #include "base/threading/thread.h"
14 #include "base/threading/worker_pool.h"
15
16 #include "mojo/public/cpp/system/core.h"
17 #include "mojo/service_manager/service_manager.h"
18 #include "mojo/spy/websocket_server.h"
19
20 namespace {
21
22 const size_t kMessageBufSize = 2 * 1024;
23 const size_t kHandleBufSize = 64;
24 const int kDefaultWebSocketPort = 42424;
25
26 void CloseHandles(MojoHandle* handles, size_t count) {
27   for (size_t ix = 0; ix != count; ++count)
28     MojoClose(handles[ix]);
29 }
30
31 // In charge of processing messages that flow over a
32 // single message pipe.
33 class MessageProcessor :
34     public base::RefCountedThreadSafe<MessageProcessor> {
35  public:
36
37   MessageProcessor()
38       : last_result_(MOJO_RESULT_OK),
39         bytes_transfered_(0) {
40
41     message_count_[0] = 0;
42     message_count_[1] = 0;
43     handle_count_[0] = 0;
44     handle_count_[1] = 0;
45   }
46
47   void Start(mojo::ScopedMessagePipeHandle client,
48              mojo::ScopedMessagePipeHandle interceptor) {
49     std::vector<mojo::MessagePipeHandle> pipes;
50     pipes.push_back(client.get());
51     pipes.push_back(interceptor.get());
52     std::vector<MojoHandleSignals> handle_signals;
53     handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
54     handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
55
56     scoped_ptr<char[]> mbuf(new char[kMessageBufSize]);
57     scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]);
58
59     // Main processing loop:
60     // 1- Wait for an endpoint to have a message.
61     // 2- Read the message
62     // 3- Log data
63     // 4- Wait until the opposite port is ready for writting
64     // 4- Write the message to opposite port.
65
66     for (;;) {
67       int r = WaitMany(pipes, handle_signals, MOJO_DEADLINE_INDEFINITE);
68       if ((r < 0) || (r > 1)) {
69         last_result_ = r;
70         break;
71       }
72
73       uint32_t bytes_read = kMessageBufSize;
74       uint32_t handles_read = kHandleBufSize;
75
76       if (!CheckResult(ReadMessageRaw(pipes[r],
77                                       mbuf.get(), &bytes_read,
78                                       hbuf.get(), &handles_read,
79                                       MOJO_READ_MESSAGE_FLAG_NONE)))
80         break;
81
82       if (!bytes_read && !handles_read)
83         continue;
84
85       if (handles_read)
86         handle_count_[r] += handles_read;
87
88       ++message_count_[r];
89       bytes_transfered_ += bytes_read;
90
91       mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
92       if (!CheckResult(Wait(write_handle,
93                             MOJO_HANDLE_SIGNAL_WRITABLE,
94                             MOJO_DEADLINE_INDEFINITE)))
95         break;
96
97       if (!CheckResult(WriteMessageRaw(write_handle,
98                                        mbuf.get(), bytes_read,
99                                        hbuf.get(), handles_read,
100                                        MOJO_WRITE_MESSAGE_FLAG_NONE))) {
101         // On failure we own the handles. For now just close them.
102         if (handles_read)
103           CloseHandles(hbuf.get(), handles_read);
104         break;
105       }
106     }
107   }
108
109  private:
110    friend class base::RefCountedThreadSafe<MessageProcessor>;
111    virtual ~MessageProcessor() {}
112
113    bool CheckResult(MojoResult mr) {
114      if (mr == MOJO_RESULT_OK)
115        return true;
116      last_result_ = mr;
117      return false;
118    }
119
120    MojoResult last_result_;
121    uint32_t bytes_transfered_;
122    uint32_t message_count_[2];
123    uint32_t handle_count_[2];
124 };
125
126 // In charge of intercepting access to the service manager.
127 class SpyInterceptor : public mojo::ServiceManager::Interceptor {
128  private:
129   virtual mojo::ScopedMessagePipeHandle OnConnectToClient(
130     const GURL& url, mojo::ScopedMessagePipeHandle real_client) OVERRIDE {
131       if (!MustIntercept(url))
132         return real_client.Pass();
133
134       // You can get an invalid handle if the app (or service) is
135       // created by unconventional means, for example the command line.
136       if (!real_client.is_valid())
137         return real_client.Pass();
138
139       mojo::ScopedMessagePipeHandle faux_client;
140       mojo::ScopedMessagePipeHandle interceptor;
141       CreateMessagePipe(NULL, &faux_client, &interceptor);
142
143       scoped_refptr<MessageProcessor> processor = new MessageProcessor();
144       base::WorkerPool::PostTask(
145           FROM_HERE,
146           base::Bind(&MessageProcessor::Start,
147                      processor,
148                      base::Passed(&real_client), base::Passed(&interceptor)),
149           true);
150
151       return faux_client.Pass();
152   }
153
154   bool MustIntercept(const GURL& url) {
155     // TODO(cpu): manage who and when to intercept.
156     return true;
157   }
158 };
159
160 spy::WebSocketServer* ws_server = NULL;
161
162 void StartServer(int port) {
163   // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
164   ws_server = new spy::WebSocketServer(port);
165   ws_server->Start();
166 }
167
168 struct SpyOptions {
169   int websocket_port;
170
171   SpyOptions()
172       : websocket_port(kDefaultWebSocketPort) {
173   }
174 };
175
176 SpyOptions ProcessOptions(const std::string& options) {
177   SpyOptions spy_options;
178   if (options.empty())
179     return spy_options;
180   base::StringPairs kv_pairs;
181   base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs);
182   base::StringPairs::iterator it = kv_pairs.begin();
183   for (; it != kv_pairs.end(); ++it) {
184     if (it->first == "port") {
185       int port;
186       if (base::StringToInt(it->second, &port))
187         spy_options.websocket_port = port;
188     }
189   }
190   return spy_options;
191 }
192
193 }  // namespace
194
195 namespace mojo {
196
197 Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) {
198   SpyOptions spy_options = ProcessOptions(options);
199   // Start the tread what will accept commands from the frontend.
200   control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
201   base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
202   control_thread_->StartWithOptions(thread_options);
203   control_thread_->message_loop_proxy()->PostTask(
204       FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port));
205
206   // Start intercepting mojo services.
207   service_manager->SetInterceptor(new SpyInterceptor());
208 }
209
210 Spy::~Spy(){
211   // TODO(cpu): Do not leak the interceptor. Lifetime between the
212   // service_manager and the spy is still unclear hence the leak.
213 }
214
215 }  // namespace mojo