- add sources.
[platform/framework/web/crosswalk.git] / src / net / websockets / websocket_job.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_job.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
25
26 static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
27
28 namespace {
29
30 // lower-case header names.
31 const char* const kCookieHeaders[] = {
32   "cookie", "cookie2"
33 };
34 const char* const kSetCookieHeaders[] = {
35   "set-cookie", "set-cookie2"
36 };
37
38 net::SocketStreamJob* WebSocketJobFactory(
39     const GURL& url, net::SocketStream::Delegate* delegate) {
40   net::WebSocketJob* job = new net::WebSocketJob(delegate);
41   job->InitSocketStream(new net::SocketStream(url, job));
42   return job;
43 }
44
45 class WebSocketJobInitSingleton {
46  private:
47   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
48   WebSocketJobInitSingleton() {
49     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
50     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
51   }
52 };
53
54 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
55     LAZY_INSTANCE_INITIALIZER;
56
57 }  // anonymous namespace
58
59 namespace net {
60
61 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
62
63 // static
64 void WebSocketJob::EnsureInit() {
65   g_websocket_job_init.Get();
66 }
67
68 // static
69 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
70   websocket_over_spdy_enabled_ = enabled;
71 }
72
73 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
74     : delegate_(delegate),
75       state_(INITIALIZED),
76       waiting_(false),
77       handshake_request_(new WebSocketHandshakeRequestHandler),
78       handshake_response_(new WebSocketHandshakeResponseHandler),
79       started_to_send_handshake_request_(false),
80       handshake_request_sent_(0),
81       response_cookies_save_index_(0),
82       spdy_protocol_version_(0),
83       save_next_cookie_running_(false),
84       callback_pending_(false),
85       weak_ptr_factory_(this),
86       weak_ptr_factory_for_send_pending_(this) {
87 }
88
89 WebSocketJob::~WebSocketJob() {
90   DCHECK_EQ(CLOSED, state_);
91   DCHECK(!delegate_);
92   DCHECK(!socket_.get());
93 }
94
95 void WebSocketJob::Connect() {
96   DCHECK(socket_.get());
97   DCHECK_EQ(state_, INITIALIZED);
98   state_ = CONNECTING;
99   socket_->Connect();
100 }
101
102 bool WebSocketJob::SendData(const char* data, int len) {
103   switch (state_) {
104     case INITIALIZED:
105       return false;
106
107     case CONNECTING:
108       return SendHandshakeRequest(data, len);
109
110     case OPEN:
111       {
112         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
113         memcpy(buffer->data(), data, len);
114         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
115           send_buffer_queue_.push_back(buffer);
116           return true;
117         }
118         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
119         return SendDataInternal(current_send_buffer_->data(),
120                                 current_send_buffer_->BytesRemaining());
121       }
122
123     case CLOSING:
124     case CLOSED:
125       return false;
126   }
127   return false;
128 }
129
130 void WebSocketJob::Close() {
131   if (state_ == CLOSED)
132     return;
133
134   state_ = CLOSING;
135   if (current_send_buffer_.get()) {
136     // Will close in SendPending.
137     return;
138   }
139   state_ = CLOSED;
140   CloseInternal();
141 }
142
143 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
144   state_ = CONNECTING;
145   socket_->RestartWithAuth(credentials);
146 }
147
148 void WebSocketJob::DetachDelegate() {
149   state_ = CLOSED;
150   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
151
152   scoped_refptr<WebSocketJob> protect(this);
153   weak_ptr_factory_.InvalidateWeakPtrs();
154   weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
155
156   delegate_ = NULL;
157   if (socket_.get())
158     socket_->DetachDelegate();
159   socket_ = NULL;
160   if (!callback_.is_null()) {
161     waiting_ = false;
162     callback_.Reset();
163     Release();  // Balanced with OnStartOpenConnection().
164   }
165 }
166
167 int WebSocketJob::OnStartOpenConnection(
168     SocketStream* socket, const CompletionCallback& callback) {
169   DCHECK(callback_.is_null());
170   state_ = CONNECTING;
171
172   addresses_ = socket->address_list();
173   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
174     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
175   }
176
177   if (delegate_) {
178     int result = delegate_->OnStartOpenConnection(socket, callback);
179     DCHECK_EQ(OK, result);
180   }
181   if (waiting_) {
182     // PutInQueue() may set |waiting_| true for throttling. In this case,
183     // Wakeup() will be called later.
184     callback_ = callback;
185     AddRef();  // Balanced when callback_ is cleared.
186     return ERR_IO_PENDING;
187   }
188   return TrySpdyStream();
189 }
190
191 void WebSocketJob::OnConnected(
192     SocketStream* socket, int max_pending_send_allowed) {
193   if (state_ == CLOSED)
194     return;
195   DCHECK_EQ(CONNECTING, state_);
196   if (delegate_)
197     delegate_->OnConnected(socket, max_pending_send_allowed);
198 }
199
200 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
201   DCHECK_NE(INITIALIZED, state_);
202   DCHECK_GT(amount_sent, 0);
203   if (state_ == CLOSED)
204     return;
205   if (state_ == CONNECTING) {
206     OnSentHandshakeRequest(socket, amount_sent);
207     return;
208   }
209   if (delegate_) {
210     DCHECK(state_ == OPEN || state_ == CLOSING);
211     if (!current_send_buffer_.get()) {
212       VLOG(1)
213           << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
214       return;
215     }
216     current_send_buffer_->DidConsume(amount_sent);
217     if (current_send_buffer_->BytesRemaining() > 0)
218       return;
219
220     // We need to report amount_sent of original buffer size, instead of
221     // amount sent to |socket|.
222     amount_sent = current_send_buffer_->size();
223     DCHECK_GT(amount_sent, 0);
224     current_send_buffer_ = NULL;
225     if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
226       base::MessageLoopForIO::current()->PostTask(
227           FROM_HERE,
228           base::Bind(&WebSocketJob::SendPending,
229                      weak_ptr_factory_for_send_pending_.GetWeakPtr()));
230     }
231     delegate_->OnSentData(socket, amount_sent);
232   }
233 }
234
235 void WebSocketJob::OnReceivedData(
236     SocketStream* socket, const char* data, int len) {
237   DCHECK_NE(INITIALIZED, state_);
238   if (state_ == CLOSED)
239     return;
240   if (state_ == CONNECTING) {
241     OnReceivedHandshakeResponse(socket, data, len);
242     return;
243   }
244   DCHECK(state_ == OPEN || state_ == CLOSING);
245   if (delegate_ && len > 0)
246     delegate_->OnReceivedData(socket, data, len);
247 }
248
249 void WebSocketJob::OnClose(SocketStream* socket) {
250   state_ = CLOSED;
251   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
252
253   scoped_refptr<WebSocketJob> protect(this);
254   weak_ptr_factory_.InvalidateWeakPtrs();
255
256   SocketStream::Delegate* delegate = delegate_;
257   delegate_ = NULL;
258   socket_ = NULL;
259   if (!callback_.is_null()) {
260     waiting_ = false;
261     callback_.Reset();
262     Release();  // Balanced with OnStartOpenConnection().
263   }
264   if (delegate)
265     delegate->OnClose(socket);
266 }
267
268 void WebSocketJob::OnAuthRequired(
269     SocketStream* socket, AuthChallengeInfo* auth_info) {
270   if (delegate_)
271     delegate_->OnAuthRequired(socket, auth_info);
272 }
273
274 void WebSocketJob::OnSSLCertificateError(
275     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
276   if (delegate_)
277     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
278 }
279
280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
281   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
282     delegate_->OnError(socket, error);
283 }
284
285 void WebSocketJob::OnCreatedSpdyStream(int result) {
286   DCHECK(spdy_websocket_stream_.get());
287   DCHECK(socket_.get());
288   DCHECK_NE(ERR_IO_PENDING, result);
289
290   if (state_ == CLOSED) {
291     result = ERR_ABORTED;
292   } else if (result == OK) {
293     state_ = CONNECTING;
294     result = ERR_PROTOCOL_SWITCHED;
295   } else {
296     spdy_websocket_stream_.reset();
297   }
298
299   CompleteIO(result);
300 }
301
302 void WebSocketJob::OnSentSpdyHeaders() {
303   DCHECK_NE(INITIALIZED, state_);
304   if (state_ != CONNECTING)
305     return;
306   if (delegate_)
307     delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
308   handshake_request_.reset();
309 }
310
311 void WebSocketJob::OnSpdyResponseHeadersUpdated(
312     const SpdyHeaderBlock& response_headers) {
313   DCHECK_NE(INITIALIZED, state_);
314   if (state_ != CONNECTING)
315     return;
316   // TODO(toyoshim): Fallback to non-spdy connection?
317   handshake_response_->ParseResponseHeaderBlock(response_headers,
318                                                 challenge_,
319                                                 spdy_protocol_version_);
320
321   SaveCookiesAndNotifyHeadersComplete();
322 }
323
324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
325   DCHECK_NE(INITIALIZED, state_);
326   DCHECK_NE(CONNECTING, state_);
327   if (state_ == CLOSED)
328     return;
329   if (!spdy_websocket_stream_.get())
330     return;
331   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
332 }
333
334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
335   DCHECK_NE(INITIALIZED, state_);
336   DCHECK_NE(CONNECTING, state_);
337   if (state_ == CLOSED)
338     return;
339   if (!spdy_websocket_stream_.get())
340     return;
341   if (buffer) {
342     OnReceivedData(
343         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
344   } else {
345     OnReceivedData(socket_.get(), NULL, 0);
346   }
347 }
348
349 void WebSocketJob::OnCloseSpdyStream() {
350   spdy_websocket_stream_.reset();
351   OnClose(socket_.get());
352 }
353
354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
355   DCHECK_EQ(state_, CONNECTING);
356   if (started_to_send_handshake_request_)
357     return false;
358   if (!handshake_request_->ParseRequest(data, len))
359     return false;
360
361   AddCookieHeaderAndSend();
362   return true;
363 }
364
365 void WebSocketJob::AddCookieHeaderAndSend() {
366   bool allow = true;
367   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
368     allow = false;
369
370   if (socket_.get() && delegate_ && state_ == CONNECTING) {
371     handshake_request_->RemoveHeaders(kCookieHeaders,
372                                       arraysize(kCookieHeaders));
373     if (allow && socket_->context()->cookie_store()) {
374       // Add cookies, including HttpOnly cookies.
375       CookieOptions cookie_options;
376       cookie_options.set_include_httponly();
377       socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
378           GetURLForCookies(), cookie_options,
379           base::Bind(&WebSocketJob::LoadCookieCallback,
380                      weak_ptr_factory_.GetWeakPtr()));
381     } else {
382       DoSendData();
383     }
384   }
385 }
386
387 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
388   if (!cookie.empty())
389     // TODO(tyoshino): Sending cookie means that connection doesn't need
390     // kPrivacyModeEnabled as cookies may be server-bound and channel id
391     // wouldn't negatively affect privacy anyway. Need to restart connection
392     // or refactor to determine cookie status prior to connecting.
393     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
394   DoSendData();
395 }
396
397 void WebSocketJob::DoSendData() {
398   if (spdy_websocket_stream_.get()) {
399     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
400     handshake_request_->GetRequestHeaderBlock(
401         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
402     spdy_websocket_stream_->SendRequest(headers.Pass());
403   } else {
404     const std::string& handshake_request =
405         handshake_request_->GetRawRequest();
406     handshake_request_sent_ = 0;
407     socket_->net_log()->AddEvent(
408         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
409         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
410     socket_->SendData(handshake_request.data(),
411                       handshake_request.size());
412   }
413   // Just buffered in |handshake_request_|.
414   started_to_send_handshake_request_ = true;
415 }
416
417 void WebSocketJob::OnSentHandshakeRequest(
418     SocketStream* socket, int amount_sent) {
419   DCHECK_EQ(state_, CONNECTING);
420   handshake_request_sent_ += amount_sent;
421   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
422   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
423     // handshake request has been sent.
424     // notify original size of handshake request to delegate.
425     if (delegate_)
426       delegate_->OnSentData(
427           socket,
428           handshake_request_->original_length());
429     handshake_request_.reset();
430   }
431 }
432
433 void WebSocketJob::OnReceivedHandshakeResponse(
434     SocketStream* socket, const char* data, int len) {
435   DCHECK_EQ(state_, CONNECTING);
436   if (handshake_response_->HasResponse()) {
437     // If we already has handshake response, received data should be frame
438     // data, not handshake message.
439     received_data_after_handshake_.insert(
440         received_data_after_handshake_.end(), data, data + len);
441     return;
442   }
443
444   size_t response_length = handshake_response_->ParseRawResponse(data, len);
445   if (!handshake_response_->HasResponse()) {
446     // not yet. we need more data.
447     return;
448   }
449   // handshake message is completed.
450   std::string raw_response = handshake_response_->GetRawResponse();
451   socket_->net_log()->AddEvent(
452       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
453       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
454   if (len - response_length > 0) {
455     // If we received extra data, it should be frame data.
456     DCHECK(received_data_after_handshake_.empty());
457     received_data_after_handshake_.assign(data + response_length, data + len);
458   }
459   SaveCookiesAndNotifyHeadersComplete();
460 }
461
462 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
463   // handshake message is completed.
464   DCHECK(handshake_response_->HasResponse());
465
466   // Extract cookies from the handshake response into a temporary vector.
467   response_cookies_.clear();
468   response_cookies_save_index_ = 0;
469
470   handshake_response_->GetHeaders(
471       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
472
473   // Now, loop over the response cookies, and attempt to persist each.
474   SaveNextCookie();
475 }
476
477 void WebSocketJob::NotifyHeadersComplete() {
478   // Remove cookie headers, with malformed headers preserved.
479   // Actual handshake should be done in Blink.
480   handshake_response_->RemoveHeaders(
481       kSetCookieHeaders, arraysize(kSetCookieHeaders));
482   std::string handshake_response = handshake_response_->GetResponse();
483   handshake_response_.reset();
484   std::vector<char> received_data(handshake_response.begin(),
485                                   handshake_response.end());
486   received_data.insert(received_data.end(),
487                        received_data_after_handshake_.begin(),
488                        received_data_after_handshake_.end());
489   received_data_after_handshake_.clear();
490
491   state_ = OPEN;
492
493   DCHECK(!received_data.empty());
494   if (delegate_)
495     delegate_->OnReceivedData(
496         socket_.get(), &received_data.front(), received_data.size());
497
498   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
499 }
500
501 void WebSocketJob::SaveNextCookie() {
502   if (!socket_.get() || !delegate_ || state_ != CONNECTING)
503     return;
504
505   callback_pending_ = false;
506   save_next_cookie_running_ = true;
507
508   if (socket_->context()->cookie_store()) {
509     GURL url_for_cookies = GetURLForCookies();
510
511     CookieOptions options;
512     options.set_include_httponly();
513
514     // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
515     // CookieMonster's asynchronous operation APIs queue the callback to run it
516     // on the thread where the API was called, there won't be race. I.e. unless
517     // the callback is run synchronously, it won't be run in parallel with this
518     // method.
519     while (!callback_pending_ &&
520            response_cookies_save_index_ < response_cookies_.size()) {
521       std::string cookie = response_cookies_[response_cookies_save_index_];
522       response_cookies_save_index_++;
523
524       if (!delegate_->CanSetCookie(
525               socket_.get(), url_for_cookies, cookie, &options))
526         continue;
527
528       callback_pending_ = true;
529       socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
530           url_for_cookies, cookie, options,
531           base::Bind(&WebSocketJob::OnCookieSaved,
532                      weak_ptr_factory_.GetWeakPtr()));
533     }
534   }
535
536   save_next_cookie_running_ = false;
537
538   if (callback_pending_)
539     return;
540
541   response_cookies_.clear();
542   response_cookies_save_index_ = 0;
543
544   NotifyHeadersComplete();
545 }
546
547 void WebSocketJob::OnCookieSaved(bool cookie_status) {
548   // Tell the caller of SetCookieWithOptionsAsync() that this completion
549   // callback is invoked.
550   // - If the caller checks callback_pending earlier than this callback, the
551   //   caller exits to let this method continue iteration.
552   // - Otherwise, the caller continues iteration.
553   callback_pending_ = false;
554
555   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
556   // the loop. Otherwise, return.
557   if (save_next_cookie_running_)
558     return;
559
560   SaveNextCookie();
561 }
562
563 GURL WebSocketJob::GetURLForCookies() const {
564   GURL url = socket_->url();
565   std::string scheme = socket_->is_secure() ? "https" : "http";
566   url_canon::Replacements<char> replacements;
567   replacements.SetScheme(scheme.c_str(),
568                          url_parse::Component(0, scheme.length()));
569   return url.ReplaceComponents(replacements);
570 }
571
572 const AddressList& WebSocketJob::address_list() const {
573   return addresses_;
574 }
575
576 int WebSocketJob::TrySpdyStream() {
577   if (!socket_.get())
578     return ERR_FAILED;
579
580   if (!websocket_over_spdy_enabled_)
581     return OK;
582
583   // Check if we have a SPDY session available.
584   HttpTransactionFactory* factory =
585       socket_->context()->http_transaction_factory();
586   if (!factory)
587     return OK;
588   scoped_refptr<HttpNetworkSession> session = factory->GetSession();
589   if (!session.get())
590     return OK;
591   SpdySessionPool* spdy_pool = session->spdy_session_pool();
592   PrivacyMode privacy_mode = socket_->privacy_mode();
593   const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
594                            socket_->proxy_server(), privacy_mode);
595   // Forbid wss downgrade to SPDY without SSL.
596   // TODO(toyoshim): Does it realize the same policy with HTTP?
597   base::WeakPtr<SpdySession> spdy_session =
598       spdy_pool->FindAvailableSession(key, *socket_->net_log());
599   if (!spdy_session)
600     return OK;
601
602   SSLInfo ssl_info;
603   bool was_npn_negotiated;
604   NextProto protocol_negotiated = kProtoUnknown;
605   bool use_ssl = spdy_session->GetSSLInfo(
606       &ssl_info, &was_npn_negotiated, &protocol_negotiated);
607   if (socket_->is_secure() && !use_ssl)
608     return OK;
609
610   // Create SpdyWebSocketStream.
611   spdy_protocol_version_ = spdy_session->GetProtocolVersion();
612   spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
613
614   int result = spdy_websocket_stream_->InitializeStream(
615       socket_->url(), MEDIUM, *socket_->net_log());
616   if (result == OK) {
617     OnConnected(socket_.get(), kMaxPendingSendAllowed);
618     return ERR_PROTOCOL_SWITCHED;
619   }
620   if (result != ERR_IO_PENDING) {
621     spdy_websocket_stream_.reset();
622     return OK;
623   }
624
625   return ERR_IO_PENDING;
626 }
627
628 void WebSocketJob::SetWaiting() {
629   waiting_ = true;
630 }
631
632 bool WebSocketJob::IsWaiting() const {
633   return waiting_;
634 }
635
636 void WebSocketJob::Wakeup() {
637   if (!waiting_)
638     return;
639   waiting_ = false;
640   DCHECK(!callback_.is_null());
641   base::MessageLoopForIO::current()->PostTask(
642       FROM_HERE,
643       base::Bind(&WebSocketJob::RetryPendingIO,
644                  weak_ptr_factory_.GetWeakPtr()));
645 }
646
647 void WebSocketJob::RetryPendingIO() {
648   int result = TrySpdyStream();
649
650   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
651   // OnCreatedSpdyStream().
652   if (result != ERR_IO_PENDING)
653     CompleteIO(result);
654 }
655
656 void WebSocketJob::CompleteIO(int result) {
657   // |callback_| may be null if OnClose() or DetachDelegate() was called.
658   if (!callback_.is_null()) {
659     CompletionCallback callback = callback_;
660     callback_.Reset();
661     callback.Run(result);
662     Release();  // Balanced with OnStartOpenConnection().
663   }
664 }
665
666 bool WebSocketJob::SendDataInternal(const char* data, int length) {
667   if (spdy_websocket_stream_.get())
668     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
669   if (socket_.get())
670     return socket_->SendData(data, length);
671   return false;
672 }
673
674 void WebSocketJob::CloseInternal() {
675   if (spdy_websocket_stream_.get())
676     spdy_websocket_stream_->Close();
677   if (socket_.get())
678     socket_->Close();
679 }
680
681 void WebSocketJob::SendPending() {
682   if (current_send_buffer_.get())
683     return;
684
685   // Current buffer has been sent. Try next if any.
686   if (send_buffer_queue_.empty()) {
687     // No more data to send.
688     if (state_ == CLOSING)
689       CloseInternal();
690     return;
691   }
692
693   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
694   send_buffer_queue_.pop_front();
695   current_send_buffer_ =
696       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
697   SendDataInternal(current_send_buffer_->data(),
698                    current_send_buffer_->BytesRemaining());
699 }
700
701 }  // namespace net