2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
14 * * Neither the name of Google Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived from
16 * this software without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 #include "modules/websockets/WorkerWebSocketChannel.h"
35 #include "bindings/core/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/dom/ExecutionContextTask.h"
40 #include "core/fileapi/Blob.h"
41 #include "core/inspector/ScriptCallFrame.h"
42 #include "core/inspector/ScriptCallStack.h"
43 #include "core/workers/WorkerGlobalScope.h"
44 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerThread.h"
46 #include "modules/websockets/DocumentWebSocketChannel.h"
47 #include "public/platform/Platform.h"
48 #include "public/platform/WebWaitableEvent.h"
49 #include "wtf/ArrayBuffer.h"
50 #include "wtf/Assertions.h"
51 #include "wtf/Functional.h"
52 #include "wtf/MainThread.h"
53 #include "wtf/text/WTFString.h"
57 typedef WorkerWebSocketChannel::Bridge Bridge;
58 typedef WorkerWebSocketChannel::Peer Peer;
60 // Created and destroyed on the worker thread. All setters of this class are
61 // called on the main thread, while all getters are called on the worker
62 // thread. signalWorkerThread() must be called before any getters are called.
// NOTE(review): several short lines of this class (access specifiers, braces,
// some member bodies) are not visible in this view; the comments added below
// describe only what the visible lines show.
63 class WebSocketChannelSyncHelper : public GarbageCollectedFinalized<WebSocketChannelSyncHelper> {
// Factory: allocates a helper, forwarding |event| to the constructor.
65 static WebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
67 return new WebSocketChannelSyncHelper(event);
70 ~WebSocketChannelSyncHelper()
74 // All setters are called on the main thread.
// Stores the outcome of a connect request for later retrieval through
// connectRequestResult().
75 void setConnectRequestResult(bool connectRequestResult)
77 m_connectRequestResult = connectRequestResult;
80 // All getters are called on the worker thread.
// Returns the value last stored by setConnectRequestResult() (false if it was
// never called).
81 bool connectRequestResult() const
83 return m_connectRequestResult;
86 // This should be called after all setters are called and before any
87 // getters are called.
// NOTE(review): the body of signalWorkerThread() is not visible in this view.
88 void signalWorkerThread()
// Empty trace method: nothing is reported to |visitor|.
97 void trace(Visitor* visitor) { }
// The connect result starts out as false.
100 explicit WebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
102 , m_connectRequestResult(false)
// Waitable event owned by this helper.
106 OwnPtr<WebWaitableEvent> m_event;
// Value exchanged through setConnectRequestResult()/connectRequestResult().
107 bool m_connectRequestResult;
110 WorkerWebSocketChannel::WorkerWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
111 : m_bridge(new Bridge(client, workerGlobalScope))
112 , m_sourceURLAtConnection(sourceURL)
113 , m_lineNumberAtConnection(lineNumber)
115 m_bridge->initialize(sourceURL, lineNumber);
// Destructor.
// NOTE(review): the body is not visible in this view.
118 WorkerWebSocketChannel::~WorkerWebSocketChannel()
// Forwards the connect request for |url| and |protocol| to the bridge and
// returns whatever the bridge reports.
// NOTE(review): one line between the signature and the return statement is
// not visible in this view.
123 bool WorkerWebSocketChannel::connect(const KURL& url, const String& protocol)
126 return m_bridge->connect(url, protocol);
// Forwards a text |message| to the bridge.
// NOTE(review): one line before the forwarding call is not visible in this
// view.
129 void WorkerWebSocketChannel::send(const String& message)
132 m_bridge->send(message);
// Forwards |binaryData|, |byteOffset| and |byteLength| unchanged to the
// bridge.
// NOTE(review): one line before the forwarding call is not visible in this
// view.
135 void WorkerWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
138 m_bridge->send(binaryData, byteOffset, byteLength);
// Forwards the blob handle |blobData| to the bridge.
// NOTE(review): one line before the forwarding call is not visible in this
// view.
141 void WorkerWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
144 m_bridge->send(blobData);
// Forwards the close request (|code|, |reason|) to the bridge.
// NOTE(review): one line before the forwarding call is not visible in this
// view.
147 void WorkerWebSocketChannel::close(int code, const String& reason)
150 m_bridge->close(code, reason);
153 void WorkerWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
158 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
159 if (callStack && callStack->size()) {
160 // In order to emulate the ConsoleMessage behavior,
161 // we should ignore the specified url and line number if
162 // we can get the JavaScript context.
163 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
164 } else if (sourceURL.isEmpty() && !lineNumber) {
165 // No information is specified by the caller - use the url
166 // and the line number at the connection.
167 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
169 // Use the specified information.
170 m_bridge->fail(reason, level, sourceURL, lineNumber);
// Asks the bridge to disconnect.
// NOTE(review): a line following the call is not visible in this view.
174 void WorkerWebSocketChannel::disconnect()
176 m_bridge->disconnect();
180 void WorkerWebSocketChannel::trace(Visitor* visitor)
182 visitor->trace(m_bridge);
183 WebSocketChannel::trace(visitor);
186 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, WebSocketChannelSyncHelper* syncHelper)
188 , m_loaderProxy(loaderProxy)
189 , m_mainWebSocketChannel(nullptr)
190 , m_syncHelper(syncHelper)
192 ASSERT(!isMainThread());
// Asserts that this code is not running on the main thread.
// NOTE(review): the header of the function this assertion belongs to is not
// visible in this view (presumably Peer's destructor - confirm).
197 ASSERT(!isMainThread());
200 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
202 ASSERT(isMainThread());
203 Document* document = toDocument(context);
204 m_mainWebSocketChannel = DocumentWebSocketChannel::create(document, this, sourceURL, lineNumber);
205 m_syncHelper->signalWorkerThread();
208 void Peer::connect(const KURL& url, const String& protocol)
210 ASSERT(isMainThread());
211 ASSERT(m_syncHelper);
212 if (!m_mainWebSocketChannel) {
213 m_syncHelper->setConnectRequestResult(false);
215 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
216 m_syncHelper->setConnectRequestResult(connectRequestResult);
218 m_syncHelper->signalWorkerThread();
221 void Peer::send(const String& message)
223 ASSERT(isMainThread());
224 if (m_mainWebSocketChannel)
225 m_mainWebSocketChannel->send(message);
228 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
230 ASSERT(isMainThread());
231 if (m_mainWebSocketChannel)
232 m_mainWebSocketChannel->send(data);
235 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
237 ASSERT(isMainThread());
238 if (m_mainWebSocketChannel)
239 m_mainWebSocketChannel->send(blobData);
242 void Peer::close(int code, const String& reason)
244 ASSERT(isMainThread());
245 ASSERT(m_syncHelper);
246 if (!m_mainWebSocketChannel)
248 m_mainWebSocketChannel->close(code, reason);
251 void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
253 ASSERT(isMainThread());
254 ASSERT(m_syncHelper);
255 if (!m_mainWebSocketChannel)
257 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
260 void Peer::disconnect()
262 ASSERT(isMainThread());
263 ASSERT(m_syncHelper);
264 if (m_mainWebSocketChannel) {
265 m_mainWebSocketChannel->disconnect();
266 m_mainWebSocketChannel = nullptr;
268 m_syncHelper->signalWorkerThread();
271 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridge, const String& subprotocol, const String& extensions)
273 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
274 if (bridge->client())
275 bridge->client()->didConnect(subprotocol, extensions);
278 void Peer::didConnect(const String& subprotocol, const String& extensions)
280 ASSERT(isMainThread());
281 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConnect, m_bridge, subprotocol, extensions));
284 static void workerGlobalScopeDidReceiveTextMessage(ExecutionContext* context, Bridge* bridge, const String& payload)
286 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
287 if (bridge->client())
288 bridge->client()->didReceiveTextMessage(payload);
291 void Peer::didReceiveTextMessage(const String& payload)
293 ASSERT(isMainThread());
294 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveTextMessage, m_bridge, payload));
297 static void workerGlobalScopeDidReceiveBinaryMessage(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > payload)
299 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
300 if (bridge->client())
301 bridge->client()->didReceiveBinaryMessage(payload);
304 void Peer::didReceiveBinaryMessage(PassOwnPtr<Vector<char> > payload)
306 ASSERT(isMainThread());
307 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryMessage, m_bridge, payload));
310 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed)
312 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
313 if (bridge->client())
314 bridge->client()->didConsumeBufferedAmount(consumed);
317 void Peer::didConsumeBufferedAmount(unsigned long consumed)
319 ASSERT(isMainThread());
320 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_bridge, consumed));
323 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge)
325 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
326 if (bridge->client())
327 bridge->client()->didStartClosingHandshake();
330 void Peer::didStartClosingHandshake()
332 ASSERT(isMainThread());
333 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_bridge));
336 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
338 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
339 if (bridge->client())
340 bridge->client()->didClose(closingHandshakeCompletion, code, reason);
343 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
345 ASSERT(isMainThread());
346 if (m_mainWebSocketChannel) {
347 m_mainWebSocketChannel->disconnect();
348 m_mainWebSocketChannel = nullptr;
350 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason));
353 static void workerGlobalScopeDidError(ExecutionContext* context, Bridge* bridge)
355 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
356 if (bridge->client())
357 bridge->client()->didError();
360 void Peer::didError()
362 ASSERT(isMainThread());
363 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidError, m_bridge));
366 void Peer::trace(Visitor* visitor)
368 visitor->trace(m_bridge);
369 visitor->trace(m_mainWebSocketChannel);
370 visitor->trace(m_syncHelper);
371 WebSocketChannelClient::trace(visitor);
374 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalScope)
376 , m_workerGlobalScope(workerGlobalScope)
377 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
378 , m_syncHelper(WebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
379 , m_peer(new Peer(this, m_loaderProxy, m_syncHelper))
// Runs Peer::initialize for this bridge's peer, with the given source
// location, through waitForMethodCompletion(), which posts the task to the
// loader and waits on the sync helper.
// NOTE(review): the statement(s) executed when waitForMethodCompletion()
// returns false are not visible in this view.
388 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
390 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber))) {
391 // The worker thread has been signalled to shutdown before method completion.
396 bool Bridge::connect(const KURL& url, const String& protocol)
401 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
404 return m_syncHelper->connectRequestResult();
// Posts a Peer::send task carrying the text |message| for this bridge's peer
// to the loader (Peer::send asserts that it runs on the main thread).
// NOTE(review): one line before the posting call is not visible in this view.
407 void Bridge::send(const String& message)
410 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
413 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
416 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
417 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
418 if (binaryData.byteLength())
419 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
421 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
// Posts a Peer::sendBlob task carrying the blob handle |data| for this
// bridge's peer to the loader.
// NOTE(review): one line before the posting call is not visible in this view.
424 void Bridge::send(PassRefPtr<BlobDataHandle> data)
427 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
// Posts a Peer::close task with |code| and |reason| for this bridge's peer to
// the loader. The call does not wait for the task to run.
// NOTE(review): one line before the posting call is not visible in this view.
430 void Bridge::close(int code, const String& reason)
433 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
// Posts a Peer::fail task with |reason|, |level| and the source location for
// this bridge's peer to the loader. The call does not wait for the task to
// run.
// NOTE(review): one line before the posting call is not visible in this view.
436 void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
439 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
// Runs Peer::disconnect through waitForMethodCompletion() (its return value
// is ignored here), then drops this bridge's references to the sync helper
// and to the worker global scope.
// NOTE(review): several short lines of this function are not visible in this
// view, so this description may be incomplete.
442 void Bridge::disconnect()
447 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
448 // Here |m_peer| is detached from the main thread and we can delete it.
452 m_syncHelper = nullptr;
453 // We won't use this any more.
454 m_workerGlobalScope.clear();
457 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
458 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
459 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
461 ASSERT(m_workerGlobalScope);
462 ASSERT(m_syncHelper);
464 m_loaderProxy.postTaskToLoader(task);
466 // We wait for the syncHelper event even if a shutdown event is fired.
467 // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
468 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
469 m_syncHelper->wait();
470 // This is checking whether a shutdown event is fired or not.
471 return !m_workerGlobalScope->thread()->terminated();
474 void Bridge::trace(Visitor* visitor)
476 visitor->trace(m_client);
477 visitor->trace(m_workerGlobalScope);
478 visitor->trace(m_syncHelper);
479 visitor->trace(m_peer);