Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / third_party / WebKit / Source / modules / websockets / WorkerWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #include "modules/websockets/WorkerWebSocketChannel.h"
34
35 #include "bindings/core/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/dom/ExecutionContextTask.h"
40 #include "core/fileapi/Blob.h"
41 #include "core/inspector/ScriptCallFrame.h"
42 #include "core/inspector/ScriptCallStack.h"
43 #include "core/workers/WorkerGlobalScope.h"
44 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerThread.h"
46 #include "modules/websockets/DocumentWebSocketChannel.h"
47 #include "public/platform/Platform.h"
48 #include "public/platform/WebWaitableEvent.h"
49 #include "wtf/ArrayBuffer.h"
50 #include "wtf/Assertions.h"
51 #include "wtf/Functional.h"
52 #include "wtf/MainThread.h"
53 #include "wtf/text/WTFString.h"
54
55 namespace blink {
56
57 typedef WorkerWebSocketChannel::Bridge Bridge;
58 typedef WorkerWebSocketChannel::Peer Peer;
59
60 // Created and destroyed on the worker thread. All setters of this class are
61 // called on the main thread, while all getters are called on the worker
62 // thread. signalWorkerThread() must be called before any getters are called.
63 class WebSocketChannelSyncHelper : public GarbageCollectedFinalized<WebSocketChannelSyncHelper> {
64 public:
65     static WebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
66     {
67         return new WebSocketChannelSyncHelper(event);
68     }
69
70     ~WebSocketChannelSyncHelper()
71     {
72     }
73
74     // All setters are called on the main thread.
75     void setConnectRequestResult(bool connectRequestResult)
76     {
77         m_connectRequestResult = connectRequestResult;
78     }
79
80     // All getter are called on the worker thread.
81     bool connectRequestResult() const
82     {
83         return m_connectRequestResult;
84     }
85
86     // This should be called after all setters are called and before any
87     // getters are called.
88     void signalWorkerThread()
89     {
90         m_event->signal();
91     }
92     void wait()
93     {
94         m_event->wait();
95     }
96
97     void trace(Visitor* visitor) { }
98
99 private:
100     explicit WebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
101         : m_event(event)
102         , m_connectRequestResult(false)
103     {
104     }
105
106     OwnPtr<WebWaitableEvent> m_event;
107     bool m_connectRequestResult;
108 };
109
110 WorkerWebSocketChannel::WorkerWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
111     : m_bridge(new Bridge(client, workerGlobalScope))
112     , m_sourceURLAtConnection(sourceURL)
113     , m_lineNumberAtConnection(lineNumber)
114 {
115     m_bridge->initialize(sourceURL, lineNumber);
116 }
117
118 WorkerWebSocketChannel::~WorkerWebSocketChannel()
119 {
120     ASSERT(!m_bridge);
121 }
122
123 bool WorkerWebSocketChannel::connect(const KURL& url, const String& protocol)
124 {
125     ASSERT(m_bridge);
126     return m_bridge->connect(url, protocol);
127 }
128
129 void WorkerWebSocketChannel::send(const String& message)
130 {
131     ASSERT(m_bridge);
132     m_bridge->send(message);
133 }
134
135 void WorkerWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
136 {
137     ASSERT(m_bridge);
138     m_bridge->send(binaryData, byteOffset, byteLength);
139 }
140
141 void WorkerWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
142 {
143     ASSERT(m_bridge);
144     m_bridge->send(blobData);
145 }
146
147 void WorkerWebSocketChannel::close(int code, const String& reason)
148 {
149     ASSERT(m_bridge);
150     m_bridge->close(code, reason);
151 }
152
153 void WorkerWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
154 {
155     if (!m_bridge)
156         return;
157
158     RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
159     if (callStack && callStack->size())  {
160         // In order to emulate the ConsoleMessage behavior,
161         // we should ignore the specified url and line number if
162         // we can get the JavaScript context.
163         m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
164     } else if (sourceURL.isEmpty() && !lineNumber) {
165         // No information is specified by the caller - use the url
166         // and the line number at the connection.
167         m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
168     } else {
169         // Use the specified information.
170         m_bridge->fail(reason, level, sourceURL, lineNumber);
171     }
172 }
173
174 void WorkerWebSocketChannel::disconnect()
175 {
176     m_bridge->disconnect();
177     m_bridge.clear();
178 }
179
180 void WorkerWebSocketChannel::trace(Visitor* visitor)
181 {
182     visitor->trace(m_bridge);
183     WebSocketChannel::trace(visitor);
184 }
185
186 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, WebSocketChannelSyncHelper* syncHelper)
187     : m_bridge(bridge)
188     , m_loaderProxy(loaderProxy)
189     , m_mainWebSocketChannel(nullptr)
190     , m_syncHelper(syncHelper)
191 {
192     ASSERT(!isMainThread());
193 }
194
195 Peer::~Peer()
196 {
197     ASSERT(!isMainThread());
198 }
199
200 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
201 {
202     ASSERT(isMainThread());
203     Document* document = toDocument(context);
204     m_mainWebSocketChannel = DocumentWebSocketChannel::create(document, this, sourceURL, lineNumber);
205     m_syncHelper->signalWorkerThread();
206 }
207
208 void Peer::connect(const KURL& url, const String& protocol)
209 {
210     ASSERT(isMainThread());
211     ASSERT(m_syncHelper);
212     if (!m_mainWebSocketChannel) {
213         m_syncHelper->setConnectRequestResult(false);
214     } else {
215         bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
216         m_syncHelper->setConnectRequestResult(connectRequestResult);
217     }
218     m_syncHelper->signalWorkerThread();
219 }
220
221 void Peer::send(const String& message)
222 {
223     ASSERT(isMainThread());
224     if (m_mainWebSocketChannel)
225         m_mainWebSocketChannel->send(message);
226 }
227
228 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
229 {
230     ASSERT(isMainThread());
231     if (m_mainWebSocketChannel)
232         m_mainWebSocketChannel->send(data);
233 }
234
235 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
236 {
237     ASSERT(isMainThread());
238     if (m_mainWebSocketChannel)
239         m_mainWebSocketChannel->send(blobData);
240 }
241
242 void Peer::close(int code, const String& reason)
243 {
244     ASSERT(isMainThread());
245     ASSERT(m_syncHelper);
246     if (!m_mainWebSocketChannel)
247         return;
248     m_mainWebSocketChannel->close(code, reason);
249 }
250
251 void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
252 {
253     ASSERT(isMainThread());
254     ASSERT(m_syncHelper);
255     if (!m_mainWebSocketChannel)
256         return;
257     m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
258 }
259
260 void Peer::disconnect()
261 {
262     ASSERT(isMainThread());
263     ASSERT(m_syncHelper);
264     if (m_mainWebSocketChannel) {
265         m_mainWebSocketChannel->disconnect();
266         m_mainWebSocketChannel = nullptr;
267     }
268     m_syncHelper->signalWorkerThread();
269 }
270
271 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridge, const String& subprotocol, const String& extensions)
272 {
273     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
274     if (bridge->client())
275         bridge->client()->didConnect(subprotocol, extensions);
276 }
277
278 void Peer::didConnect(const String& subprotocol, const String& extensions)
279 {
280     ASSERT(isMainThread());
281     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConnect, m_bridge, subprotocol, extensions));
282 }
283
284 static void workerGlobalScopeDidReceiveTextMessage(ExecutionContext* context, Bridge* bridge, const String& payload)
285 {
286     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
287     if (bridge->client())
288         bridge->client()->didReceiveTextMessage(payload);
289 }
290
291 void Peer::didReceiveTextMessage(const String& payload)
292 {
293     ASSERT(isMainThread());
294     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveTextMessage, m_bridge, payload));
295 }
296
297 static void workerGlobalScopeDidReceiveBinaryMessage(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > payload)
298 {
299     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
300     if (bridge->client())
301         bridge->client()->didReceiveBinaryMessage(payload);
302 }
303
304 void Peer::didReceiveBinaryMessage(PassOwnPtr<Vector<char> > payload)
305 {
306     ASSERT(isMainThread());
307     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryMessage, m_bridge, payload));
308 }
309
310 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed)
311 {
312     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
313     if (bridge->client())
314         bridge->client()->didConsumeBufferedAmount(consumed);
315 }
316
317 void Peer::didConsumeBufferedAmount(unsigned long consumed)
318 {
319     ASSERT(isMainThread());
320     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_bridge, consumed));
321 }
322
323 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge)
324 {
325     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
326     if (bridge->client())
327         bridge->client()->didStartClosingHandshake();
328 }
329
330 void Peer::didStartClosingHandshake()
331 {
332     ASSERT(isMainThread());
333     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_bridge));
334 }
335
336 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
337 {
338     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
339     if (bridge->client())
340         bridge->client()->didClose(closingHandshakeCompletion, code, reason);
341 }
342
343 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
344 {
345     ASSERT(isMainThread());
346     if (m_mainWebSocketChannel) {
347         m_mainWebSocketChannel->disconnect();
348         m_mainWebSocketChannel = nullptr;
349     }
350     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason));
351 }
352
353 static void workerGlobalScopeDidError(ExecutionContext* context, Bridge* bridge)
354 {
355     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
356     if (bridge->client())
357         bridge->client()->didError();
358 }
359
360 void Peer::didError()
361 {
362     ASSERT(isMainThread());
363     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidError, m_bridge));
364 }
365
366 void Peer::trace(Visitor* visitor)
367 {
368     visitor->trace(m_bridge);
369     visitor->trace(m_mainWebSocketChannel);
370     visitor->trace(m_syncHelper);
371     WebSocketChannelClient::trace(visitor);
372 }
373
374 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalScope)
375     : m_client(client)
376     , m_workerGlobalScope(workerGlobalScope)
377     , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
378     , m_syncHelper(WebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
379     , m_peer(new Peer(this, m_loaderProxy, m_syncHelper))
380 {
381 }
382
383 Bridge::~Bridge()
384 {
385     ASSERT(!m_peer);
386 }
387
388 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
389 {
390     if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber))) {
391         // The worker thread has been signalled to shutdown before method completion.
392         disconnect();
393     }
394 }
395
396 bool Bridge::connect(const KURL& url, const String& protocol)
397 {
398     if (!m_peer)
399         return false;
400
401     if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
402         return false;
403
404     return m_syncHelper->connectRequestResult();
405 }
406
407 void Bridge::send(const String& message)
408 {
409     ASSERT(m_peer);
410     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
411 }
412
413 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
414 {
415     ASSERT(m_peer);
416     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
417     OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
418     if (binaryData.byteLength())
419         memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
420
421     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
422 }
423
424 void Bridge::send(PassRefPtr<BlobDataHandle> data)
425 {
426     ASSERT(m_peer);
427     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
428 }
429
430 void Bridge::close(int code, const String& reason)
431 {
432     ASSERT(m_peer);
433     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
434 }
435
436 void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
437 {
438     ASSERT(m_peer);
439     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
440 }
441
442 void Bridge::disconnect()
443 {
444     if (!m_peer)
445         return;
446
447     waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
448     // Here |m_peer| is detached from the main thread and we can delete it.
449
450     m_client = nullptr;
451     m_peer = nullptr;
452     m_syncHelper = nullptr;
453     // We won't use this any more.
454     m_workerGlobalScope.clear();
455 }
456
457 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
458 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
459 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
460 {
461     ASSERT(m_workerGlobalScope);
462     ASSERT(m_syncHelper);
463
464     m_loaderProxy.postTaskToLoader(task);
465
466     // We wait for the syncHelper event even if a shutdown event is fired.
467     // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
468     ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
469     m_syncHelper->wait();
470     // This is checking whether a shutdown event is fired or not.
471     return !m_workerGlobalScope->thread()->terminated();
472 }
473
474 void Bridge::trace(Visitor* visitor)
475 {
476     visitor->trace(m_client);
477     visitor->trace(m_workerGlobalScope);
478     visitor->trace(m_syncHelper);
479     visitor->trace(m_peer);
480 }
481
482 } // namespace blink