2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
14 * * Neither the name of Google Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived from
16 * this software without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
35 #include "bindings/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/fileapi/Blob.h"
40 #include "core/inspector/ScriptCallFrame.h"
41 #include "core/inspector/ScriptCallStack.h"
42 #include "core/workers/WorkerLoaderProxy.h"
43 #include "core/workers/WorkerRunLoop.h"
44 #include "core/workers/WorkerThread.h"
45 #include "modules/websockets/MainThreadWebSocketChannel.h"
46 #include "modules/websockets/NewWebSocketChannelImpl.h"
47 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
48 #include "platform/RuntimeEnabledFeatures.h"
49 #include "public/platform/Platform.h"
50 #include "public/platform/WebWaitableEvent.h"
51 #include "wtf/ArrayBuffer.h"
52 #include "wtf/Assertions.h"
53 #include "wtf/Functional.h"
54 #include "wtf/MainThread.h"
// Holds the results of connect/send requests together with the waitable
// event that is handed to the constructor.
58 // Created and destroyed on the worker thread. All setters of this class are
59 // called on the main thread, while all getters are called on the worker
60 // thread. signalWorkerThread() must be called before any getters are called.
61 class ThreadableWebSocketChannelSyncHelper {
// Factory: allocates a helper for |event| and returns it as an owning pointer.
63 static PassOwnPtr<ThreadableWebSocketChannelSyncHelper> create(PassOwnPtr<blink::WebWaitableEvent> event)
65 return adoptPtr(new ThreadableWebSocketChannelSyncHelper(event));
68 // All setters are called on the main thread.
// Records the outcome of a connect request.
69 void setConnectRequestResult(bool connectRequestResult)
71 m_connectRequestResult = connectRequestResult;
// Records the outcome of a send request.
73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
75 m_sendRequestResult = sendRequestResult;
78 // All getters are called on the worker thread.
// Returns the last value stored by setConnectRequestResult().
79 bool connectRequestResult() const
81 return m_connectRequestResult;
// Returns the last value stored by setSendRequestResult().
83 WebSocketChannel::SendResult sendRequestResult() const
85 return m_sendRequestResult;
88 // This should be called after all setters are called and before any
89 // getters are called.
// NOTE(review): the body is not visible in this extract; presumably it
// signals m_event - confirm.
90 void signalWorkerThread()
// NOTE(review): the body is not visible in this extract; presumably it
// returns the raw pointer held by m_event - confirm.
95 blink::WebWaitableEvent* event() const
// Both results start out as failures (false / SendFail) until a setter
// overwrites them.
101 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> event)
103 , m_connectRequestResult(false)
104 , m_sendRequestResult(WebSocketChannel::SendFail)
// The waitable event, owned by this helper.
108 OwnPtr<blink::WebWaitableEvent> m_event;
// Result of the most recent connect request.
109 bool m_connectRequestResult;
// Result of the most recent send request.
110 WebSocketChannel::SendResult m_sendRequestResult;
// Wraps |client| in a ThreadableWebSocketChannelClientWrapper, creates the
// Bridge for |workerGlobalScope|, remembers |sourceURL| / |lineNumber| as the
// location "at connection", and initializes the bridge with that location.
113 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
114 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
115 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
116 , m_sourceURLAtConnection(sourceURL)
117 , m_lineNumberAtConnection(lineNumber)
// The wrapper must have been created successfully.
119 ASSERT(m_workerClientWrapper.get());
120 m_bridge->initialize(sourceURL, lineNumber);
// Disconnects the bridge on destruction.
// NOTE(review): any guard around the call (e.g. a null check on m_bridge) is
// not visible in this extract - confirm.
123 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
126 m_bridge->disconnect();
// Forwards the connect request for |url| / |protocol| to the bridge and
// returns the bridge's result.
129 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
132 return m_bridge->connect(url, protocol);
// Sends a text message through the bridge and returns the bridge's result.
136 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
// NOTE(review): the condition guarding this early SendFail is not visible in
// this extract (presumably a null check on m_bridge) - confirm.
139 return WebSocketChannel::SendFail;
140 return m_bridge->send(message);
// Sends |byteLength| bytes of |binaryData| starting at |byteOffset| through
// the bridge and returns the bridge's result.
143 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
// NOTE(review): the condition guarding this early SendFail is not visible in
// this extract (presumably a null check on m_bridge) - confirm.
146 return WebSocketChannel::SendFail;
147 return m_bridge->send(binaryData, byteOffset, byteLength);
// Sends a blob through the bridge and returns the bridge's result.
150 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
// NOTE(review): the condition guarding this early SendFail is not visible in
// this extract (presumably a null check on m_bridge) - confirm.
153 return WebSocketChannel::SendFail;
154 return m_bridge->send(blobData);
// Forwards the close request with |code| and |reason| to the bridge.
157 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
160 m_bridge->close(code, reason);
// Reports a failure to the bridge. The source location passed along is
// chosen in this order:
//   1. the top frame of the current script call stack, if one is available;
//   2. the location recorded at connection, if the caller supplied neither a
//      URL nor a line number;
//   3. the caller-supplied |sourceURL| / |lineNumber|.
163 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
// Capture at most one frame; only its URL and line number are used.
168 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
169 if (callStack && callStack->size()) {
170 // In order to emulate the ConsoleMessage behavior,
171 // we should ignore the specified url and line number if
172 // we can get the JavaScript context.
173 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
174 } else if (sourceURL.isEmpty() && !lineNumber) {
175 // No information is specified by the caller - use the url
176 // and the line number at the connection.
177 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
179 // Use the specified information.
180 m_bridge->fail(reason, level, sourceURL, lineNumber);
// Disconnects the bridge.
// NOTE(review): any further statements of this body are not visible in this
// extract - confirm.
184 void WorkerThreadableWebSocketChannel::disconnect()
186 m_bridge->disconnect();
// Traces the client wrapper with |visitor|, then delegates to the base
// class's trace().
190 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
192 visitor->trace(m_workerClientWrapper);
193 WebSocketChannel::trace(visitor);
// Constructs the Peer on the main thread (asserted). Stores the client
// wrapper, loader proxy and sync helper, binds |reference| to this object
// through m_weakFactory, creates the main-thread WebSocket channel for the
// document behind |context|, and finally signals the worker thread through
// the sync helper.
196 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
197 : m_workerClientWrapper(clientWrapper)
198 , m_loaderProxy(loaderProxy)
199 , m_mainWebSocketChannel(nullptr)
200 , m_syncHelper(syncHelper)
201 , m_weakFactory(reference, this)
203 ASSERT(isMainThread());
204 ASSERT(m_workerClientWrapper.get());
206 Document* document = toDocument(context);
// Pick the channel implementation based on the experimentalWebSocket runtime
// feature. NOTE(review): the line separating the two assignments is not
// visible in this extract; presumably the second one is the else branch.
207 if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
208 m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
210 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
213 m_syncHelper->signalWorkerThread();
// Destroyed on the main thread (asserted). Disconnects the main-thread
// channel if one is still set.
216 WorkerThreadableWebSocketChannel::Peer::~Peer()
218 ASSERT(isMainThread());
219 if (m_mainWebSocketChannel)
220 m_mainWebSocketChannel->disconnect();
// Allocates a new Peer from the given arguments. The returned pointer is not
// kept here; the Peer constructor receives |reference| and binds itself to it.
223 void WorkerThreadableWebSocketChannel::Peer::initialize(ExecutionContext* context, PassRefPtr<WeakReference<Peer> > reference, WorkerLoaderProxy* loaderProxy, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnection, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
225 // The caller must call destroy() to free the peer.
226 new Peer(reference, clientWrapper, *loaderProxy, context, sourceURLAtConnection, lineNumberAtConnection, syncHelper);
// Must run on the main thread (asserted).
// NOTE(review): the rest of the body is not visible in this extract; the
// comment in Peer::initialize() says destroy() frees the peer - confirm.
229 void WorkerThreadableWebSocketChannel::Peer::destroy()
231 ASSERT(isMainThread());
// Main thread only (asserted). Stores the connect outcome in the sync helper
// and then signals the worker thread. With no main-thread channel the stored
// outcome is false; the channel's connect() result is stored below.
// NOTE(review): the line separating the two cases is not visible in this
// extract; presumably the connect() call is the else branch - confirm.
235 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
237 ASSERT(isMainThread());
238 if (!m_mainWebSocketChannel) {
239 m_syncHelper->setConnectRequestResult(false);
241 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
242 m_syncHelper->setConnectRequestResult(connectRequestResult);
244 m_syncHelper->signalWorkerThread();
// Main thread only (asserted). Stores the outcome of sending the text
// |message| in the sync helper and then signals the worker thread. With no
// main-thread channel the stored outcome is SendFail.
// NOTE(review): the line separating the two cases is not visible in this
// extract; presumably the send() call is the else branch - confirm.
247 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
249 ASSERT(isMainThread());
250 if (!m_mainWebSocketChannel) {
251 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
253 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
254 m_syncHelper->setSendRequestResult(sendRequestResult);
256 m_syncHelper->signalWorkerThread();
// Main thread only (asserted). Stores the outcome of sending the binary
// |data| in the sync helper and then signals the worker thread. With no
// main-thread channel the stored outcome is SendFail.
// NOTE(review): the line separating the two cases is not visible in this
// extract; presumably the send() call is the else branch - confirm.
259 void WorkerThreadableWebSocketChannel::Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
261 ASSERT(isMainThread());
262 if (!m_mainWebSocketChannel) {
263 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
265 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(data);
266 m_syncHelper->setSendRequestResult(sendRequestResult);
268 m_syncHelper->signalWorkerThread();
// Main thread only (asserted). Stores the outcome of sending |blobData| in
// the sync helper and then signals the worker thread. With no main-thread
// channel the stored outcome is SendFail.
// NOTE(review): the line separating the two cases is not visible in this
// extract; presumably the send() call is the else branch - confirm.
271 void WorkerThreadableWebSocketChannel::Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
273 ASSERT(isMainThread());
274 if (!m_mainWebSocketChannel) {
275 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
277 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(blobData);
278 m_syncHelper->setSendRequestResult(sendRequestResult);
280 m_syncHelper->signalWorkerThread();
// Main thread only (asserted). Forwards the close request to the main-thread
// channel.
283 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
285 ASSERT(isMainThread());
// NOTE(review): the statement controlled by this check is not visible in
// this extract; presumably an early return when there is no channel.
286 if (!m_mainWebSocketChannel)
288 m_mainWebSocketChannel->close(code, reason);
// Main thread only (asserted). Forwards the failure report, including its
// source location, to the main-thread channel.
291 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
293 ASSERT(isMainThread());
// NOTE(review): the statement controlled by this check is not visible in
// this extract; presumably an early return when there is no channel.
294 if (!m_mainWebSocketChannel)
296 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
// Main thread only (asserted). Disconnects the main-thread channel and then
// drops the pointer to it.
299 void WorkerThreadableWebSocketChannel::Peer::disconnect()
301 ASSERT(isMainThread());
// NOTE(review): the statement controlled by this check is not visible in
// this extract; presumably an early return when there is no channel.
302 if (!m_mainWebSocketChannel)
304 m_mainWebSocketChannel->disconnect();
305 m_mainWebSocketChannel = nullptr;
// Task body: asserts that |context| is a worker global scope and relays
// didConnect(subprotocol, extensions) to the client wrapper.
308 static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
310 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
311 workerClientWrapper->didConnect(subprotocol, extensions);
// Main thread only (asserted). Posts a task to the worker global scope that
// relays the didConnect notification to the client wrapper.
314 void WorkerThreadableWebSocketChannel::Peer::didConnect(const String& subprotocol, const String& extensions)
316 ASSERT(isMainThread());
317 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper.get(), subprotocol, extensions));
// Task body: asserts that |context| is a worker global scope and relays the
// received text |message| to the client wrapper.
320 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
322 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
323 workerClientWrapper->didReceiveMessage(message);
// Main thread only (asserted). Posts a task to the worker global scope that
// relays the received text |message| to the client wrapper.
326 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
328 ASSERT(isMainThread());
329 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper.get(), message));
// Task body: asserts that |context| is a worker global scope and hands the
// received |binaryData| to the client wrapper.
332 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
334 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
335 workerClientWrapper->didReceiveBinaryData(binaryData);
// Main thread only (asserted). Posts a task to the worker global scope that
// hands the received |binaryData| to the client wrapper.
338 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
340 ASSERT(isMainThread());
341 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData));
// Task body: asserts that |context| is a worker global scope and relays the
// |consumed| buffered amount to the client wrapper.
344 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long consumed)
346 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
347 workerClientWrapper->didConsumeBufferedAmount(consumed);
// Main thread only (asserted). Posts a task to the worker global scope that
// relays the |consumed| buffered amount to the client wrapper.
350 void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned long consumed)
352 ASSERT(isMainThread());
353 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed));
// Task body: asserts that |context| is a worker global scope and relays
// didStartClosingHandshake() to the client wrapper.
356 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
358 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
359 workerClientWrapper->didStartClosingHandshake();
// Main thread only (asserted). Posts a task to the worker global scope that
// relays didStartClosingHandshake() to the client wrapper.
362 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
364 ASSERT(isMainThread());
365 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper.get()));
// Task body: asserts that |context| is a worker global scope and relays
// didClose(closingHandshakeCompletion, code, reason) to the client wrapper.
368 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
370 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
371 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
// Main thread only (asserted). Drops the pointer to the main-thread channel,
// then posts a task to the worker global scope that relays the close
// notification to the client wrapper.
374 void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
376 ASSERT(isMainThread());
377 m_mainWebSocketChannel = nullptr;
378 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reason));
// Task body: asserts that |context| is a worker global scope and relays
// didReceiveMessageError() to the client wrapper.
381 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
383 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
384 workerClientWrapper->didReceiveMessageError();
// Main thread only (asserted). Posts a task to the worker global scope that
// relays didReceiveMessageError() to the client wrapper.
387 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
389 ASSERT(isMainThread());
390 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper.get()));
// Stores the client wrapper and the worker global scope, and takes the loader
// proxy from the worker global scope's thread.
393 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
394 : m_workerClientWrapper(workerClientWrapper)
395 , m_workerGlobalScope(workerGlobalScope)
396 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
// A bridge without a client wrapper is a programming error.
399 ASSERT(m_workerClientWrapper.get());
// NOTE(review): the destructor body is not visible in this extract.
402 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
// Sets up the Peer: creates an unbound weak reference and keeps a WeakPtr to
// it in m_peer, creates the sync helper around a new waitable event, then
// posts Peer::initialize to the loader and waits for it to complete.
407 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber)
409 RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound();
410 m_peer = WeakPtr<Peer>(reference);
412 OwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocketChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEvent()));
413 // This pointer is guaranteed to be valid until we call terminatePeer.
// Only a raw pointer is kept here; ownership is released into the task below.
414 m_syncHelper = syncHelper.get();
// Keep this bridge alive for the duration of the wait.
416 RefPtr<Bridge> protect(this);
417 if (!waitForMethodCompletion(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), sourceURL, lineNumber, syncHelper.release()))) {
418 // The worker thread has been signalled to shutdown before method completion.
// NOTE(review): the handling for this case is not visible in this extract.
// Asks the Peer to connect and waits for the answer: posts Peer::connect with
// copies of |url| and |protocol|, then returns the result recorded in the
// sync helper.
// NOTE(review): the statements controlled by the two checks below are not
// visible in this extract; presumably both return false - confirm.
423 bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
425 if (hasTerminatedPeer())
// Keep this bridge alive for the duration of the wait.
428 RefPtr<Bridge> protect(this);
429 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy()))))
432 return m_syncHelper->connectRequestResult();
// Sends a text message via the Peer and waits for the answer. Returns
// SendFail if the peer has already been terminated or if
// waitForMethodCompletion() reports failure; otherwise returns the result
// recorded in the sync helper.
435 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
437 if (hasTerminatedPeer())
438 return WebSocketChannel::SendFail;
// Keep this bridge alive for the duration of the wait.
440 RefPtr<Bridge> protect(this);
// The task is bound with an isolated copy of the string.
441 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy()))))
442 return WebSocketChannel::SendFail;
444 return m_syncHelper->sendRequestResult();
// Sends |byteLength| bytes of |binaryData| starting at |byteOffset| via the
// Peer and waits for the answer. Returns SendFail if the peer has already
// been terminated or if waitForMethodCompletion() reports failure; otherwise
// returns the result recorded in the sync helper.
447 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
449 if (hasTerminatedPeer())
450 return WebSocketChannel::SendFail;
452 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
453 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
// NOTE(review): the copy is guarded by the buffer's total byteLength(), not
// by the |byteLength| parameter, and no check that byteOffset + byteLength
// fits inside the buffer is visible here - confirm callers validate the range.
454 if (binaryData.byteLength())
455 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
// Keep this bridge alive for the duration of the wait.
457 RefPtr<Bridge> protect(this);
458 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release()))))
459 return WebSocketChannel::SendFail;
461 return m_syncHelper->sendRequestResult();
// Sends a blob via the Peer and waits for the answer. Returns SendFail if the
// peer has already been terminated or if waitForMethodCompletion() reports
// failure; otherwise returns the result recorded in the sync helper.
464 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
466 if (hasTerminatedPeer())
467 return WebSocketChannel::SendFail;
// Keep this bridge alive for the duration of the wait.
469 RefPtr<Bridge> protect(this);
470 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data))))
471 return WebSocketChannel::SendFail;
473 return m_syncHelper->sendRequestResult();
// Posts Peer::close to the loader with an isolated copy of |reason|. Unlike
// connect()/send(), this does not wait for the task to complete.
476 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
// NOTE(review): the statement controlled by this check is not visible in
// this extract; presumably an early return.
478 if (hasTerminatedPeer())
481 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_peer, code, reason.isolatedCopy())));
// Posts Peer::fail to the loader with isolated copies of |reason| and
// |sourceURL|. Unlike connect()/send(), this does not wait for the task to
// complete.
484 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
// NOTE(review): the statement controlled by this check is not visible in
// this extract; presumably an early return.
486 if (hasTerminatedPeer())
489 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_peer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber)));
// Detaches the client by calling clearClientWrapper().
// NOTE(review): further statements of this body may be missing from this
// extract - confirm against the full file.
492 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
494 clearClientWrapper();
// Clears the client held by the client wrapper.
498 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
500 m_workerClientWrapper->clearClient();
// Posts |task| to the loader and blocks until the sync helper's event is
// signalled. Returns true if the worker run loop has not been terminated by
// the time the wait finishes, false otherwise.
503 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
504 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
505 bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
507 ASSERT(m_workerGlobalScope);
508 ASSERT(m_syncHelper);
510 m_loaderProxy.postTaskToLoader(task);
512 // We wait for the syncHelper event even if a shutdown event is fired.
513 // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
// The sync helper's event is the only event waited on.
514 Vector<blink::WebWaitableEvent*> events;
515 events.append(m_syncHelper->event());
// The wait runs inside a SafePointScope declared with HeapPointersOnStack.
516 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
517 blink::Platform::current()->waitMultipleEvents(events);
518 // This is checking whether a shutdown event is fired or not.
519 return !m_workerGlobalScope->thread()->runLoop().terminated();
// Posts Peer::destroy to the loader (without waiting for it) and drops the
// reference to the worker global scope.
// NOTE(review): the statements that follow the m_syncHelper comment are not
// visible in this extract - confirm that the stale pointers are cleared.
522 void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
524 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
525 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
526 // We must not touch m_syncHelper any more.
529 // We won't use this any more.
530 m_workerGlobalScope = nullptr;
533 } // namespace WebCore