1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/memory/weak_ptr.h"
6 #include "base/message_loop/message_loop.h"
7 #include "base/rand_util.h"
8 #include "chrome/browser/devtools/device/android_device_manager.h"
9 #include "content/public/browser/browser_thread.h"
10 #include "net/base/io_buffer.h"
11 #include "net/base/net_errors.h"
12 #include "net/server/web_socket.h"
13 #include "net/socket/stream_socket.h"
15 using content::BrowserThread;
20 const int kBufferSize = 16 * 1024;
24 typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate;
26 WebSocketImpl(Delegate* delegate,
27 scoped_ptr<net::StreamSocket> socket);
28 void StartListening();
29 void SendFrame(const std::string& message);
32 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
33 void SendPendingRequests(int result);
37 scoped_ptr<net::StreamSocket> socket_;
38 std::string response_buffer_;
39 std::string request_buffer_;
40 base::ThreadChecker thread_checker_;
41 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
45 : public AndroidDeviceManager::AndroidWebSocket::Delegate {
47 DelegateWrapper(base::WeakPtr<Delegate> weak_delegate,
48 scoped_refptr<base::MessageLoopProxy> message_loop)
49 : weak_delegate_(weak_delegate),
50 message_loop_(message_loop) {
53 virtual ~DelegateWrapper() {}
55 // AndroidWebSocket::Delegate implementation
56 virtual void OnSocketOpened() OVERRIDE {
57 message_loop_->PostTask(FROM_HERE,
58 base::Bind(&Delegate::OnSocketOpened, weak_delegate_));
61 virtual void OnFrameRead(const std::string& message) OVERRIDE {
62 message_loop_->PostTask(FROM_HERE,
63 base::Bind(&Delegate::OnFrameRead, weak_delegate_, message));
66 virtual void OnSocketClosed() OVERRIDE {
67 message_loop_->PostTask(FROM_HERE,
68 base::Bind(&Delegate::OnSocketClosed, weak_delegate_));
72 base::WeakPtr<Delegate> weak_delegate_;
73 scoped_refptr<base::MessageLoopProxy> message_loop_;
76 class AndroidWebSocketImpl
77 : public AndroidDeviceManager::AndroidWebSocket,
78 public AndroidDeviceManager::AndroidWebSocket::Delegate {
80 typedef AndroidDeviceManager::Device Device;
82 scoped_refptr<base::MessageLoopProxy> device_message_loop,
83 scoped_refptr<Device> device,
84 const std::string& socket_name,
85 const std::string& url,
86 AndroidWebSocket::Delegate* delegate);
88 virtual ~AndroidWebSocketImpl();
90 // AndroidWebSocket implementation
91 virtual void SendFrame(const std::string& message) OVERRIDE;
93 // AndroidWebSocket::Delegate implementation
94 virtual void OnSocketOpened() OVERRIDE;
95 virtual void OnFrameRead(const std::string& message) OVERRIDE;
96 virtual void OnSocketClosed() OVERRIDE;
99 void Connected(int result, scoped_ptr<net::StreamSocket> socket);
101 scoped_refptr<base::MessageLoopProxy> device_message_loop_;
102 scoped_refptr<Device> device_;
103 std::string socket_name_;
105 WebSocketImpl* connection_;
106 DelegateWrapper* delegate_wrapper_;
107 AndroidWebSocket::Delegate* delegate_;
108 base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_;
109 DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl);
112 AndroidWebSocketImpl::AndroidWebSocketImpl(
113 scoped_refptr<base::MessageLoopProxy> device_message_loop,
114 scoped_refptr<Device> device,
115 const std::string& socket_name,
116 const std::string& url,
117 AndroidWebSocket::Delegate* delegate)
118 : device_message_loop_(device_message_loop),
120 socket_name_(socket_name),
123 weak_factory_(this) {
124 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
126 device_->HttpUpgrade(
128 base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr()));
131 void AndroidWebSocketImpl::SendFrame(const std::string& message) {
132 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
133 device_message_loop_->PostTask(
135 base::Bind(&WebSocketImpl::SendFrame,
136 base::Unretained(connection_), message));
139 void WebSocketImpl::SendFrame(const std::string& message) {
140 DCHECK(thread_checker_.CalledOnValidThread());
143 int mask = base::RandInt(0, 0x7FFFFFFF);
144 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
145 request_buffer_ += encoded_frame;
146 if (request_buffer_.length() == encoded_frame.length())
147 SendPendingRequests(0);
150 AndroidWebSocketImpl::~AndroidWebSocketImpl() {
151 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
152 device_message_loop_->DeleteSoon(FROM_HERE, connection_);
153 device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_);
156 WebSocketImpl::WebSocketImpl(Delegate* delegate,
157 scoped_ptr<net::StreamSocket> socket)
158 : delegate_(delegate),
159 socket_(socket.Pass()) {
160 thread_checker_.DetachFromThread();
163 void AndroidWebSocketImpl::Connected(int result,
164 scoped_ptr<net::StreamSocket> socket) {
165 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
166 if (result != net::OK || socket == NULL) {
170 delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(),
171 base::MessageLoopProxy::current());
172 connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass());
173 device_message_loop_->PostTask(
175 base::Bind(&WebSocketImpl::StartListening,
176 base::Unretained(connection_)));
180 void WebSocketImpl::StartListening() {
181 DCHECK(thread_checker_.CalledOnValidThread());
183 scoped_refptr<net::IOBuffer> response_buffer =
184 new net::IOBuffer(kBufferSize);
185 int result = socket_->Read(
186 response_buffer.get(),
188 base::Bind(&WebSocketImpl::OnBytesRead,
189 base::Unretained(this), response_buffer));
190 if (result != net::ERR_IO_PENDING)
191 OnBytesRead(response_buffer, result);
194 void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer,
196 DCHECK(thread_checker_.CalledOnValidThread());
202 response_buffer_.append(response_buffer->data(), result);
206 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
207 response_buffer_, false, &bytes_consumed, &output);
209 while (parse_result == WebSocket::FRAME_OK) {
210 response_buffer_ = response_buffer_.substr(bytes_consumed);
211 delegate_->OnFrameRead(output);
212 parse_result = WebSocket::DecodeFrameHybi17(
213 response_buffer_, false, &bytes_consumed, &output);
216 if (parse_result == WebSocket::FRAME_ERROR ||
217 parse_result == WebSocket::FRAME_CLOSE) {
222 result = socket_->Read(
223 response_buffer.get(),
225 base::Bind(&WebSocketImpl::OnBytesRead,
226 base::Unretained(this), response_buffer));
227 if (result != net::ERR_IO_PENDING)
228 OnBytesRead(response_buffer, result);
231 void WebSocketImpl::SendPendingRequests(int result) {
232 DCHECK(thread_checker_.CalledOnValidThread());
237 request_buffer_ = request_buffer_.substr(result);
238 if (request_buffer_.empty())
241 scoped_refptr<net::StringIOBuffer> buffer =
242 new net::StringIOBuffer(request_buffer_);
243 result = socket_->Write(buffer.get(), buffer->size(),
244 base::Bind(&WebSocketImpl::SendPendingRequests,
245 base::Unretained(this)));
246 if (result != net::ERR_IO_PENDING)
247 SendPendingRequests(result);
250 void WebSocketImpl::Disconnect() {
251 DCHECK(thread_checker_.CalledOnValidThread());
253 delegate_->OnSocketClosed();
256 void AndroidWebSocketImpl::OnSocketOpened() {
257 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
258 delegate_->OnSocketOpened();
261 void AndroidWebSocketImpl::OnFrameRead(const std::string& message) {
262 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
263 delegate_->OnFrameRead(message);
266 void AndroidWebSocketImpl::OnSocketClosed() {
267 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268 delegate_->OnSocketClosed();
273 AndroidDeviceManager::AndroidWebSocket*
274 AndroidDeviceManager::Device::CreateWebSocket(
275 const std::string& socket,
276 const std::string& url,
277 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) {
278 return new AndroidWebSocketImpl(
279 device_message_loop_, this, socket, url, delegate);