#include "modules/websockets/WorkerThreadableWebSocketChannel.h"
-#include "RuntimeEnabledFeatures.h"
-#include "bindings/v8/ScriptCallStackFactory.h"
+#include "bindings/core/v8/ScriptCallStackFactory.h"
#include "core/dom/CrossThreadTask.h"
#include "core/dom/Document.h"
#include "core/dom/ExecutionContext.h"
#include "core/inspector/ScriptCallFrame.h"
#include "core/inspector/ScriptCallStack.h"
#include "core/workers/WorkerLoaderProxy.h"
-#include "core/workers/WorkerRunLoop.h"
#include "core/workers/WorkerThread.h"
#include "modules/websockets/MainThreadWebSocketChannel.h"
#include "modules/websockets/NewWebSocketChannelImpl.h"
#include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
+#include "platform/RuntimeEnabledFeatures.h"
#include "public/platform/Platform.h"
#include "public/platform/WebWaitableEvent.h"
#include "wtf/ArrayBuffer.h"
#include "wtf/Functional.h"
#include "wtf/MainThread.h"
-namespace WebCore {
+namespace blink {
+
+typedef WorkerThreadableWebSocketChannel::Bridge Bridge;
+typedef WorkerThreadableWebSocketChannel::Peer Peer;
// Created and destroyed on the worker thread. All setters of this class are
// called on the main thread, while all getters are called on the worker
// thread. signalWorkerThread() must be called before any getters are called.
-class ThreadableWebSocketChannelSyncHelper {
+class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<ThreadableWebSocketChannelSyncHelper> {
public:
- static PassOwnPtr<ThreadableWebSocketChannelSyncHelper> create(PassOwnPtr<blink::WebWaitableEvent> event)
+ static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
+ {
+ return new ThreadableWebSocketChannelSyncHelper(event);
+ }
+
+ ~ThreadableWebSocketChannelSyncHelper()
{
- return adoptPtr(new ThreadableWebSocketChannelSyncHelper(event));
}
// All setters are called on the main thread.
{
m_connectRequestResult = connectRequestResult;
}
- void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
- {
- m_sendRequestResult = sendRequestResult;
- }
- void setBufferedAmount(unsigned long bufferedAmount)
- {
- m_bufferedAmount = bufferedAmount;
- }
// All getter are called on the worker thread.
bool connectRequestResult() const
{
return m_connectRequestResult;
}
- WebSocketChannel::SendResult sendRequestResult() const
- {
- return m_sendRequestResult;
- }
- unsigned long bufferedAmount() const
- {
- return m_bufferedAmount;
- }
// This should be called after all setters are called and before any
// getters are called.
{
m_event->signal();
}
-
- blink::WebWaitableEvent* event() const
+ void wait()
{
- return m_event.get();
+ m_event->wait();
}
+ void trace(Visitor* visitor) { }
+
private:
- ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> event)
+ explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
: m_event(event)
, m_connectRequestResult(false)
- , m_sendRequestResult(WebSocketChannel::SendFail)
- , m_bufferedAmount(0)
{
}
- OwnPtr<blink::WebWaitableEvent> m_event;
+ OwnPtr<WebWaitableEvent> m_event;
bool m_connectRequestResult;
- WebSocketChannel::SendResult m_sendRequestResult;
- unsigned long m_bufferedAmount;
};
WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
{
- if (m_bridge)
- m_bridge->disconnect();
+ ASSERT(!m_bridge);
}
bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
{
- if (m_bridge)
- return m_bridge->connect(url, protocol);
- return false;
-}
-
-String WorkerThreadableWebSocketChannel::subprotocol()
-{
- return m_workerClientWrapper->subprotocol();
+ ASSERT(m_bridge);
+ return m_bridge->connect(url, protocol);
}
-String WorkerThreadableWebSocketChannel::extensions()
+void WorkerThreadableWebSocketChannel::send(const String& message)
{
- return m_workerClientWrapper->extensions();
+ ASSERT(m_bridge);
+ m_bridge->send(message);
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
+void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(message);
+ ASSERT(m_bridge);
+ m_bridge->send(binaryData, byteOffset, byteLength);
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
+void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(binaryData, byteOffset, byteLength);
-}
-
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
-{
- if (!m_bridge)
- return WebSocketChannel::SendFail;
- return m_bridge->send(blobData);
-}
-
-unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
-{
- if (!m_bridge)
- return 0;
- return m_bridge->bufferedAmount();
+ ASSERT(m_bridge);
+ m_bridge->send(blobData);
}
void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
{
- if (m_bridge)
- m_bridge->close(code, reason);
+ ASSERT(m_bridge);
+ m_bridge->close(code, reason);
}
void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
if (!m_bridge)
return;
- RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
+ RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
if (callStack && callStack->size()) {
// In order to emulate the ConsoleMessage behavior,
// we should ignore the specified url and line number if
m_bridge.clear();
}
-void WorkerThreadableWebSocketChannel::suspend()
-{
- m_workerClientWrapper->suspend();
- if (m_bridge)
- m_bridge->suspend();
-}
-
-void WorkerThreadableWebSocketChannel::resume()
+void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
{
- m_workerClientWrapper->resume();
- if (m_bridge)
- m_bridge->resume();
+ visitor->trace(m_bridge);
+ visitor->trace(m_workerClientWrapper);
+ WebSocketChannel::trace(visitor);
}
-WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
+Peer::Peer(ThreadableWebSocketChannelClientWrapper* clientWrapper, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
: m_workerClientWrapper(clientWrapper)
, m_loaderProxy(loaderProxy)
, m_mainWebSocketChannel(nullptr)
, m_syncHelper(syncHelper)
- , m_weakFactory(reference, this)
{
- ASSERT(isMainThread());
- ASSERT(m_workerClientWrapper.get());
-
- Document* document = toDocument(context);
- if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
- m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
- } else {
- m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
- }
-
- m_syncHelper->signalWorkerThread();
+ ASSERT(!isMainThread());
}
-WorkerThreadableWebSocketChannel::Peer::~Peer()
+Peer::~Peer()
{
- ASSERT(isMainThread());
- if (m_mainWebSocketChannel)
- m_mainWebSocketChannel->disconnect();
-}
-
-void WorkerThreadableWebSocketChannel::Peer::initialize(ExecutionContext* context, PassRefPtr<WeakReference<Peer> > reference, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnection, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
-{
- // The caller must call destroy() to free the peer.
- new Peer(reference, clientWrapper, *loaderProxy, context, sourceURLAtConnection, lineNumberAtConnection, syncHelper);
+ ASSERT(!isMainThread());
}
-void WorkerThreadableWebSocketChannel::Peer::destroy()
+Peer* Peer::create(ThreadableWebSocketChannelClientWrapper* clientWrapper, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
{
- ASSERT(isMainThread());
- delete this;
+ return new Peer(clientWrapper, loaderProxy, syncHelper);
}
-void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
+void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel) {
- m_syncHelper->setConnectRequestResult(false);
+ Document* document = toDocument(context);
+ if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
+ m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
} else {
- bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
- m_syncHelper->setConnectRequestResult(connectRequestResult);
+ m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
}
m_syncHelper->signalWorkerThread();
}
-void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
+void Peer::connect(const KURL& url, const String& protocol)
{
ASSERT(isMainThread());
+ ASSERT(m_syncHelper);
if (!m_mainWebSocketChannel) {
- m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
+ m_syncHelper->setConnectRequestResult(false);
} else {
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
- m_syncHelper->setSendRequestResult(sendRequestResult);
+ bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
+ m_syncHelper->setConnectRequestResult(connectRequestResult);
}
m_syncHelper->signalWorkerThread();
}
-void WorkerThreadableWebSocketChannel::Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
+void Peer::send(const String& message)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel) {
- m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
- } else {
- RefPtr<ArrayBuffer> binaryData = ArrayBuffer::create(data->data(), data->size());
- if (!binaryData) {
- // Failed to allocate an ArrayBuffer. We need to crash the renderer
- // since there's no way defined in the spec to tell this to the
- // user.
- CRASH();
- }
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(*binaryData, 0, binaryData->byteLength());
- m_syncHelper->setSendRequestResult(sendRequestResult);
- }
- m_syncHelper->signalWorkerThread();
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(message);
}
-void WorkerThreadableWebSocketChannel::Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
+void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel) {
- m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
- } else {
- WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(blobData);
- m_syncHelper->setSendRequestResult(sendRequestResult);
- }
- m_syncHelper->signalWorkerThread();
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(data);
}
-void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
+void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel) {
- m_syncHelper->setBufferedAmount(0);
- } else {
- unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
- m_syncHelper->setBufferedAmount(bufferedAmount);
- }
- m_syncHelper->signalWorkerThread();
+ if (m_mainWebSocketChannel)
+ m_mainWebSocketChannel->send(blobData);
}
-void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
+void Peer::close(int code, const String& reason)
{
ASSERT(isMainThread());
+ ASSERT(m_syncHelper);
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->close(code, reason);
}
-void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
+void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
ASSERT(isMainThread());
+ ASSERT(m_syncHelper);
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
}
-void WorkerThreadableWebSocketChannel::Peer::disconnect()
-{
- ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->disconnect();
- m_mainWebSocketChannel = nullptr;
-}
-
-void WorkerThreadableWebSocketChannel::Peer::suspend()
+void Peer::disconnect()
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->suspend();
-}
-
-void WorkerThreadableWebSocketChannel::Peer::resume()
-{
- ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->resume();
+ ASSERT(m_syncHelper);
+ if (m_mainWebSocketChannel) {
+ m_mainWebSocketChannel->disconnect();
+ m_mainWebSocketChannel = nullptr;
+ }
+ m_syncHelper->signalWorkerThread();
}
-static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
+static void workerGlobalScopeDidConnect(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, const String& subprotocol, const String& extensions)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setSubprotocol(subprotocol);
- workerClientWrapper->setExtensions(extensions);
- workerClientWrapper->didConnect();
+ workerClientWrapper->didConnect(subprotocol, extensions);
}
-void WorkerThreadableWebSocketChannel::Peer::didConnect()
+void Peer::didConnect(const String& subprotocol, const String& extensions)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, subprotocol, extensions);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
+static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, const String& message)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveMessage(message);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
+void Peer::didReceiveMessage(const String& message)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
+static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveBinaryData(binaryData);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
+void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
+static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, unsigned long consumed)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
+ workerClientWrapper->didConsumeBufferedAmount(consumed);
}
-void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
+void Peer::didConsumeBufferedAmount(unsigned long consumed)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper, consumed);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
+static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didStartClosingHandshake();
}
-void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
+void Peer::didStartClosingHandshake()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
+static void workerGlobalScopeDidClose(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
+ workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
}
-void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
+void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT(isMainThread());
- m_mainWebSocketChannel = nullptr;
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason));
+ if (m_mainWebSocketChannel) {
+ m_mainWebSocketChannel->disconnect();
+ m_mainWebSocketChannel = nullptr;
+ }
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidClose, m_workerClientWrapper, closingHandshakeCompletion, code, reason);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
-static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
+static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, ThreadableWebSocketChannelClientWrapper* workerClientWrapper)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didReceiveMessageError();
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
+void Peer::didReceiveMessageError()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper));
+ // It is important to seprate task creation from posting
+ // the task. See the above comment.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper);
+ m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
+}
+
+void Peer::trace(Visitor* visitor)
+{
+ visitor->trace(m_workerClientWrapper);
+ visitor->trace(m_mainWebSocketChannel);
+ visitor->trace(m_syncHelper);
+ WebSocketChannelClient::trace(visitor);
}
-WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
+Bridge::Bridge(ThreadableWebSocketChannelClientWrapper* workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
: m_workerClientWrapper(workerClientWrapper)
, m_workerGlobalScope(workerGlobalScope)
, m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
- , m_syncHelper(0)
+ , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
+ , m_peer(Peer::create(m_workerClientWrapper, m_loaderProxy, m_syncHelper))
{
ASSERT(m_workerClientWrapper.get());
}
-WorkerThreadableWebSocketChannel::Bridge::~Bridge()
+Bridge::~Bridge()
{
- disconnect();
+ ASSERT(!m_peer);
}
-void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber)
+void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
{
- RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound();
- m_peer = WeakPtr<Peer>(reference);
-
- OwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocketChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEvent()));
- // This pointer is guaranteed to be valid until we call terminatePeer.
- m_syncHelper = syncHelper.get();
-
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourceURL, lineNumber, syncHelper.release()))) {
+ // In order to assure all temporary objects to be destroyed before
+ // posting the task, we separate task creation and posting.
+ // In other words, it is dangerous to have a complicated expression
+ // as a waitForMethodCompletion argument.
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber);
+ if (!waitForMethodCompletion(task.release())) {
// The worker thread has been signalled to shutdown before method completion.
- terminatePeer();
+ disconnect();
}
}
-bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
+bool Bridge::connect(const KURL& url, const String& protocol)
{
- if (hasTerminatedPeer())
+ if (!m_peer)
return false;
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy()))))
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
return false;
return m_syncHelper->connectRequestResult();
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
+void Bridge::send(const String& message)
{
- if (hasTerminatedPeer())
- return WebSocketChannel::SendFail;
-
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy()))))
- return WebSocketChannel::SendFail;
-
- return m_syncHelper->sendRequestResult();
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
+void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (hasTerminatedPeer())
- return WebSocketChannel::SendFail;
-
+ ASSERT(m_peer);
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
if (binaryData.byteLength())
memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release()))))
- return WebSocketChannel::SendFail;
-
- return m_syncHelper->sendRequestResult();
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
}
-WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
+void Bridge::send(PassRefPtr<BlobDataHandle> data)
{
- if (hasTerminatedPeer())
- return WebSocketChannel::SendFail;
-
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data))))
- return WebSocketChannel::SendFail;
-
- return m_syncHelper->sendRequestResult();
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
}
-unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
+void Bridge::close(int code, const String& reason)
{
- if (hasTerminatedPeer())
- return 0;
-
- RefPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer))))
- return 0;
-
- return m_syncHelper->bufferedAmount();
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
}
-void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
+void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
- if (hasTerminatedPeer())
- return;
-
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_peer, code, reason.isolatedCopy())));
+ ASSERT(m_peer);
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
}
-void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
+void Bridge::disconnect()
{
- if (hasTerminatedPeer())
+ if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_peer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber)));
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::disconnect()
-{
- clearClientWrapper();
- terminatePeer();
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::suspend()
-{
- if (hasTerminatedPeer())
- return;
-
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer)));
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::resume()
-{
- if (hasTerminatedPeer())
- return;
-
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m_peer)));
-}
-
-void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
-{
m_workerClientWrapper->clearClient();
+ waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
+ // Here |m_peer| is detached from the main thread and we can delete it.
+
+ m_peer = nullptr;
+ m_syncHelper = nullptr;
+ // We won't use this any more.
+ m_workerGlobalScope.clear();
}
// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
-bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
+bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
{
ASSERT(m_workerGlobalScope);
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(task);
- blink::WebWaitableEvent* shutdownEvent = m_workerGlobalScope->thread()->shutdownEvent();
- Vector<blink::WebWaitableEvent*> events;
- events.append(shutdownEvent);
- events.append(m_syncHelper->event());
-
+ // We wait for the syncHelper event even if a shutdown event is fired.
+ // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
- blink::WebWaitableEvent* signalled = blink::Platform::current()->waitMultipleEvents(events);
- return signalled != shutdownEvent;
+ m_syncHelper->wait();
+ // This is checking whether a shutdown event is fired or not.
+ return !m_workerGlobalScope->thread()->terminated();
}
-void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
+void Bridge::trace(Visitor* visitor)
{
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
- // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
- // We must not touch m_syncHelper any more.
- m_syncHelper = 0;
-
- // We won't use this any more.
- m_workerGlobalScope = nullptr;
+ visitor->trace(m_workerClientWrapper);
+ visitor->trace(m_workerGlobalScope);
+ visitor->trace(m_syncHelper);
+ visitor->trace(m_peer);
}
-} // namespace WebCore
+} // namespace blink