#include "modules/websockets/WorkerThreadableWebSocketChannel.h"
-#include "bindings/v8/ScriptCallStackFactory.h"
+#include "bindings/core/v8/ScriptCallStackFactory.h"
#include "core/dom/CrossThreadTask.h"
#include "core/dom/Document.h"
#include "core/dom/ExecutionContext.h"
#include "core/fileapi/Blob.h"
#include "core/inspector/ScriptCallFrame.h"
#include "core/inspector/ScriptCallStack.h"
-#include "core/frame/Settings.h"
#include "core/workers/WorkerLoaderProxy.h"
-#include "core/workers/WorkerRunLoop.h"
#include "core/workers/WorkerThread.h"
#include "modules/websockets/MainThreadWebSocketChannel.h"
+#include "modules/websockets/NewWebSocketChannelImpl.h"
#include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
+#include "platform/RuntimeEnabledFeatures.h"
+#include "public/platform/Platform.h"
+#include "public/platform/WebWaitableEvent.h"
#include "wtf/ArrayBuffer.h"
+#include "wtf/Assertions.h"
+#include "wtf/Functional.h"
#include "wtf/MainThread.h"
-namespace WebCore {
+namespace blink {
-WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode, const String& sourceURL, unsigned lineNumber)
- : m_workerGlobalScope(context)
- , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client))
- , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode))
+typedef WorkerThreadableWebSocketChannel::Bridge Bridge;
+typedef WorkerThreadableWebSocketChannel::Peer Peer;
+
+// Created and destroyed on the worker thread. All setters of this class are
+// called on the main thread, while all getters are called on the worker
+// thread. signalWorkerThread() must be called before any getters are called.
+class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<ThreadableWebSocketChannelSyncHelper> {
+public:
+ static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
+ {
+ return new ThreadableWebSocketChannelSyncHelper(event);
+ }
+
+ ~ThreadableWebSocketChannelSyncHelper()
+ {
+ }
+
+ // All setters are called on the main thread.
+ void setConnectRequestResult(bool connectRequestResult)
+ {
+ m_connectRequestResult = connectRequestResult;
+ }
+
+ // All getter are called on the worker thread.
+ bool connectRequestResult() const
+ {
+ return m_connectRequestResult;
+ }
+
+ // This should be called after all setters are called and before any
+ // getters are called.
+ void signalWorkerThread()
+ {
+ m_event->signal();
+ }
+ void wait()
+ {
+ m_event->wait();
+ }
+
+ void trace(Visitor* visitor) { }
+
+private:
+ explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
+ : m_event(event)
+ , m_connectRequestResult(false)
+ {
+ }
+
+ OwnPtr<WebWaitableEvent> m_event;
+ bool m_connectRequestResult;
+};
+
+WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
+ : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
+ , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
, m_sourceURLAtConnection(sourceURL)
, m_lineNumberAtConnection(lineNumber)
{
+ ASSERT(m_workerClientWrapper.get());
m_bridge->initialize(sourceURL, lineNumber);
}
WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
{
- if (m_bridge)
- m_bridge->disconnect();
-}
-
-void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
-{
- if (m_bridge)
- m_bridge->connect(url, protocol);
+ ASSERT(!m_bridge);
}
-String WorkerThreadableWebSocketChannel::subprotocol()
+bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
{
- ASSERT(m_workerClientWrapper);
- return m_workerClientWrapper->subprotocol();
+ ASSERT(m_bridge);
+ return m_bridge->connect(url, protocol);
}
-String WorkerThreadableWebSocketChannel::extensions()
+void WorkerThreadableWebSocketChannel::send(const String& message)
{
- ASSERT(m_workerClientWrapper);
- return m_workerClientWrapper->extensions();
+ ASSERT(m_bridge);
+ m_bridge->send(message);
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
+void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(message);
-}
-
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
-{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(binaryData, byteOffset, byteLength);
+ ASSERT(m_bridge);
+ m_bridge->send(binaryData, byteOffset, byteLength);
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
+void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(blobData);
-}
-
-unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
-{
- if (!m_bridge)
- return 0;
- return m_bridge->bufferedAmount();
+ ASSERT(m_bridge);
+ m_bridge->send(blobData);
}
void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
{
- if (m_bridge)
- m_bridge->close(code, reason);
+ ASSERT(m_bridge);
+ m_bridge->close(code, reason);
}
void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
if (!m_bridge)
return;
- RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
+ RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
if (callStack && callStack->size()) {
// In order to emulate the ConsoleMessage behavior,
// we should ignore the specified url and line number if
m_bridge.clear();
}
-void WorkerThreadableWebSocketChannel::suspend()
+void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
{
- m_workerClientWrapper->suspend();
- if (m_bridge)
- m_bridge->suspend();
+ visitor->trace(m_bridge);
+ visitor->trace(m_workerClientWrapper);
+ WebSocketChannel::trace(visitor);
}
-void WorkerThreadableWebSocketChannel::resume()
-{
- m_workerClientWrapper->resume();
- if (m_bridge)
- m_bridge->resume();
-}
-
-WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& taskMode, const String& sourceURL, unsigned lineNumber)
+Peer::Peer(ThreadableWebSocketChannelClientWrapper* clientWrapper, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
: m_workerClientWrapper(clientWrapper)
, m_loaderProxy(loaderProxy)
- , m_mainWebSocketChannel(0)
- , m_taskMode(taskMode)
+ , m_mainWebSocketChannel(nullptr)
+ , m_syncHelper(syncHelper)
{
- Document* document = toDocument(context);
- Settings* settings = document->settings();
- if (settings && settings->experimentalWebSocketEnabled()) {
- // FIXME: Create an "experimental" WebSocketChannel instead of a MainThreadWebSocketChannel.
- m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
- } else
- m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
- ASSERT(isMainThread());
+ ASSERT(!isMainThread());
}
-WorkerThreadableWebSocketChannel::Peer::~Peer()
+Peer::~Peer()
{
- ASSERT(isMainThread());
- if (m_mainWebSocketChannel)
- m_mainWebSocketChannel->disconnect();
-}
-
-void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
-{
- ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->connect(url, protocol);
+ ASSERT(!isMainThread());
}
-static void workerGlobalScopeDidSend(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannel::SendResult sendRequestResult)
+Peer* Peer::create(ThreadableWebSocketChannelClientWrapper* clientWrapper, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setSendRequestResult(sendRequestResult);
+ return new Peer(clientWrapper, loaderProxy, syncHelper);
}
-void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
+void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
- return;
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
+ Document* document = toDocument(context);
+ if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
+ m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
+ } else {
+ m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
+ }
+ m_syncHelper->signalWorkerThread();
}
-void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
+void Peer::connect(const KURL& url, const String& protocol)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
- return;
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
+ ASSERT(m_syncHelper);
+ if (!m_mainWebSocketChannel) {
+ m_syncHelper->setConnectRequestResult(false);
+ } else {
+ bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
+ m_syncHelper->setConnectRequestResult(connectRequestResult);
+ }
+ m_syncHelper->signalWorkerThread();
}
-void WorkerThreadableWebSocketChannel::Peer::send(PassRefPtr<BlobDataHandle> blobData)
+void Peer::send(const String& message)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
- return;
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(blobData);
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(message);
}
-static void workerGlobalScopeDidGetBufferedAmount(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
+void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setBufferedAmount(bufferedAmount);
+ ASSERT(isMainThread());
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(data);
}
-void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
+void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
- return;
- unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(blobData);
}
-void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
+void Peer::close(int code, const String& reason)
{
ASSERT(isMainThread());
+ ASSERT(m_syncHelper);
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->close(code, reason);
}
-void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
+void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
ASSERT(isMainThread());
+ ASSERT(m_syncHelper);
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
}
-void WorkerThreadableWebSocketChannel::Peer::disconnect()
-{
- ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->disconnect();
- m_mainWebSocketChannel = 0;
-}
-
-void WorkerThreadableWebSocketChannel::Peer::suspend()
+void Peer::disconnect()
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->suspend();
-}
-
-void WorkerThreadableWebSocketChannel::Peer::resume()
-{
- ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->resume();
+ ASSERT(m_syncHelper);
+ if (m_mainWebSocketChannel) {
+ m_mainWebSocketChannel->disconnect();
+ m_mainWebSocketChannel = nullptr;
+ }
+ m_syncHelper->signalWorkerThread();
}
-static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
+static void workerGlobalScopeDidConnect(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, const String& subprotocol, const String& extensions)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setSubprotocol(subprotocol);
- workerClientWrapper->setExtensions(extensions);
- workerClientWrapper->didConnect();
+ workerClientWrapper->didConnect(subprotocol, extensions);
}
-void WorkerThreadableWebSocketChannel::Peer::didConnect()
+void Peer::didConnect(const String& subprotocol, const String& extensions)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, subprotocol, extensions);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
+static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, const String& message)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveMessage(message);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
+void Peer::didReceiveMessage(const String& message)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
+static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveBinaryData(binaryData);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
+void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
+static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, unsigned long consumed)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
+ workerClientWrapper->didConsumeBufferedAmount(consumed);
}
-void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
+void Peer::didConsumeBufferedAmount(unsigned long consumed)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper, consumed);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
+static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didStartClosingHandshake();
}
-void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
+void Peer::didStartClosingHandshake()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
+static void workerGlobalScopeDidClose(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
+ workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
}
-void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
+void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT(isMainThread());
- m_mainWebSocketChannel = 0;
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode);
+ if (m_mainWebSocketChannel) {
+ m_mainWebSocketChannel->disconnect();
+ m_mainWebSocketChannel = nullptr;
+ }
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidClose, m_workerClientWrapper, closingHandshakeCompletion, code, reason);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
+static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveMessageError();
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
+void Peer::didReceiveMessageError()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper), m_taskMode);
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
+}
+
+void Peer::trace(Visitor* visitor)
+{
+ visitor->trace(m_workerClientWrapper);
+ visitor->trace(m_mainWebSocketChannel);
+ visitor->trace(m_syncHelper);
+ WebSocketChannelClient::trace(visitor);
}
-WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode)
+Bridge::Bridge(ThreadableWebSocketChannelClientWrapper* workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
: m_workerClientWrapper(workerClientWrapper)
, m_workerGlobalScope(workerGlobalScope)
, m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
- , m_taskMode(taskMode)
- , m_peer(0)
+ , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
+ , m_peer(Peer::create(m_workerClientWrapper, m_loaderProxy, m_syncHelper))
{
ASSERT(m_workerClientWrapper.get());
}
-WorkerThreadableWebSocketChannel::Bridge::~Bridge()
-{
- disconnect();
-}
-
-class WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask FINAL : public ExecutionContextTask {
-public:
- static PassOwnPtr<ExecutionContextTask> create(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
- {
- return adoptPtr(new WorkerGlobalScopeDidInitializeTask(peer, loaderProxy, workerClientWrapper));
- }
-
- virtual ~WorkerGlobalScopeDidInitializeTask() { }
- virtual void performTask(ExecutionContext* context) OVERRIDE
- {
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- if (m_workerClientWrapper->failedWebSocketChannelCreation()) {
- // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
- OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer);
- m_peer = 0;
- m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
- } else
- m_workerClientWrapper->didCreateWebSocketChannel(m_peer);
- }
- virtual bool isCleanupTask() const OVERRIDE { return true; }
-
-private:
- WorkerGlobalScopeDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
- : m_peer(peer)
- , m_loaderProxy(loaderProxy)
- , m_workerClientWrapper(workerClientWrapper)
- {
- }
-
- WorkerThreadableWebSocketChannel::Peer* m_peer;
- WorkerLoaderProxy* m_loaderProxy;
- RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
-};
-
-void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode, const String& sourceURL, unsigned lineNumber)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
-
- RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
-
- Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode, sourceURL, lineNumber);
- bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope(
- WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode);
- if (!sent) {
- clientWrapper->clearPeer();
- delete peer;
- }
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber)
+Bridge::~Bridge()
{
ASSERT(!m_peer);
- setMethodNotCompleted();
- RefPtr<Bridge> protect(this);
- m_loaderProxy.postTaskToLoader(
- createCallbackTask(&Bridge::mainThreadInitialize, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode, sourceURL, lineNumber));
- waitForMethodCompletion();
- // m_peer may be null when the nested runloop exited before a peer has created.
- m_peer = m_workerClientWrapper->peer();
- if (!m_peer)
- m_workerClientWrapper->setFailedWebSocketChannelCreation();
}
-void WorkerThreadableWebSocketChannel::mainThreadConnect(ExecutionContext* context, Peer* peer, const KURL& url, const String& protocol)
+void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->connect(url, protocol);
+ // In order to assure all temporary objects to be destroyed before
+ // posting the task, we separate task creation and posting.
+ // In other words, it is dangerous to have a complicated expression
+ // as a waitForMethodCompletion argument.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber);
+ if (!waitForMethodCompletion(task.release())) {
+ // The worker thread has been signalled to shutdown before method completion.
+ disconnect();
+ }
}
-void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
+bool Bridge::connect(const KURL& url, const String& protocol)
{
- ASSERT(m_workerClientWrapper);
if (!m_peer)
- return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
-}
+ return false;
-void WorkerThreadableWebSocketChannel::mainThreadSend(ExecutionContext* context, Peer* peer, const String& message)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
+ return false;
- peer->send(message);
+ return m_syncHelper->connectRequestResult();
}
-void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char> > data)
+void Bridge::send(const String& message)
{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
- peer->send(*arrayBuffer);
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
}
-void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ExecutionContext* context, Peer* peer, PassRefPtr<BlobDataHandle> data)
+void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
- peer->send(data);
-}
-
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
-{
- if (!m_workerClientWrapper || !m_peer)
- return WebSocketChannel::SendFail;
- setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
- RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return WebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
-}
-
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
-{
- if (!m_workerClientWrapper || !m_peer)
- return WebSocketChannel::SendFail;
+ ASSERT(m_peer);
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
if (binaryData.byteLength())
memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
- setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release()));
- RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return WebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
-}
-
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
-{
- if (!m_workerClientWrapper || !m_peer)
- return WebSocketChannel::SendFail;
- setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), data));
- RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return WebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
-}
-
-void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ExecutionContext* context, Peer* peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->bufferedAmount();
-}
-
-unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
-{
- if (!m_workerClientWrapper || !m_peer)
- return 0;
- setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
- RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (clientWrapper)
- return clientWrapper->bufferedAmount();
- return 0;
-}
-void WorkerThreadableWebSocketChannel::mainThreadClose(ExecutionContext* context, Peer* peer, int code, const String& reason)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->close(code, reason);
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
-{
- if (!m_peer)
- return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason));
-}
-
-void WorkerThreadableWebSocketChannel::mainThreadFail(ExecutionContext* context, Peer* peer, const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->fail(reason, level, sourceURL, lineNumber);
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
-{
- if (!m_peer)
- return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason, level, sourceURL, lineNumber));
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
}
-void WorkerThreadableWebSocketChannel::mainThreadDestroy(ExecutionContext* context, PassOwnPtr<Peer> peer)
+void Bridge::send(PassRefPtr<BlobDataHandle> data)
{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT_UNUSED(peer, peer);
-
- // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because
- // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer.
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
}
-void WorkerThreadableWebSocketChannel::Bridge::disconnect()
+void Bridge::close(int code, const String& reason)
{
- clearClientWrapper();
- if (m_peer) {
- OwnPtr<Peer> peer = adoptPtr(m_peer);
- m_peer = 0;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
- }
- m_workerGlobalScope = 0;
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
}
-void WorkerThreadableWebSocketChannel::mainThreadSuspend(ExecutionContext* context, Peer* peer)
+void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->suspend();
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
}
-void WorkerThreadableWebSocketChannel::Bridge::suspend()
+void Bridge::disconnect()
{
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
-}
-void WorkerThreadableWebSocketChannel::mainThreadResume(ExecutionContext* context, Peer* peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_workerClientWrapper->clearClient();
+ waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
+ // Here |m_peer| is detached from the main thread and we can delete it.
- peer->resume();
+ m_peer = nullptr;
+ m_syncHelper = nullptr;
+ // We won't use this any more.
+ m_workerGlobalScope.clear();
}
-void WorkerThreadableWebSocketChannel::Bridge::resume()
+// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
+// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
+bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
{
- if (!m_peer)
- return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
-}
+ ASSERT(m_workerGlobalScope);
+ ASSERT(m_syncHelper);
-void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
-{
- m_workerClientWrapper->clearClient();
-}
+ m_loaderProxy.postTaskToLoader(task);
-void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
-{
- ASSERT(m_workerClientWrapper);
- m_workerClientWrapper->clearSyncMethodDone();
+ // We wait for the syncHelper event even if a shutdown event is fired.
+ // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
+ ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
+ m_syncHelper->wait();
+ // This is checking whether a shutdown event is fired or not.
+ return !m_workerGlobalScope->thread()->terminated();
}
-// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
-// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
-void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
+void Bridge::trace(Visitor* visitor)
{
- if (!m_workerGlobalScope)
- return;
- WorkerRunLoop& runLoop = m_workerGlobalScope->thread()->runLoop();
- MessageQueueWaitResult result = MessageQueueMessageReceived;
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
- result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null.
- clientWrapper = m_workerClientWrapper.get();
- }
+ visitor->trace(m_workerClientWrapper);
+ visitor->trace(m_workerGlobalScope);
+ visitor->trace(m_syncHelper);
+ visitor->trace(m_peer);
}
-} // namespace WebCore
+} // namespace blink