#include "core/dom/CrossThreadTask.h"
#include "core/dom/Document.h"
#include "core/dom/ExecutionContext.h"
-#include "core/dom/ExecutionContextTask.h"
#include "core/fileapi/Blob.h"
#include "core/inspector/ScriptCallFrame.h"
#include "core/inspector/ScriptCallStack.h"
#include "public/platform/Platform.h"
#include "public/platform/WebWaitableEvent.h"
#include "wtf/ArrayBuffer.h"
+#include "wtf/Assertions.h"
#include "wtf/Functional.h"
#include "wtf/MainThread.h"
}
// All setters are called on the main thread.
+ void setConnectRequestResult(bool connectRequestResult)
+ {
+ m_connectRequestResult = connectRequestResult;
+ }
void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
{
m_sendRequestResult = sendRequestResult;
}
// All getter are called on the worker thread.
+ bool connectRequestResult() const
+ {
+ return m_connectRequestResult;
+ }
WebSocketChannel::SendResult sendRequestResult() const
{
return m_sendRequestResult;
private:
ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> event)
: m_event(event)
+ , m_connectRequestResult(false)
, m_sendRequestResult(WebSocketChannel::SendFail)
, m_bufferedAmount(0)
{
}
OwnPtr<blink::WebWaitableEvent> m_event;
+ bool m_connectRequestResult;
WebSocketChannel::SendResult m_sendRequestResult;
unsigned long m_bufferedAmount;
};
-WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
- : m_workerGlobalScope(context)
- , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
- , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope))
+WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
+ : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
+ , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
, m_sourceURLAtConnection(sourceURL)
, m_lineNumberAtConnection(lineNumber)
{
+ ASSERT(m_workerClientWrapper.get());
m_bridge->initialize(sourceURL, lineNumber);
}
m_bridge->disconnect();
}
-void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
+bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
{
if (m_bridge)
- m_bridge->connect(url, protocol);
+ return m_bridge->connect(url, protocol);
+ return false;
}
String WorkerThreadableWebSocketChannel::subprotocol()
{
- ASSERT(m_workerClientWrapper);
return m_workerClientWrapper->subprotocol();
}
String WorkerThreadableWebSocketChannel::extensions()
{
- ASSERT(m_workerClientWrapper);
return m_workerClientWrapper->extensions();
}
, m_weakFactory(reference, this)
{
ASSERT(isMainThread());
+ ASSERT(m_workerClientWrapper.get());
+
Document* document = toDocument(context);
if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->connect(url, protocol);
+ if (!m_mainWebSocketChannel) {
+ m_syncHelper->setConnectRequestResult(false);
+ } else {
+ bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
+ m_syncHelper->setConnectRequestResult(connectRequestResult);
+ }
+ m_syncHelper->signalWorkerThread();
}
void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper) {
+ if (!m_mainWebSocketChannel) {
m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
} else {
WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
void WorkerThreadableWebSocketChannel::Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper) {
+ if (!m_mainWebSocketChannel) {
m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
} else {
RefPtr<ArrayBuffer> binaryData = ArrayBuffer::create(data->data(), data->size());
+ if (!binaryData) {
+ // Failed to allocate an ArrayBuffer. We need to crash the renderer
+ // since there's no way defined in the spec to tell this to the
+ // user.
+ CRASH();
+ }
WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(*binaryData, 0, binaryData->byteLength());
m_syncHelper->setSendRequestResult(sendRequestResult);
}
void WorkerThreadableWebSocketChannel::Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper) {
+ if (!m_mainWebSocketChannel) {
m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
} else {
WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(blobData);
void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper) {
+ if (!m_mainWebSocketChannel) {
m_syncHelper->setBufferedAmount(0);
} else {
unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper));
}
-WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtrWillBeRawPtr<WorkerGlobalScope> workerGlobalScope)
+WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
: m_workerClientWrapper(workerClientWrapper)
, m_workerGlobalScope(workerGlobalScope)
, m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
m_syncHelper = syncHelper.get();
RefPtr<Bridge> protect(this);
- m_loaderProxy.postTaskToLoader(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourceURL, lineNumber, syncHelper.release()));
- if (!waitForMethodCompletion()) {
+ if (!waitForMethodCompletion(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourceURL, lineNumber, syncHelper.release()))) {
// The worker thread has been signalled to shutdown before method completion.
terminatePeer();
}
}
-void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
+bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
{
- ASSERT(m_workerClientWrapper);
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy())));
+ if (hasTerminatedPeer())
+ return false;
+
+ RefPtr<Bridge> protect(this);
+ if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy()))))
+ return false;
+
+ return m_syncHelper->connectRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
{
- if (!m_workerClientWrapper || !m_workerGlobalScope)
+ if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
- ASSERT(m_syncHelper);
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy())));
+
RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
+ if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy()))))
+ return WebSocketChannel::SendFail;
+
return m_syncHelper->sendRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (!m_workerClientWrapper || !m_workerGlobalScope)
+ if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
- ASSERT(m_syncHelper);
+
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
if (binaryData.byteLength())
memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release())));
+
RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
+ if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release()))))
+ return WebSocketChannel::SendFail;
+
return m_syncHelper->sendRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
{
- if (!m_workerClientWrapper || !m_workerGlobalScope)
+ if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
- ASSERT(m_syncHelper);
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data)));
+
RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
+ if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data))))
+ return WebSocketChannel::SendFail;
+
return m_syncHelper->sendRequestResult();
}
unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
{
- if (!m_workerClientWrapper || !m_workerGlobalScope)
+ if (hasTerminatedPeer())
return 0;
- ASSERT(m_syncHelper);
- m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer)));
+
RefPtr<Bridge> protect(this);
- waitForMethodCompletion();
+ if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer))))
+ return 0;
+
return m_syncHelper->bufferedAmount();
}
void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
{
+ if (hasTerminatedPeer())
+ return;
+
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_peer, code, reason.isolatedCopy())));
}
void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
+ if (hasTerminatedPeer())
+ return;
+
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_peer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber)));
}
void WorkerThreadableWebSocketChannel::Bridge::suspend()
{
+ if (hasTerminatedPeer())
+ return;
+
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer)));
}
void WorkerThreadableWebSocketChannel::Bridge::resume()
{
+ if (hasTerminatedPeer())
+ return;
+
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m_peer)));
}
// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
-bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
+bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
{
- if (!m_workerClientWrapper || !m_syncHelper)
- return true;
+ ASSERT(m_workerGlobalScope);
+ ASSERT(m_syncHelper);
+
+ m_loaderProxy.postTaskToLoader(task);
blink::WebWaitableEvent* shutdownEvent = m_workerGlobalScope->thread()->shutdownEvent();
Vector<blink::WebWaitableEvent*> events;
events.append(shutdownEvent);
events.append(m_syncHelper->event());
+ ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
blink::WebWaitableEvent* signalled = blink::Platform::current()->waitMultipleEvents(events);
return signalled != shutdownEvent;
}
void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
{
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
- m_workerGlobalScope = nullptr;
+ // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
+ // We must not touch m_syncHelper any more.
m_syncHelper = 0;
+
+ // We won't use this any more.
+ m_workerGlobalScope = nullptr;
}
} // namespace WebCore