47114f846455490edbb434cf4890d1a1e28b08a7
[platform/framework/web/crosswalk.git] / src / net / websockets / websocket_channel.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_channel.h"
6
7 #include <limits.h>  // for INT_MAX
8
9 #include <algorithm>
10 #include <deque>
11
12 #include "base/basictypes.h"  // for size_t
13 #include "base/big_endian.h"
14 #include "base/bind.h"
15 #include "base/compiler_specific.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/weak_ptr.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/metrics/histogram.h"
20 #include "base/numerics/safe_conversions.h"
21 #include "base/stl_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/time/time.h"
24 #include "net/base/io_buffer.h"
25 #include "net/base/net_log.h"
26 #include "net/http/http_request_headers.h"
27 #include "net/http/http_response_headers.h"
28 #include "net/http/http_util.h"
29 #include "net/websockets/websocket_errors.h"
30 #include "net/websockets/websocket_event_interface.h"
31 #include "net/websockets/websocket_frame.h"
32 #include "net/websockets/websocket_handshake_request_info.h"
33 #include "net/websockets/websocket_handshake_response_info.h"
34 #include "net/websockets/websocket_mux.h"
35 #include "net/websockets/websocket_stream.h"
36 #include "url/origin.h"
37
38 namespace net {
39
40 namespace {
41
42 using base::StreamingUtf8Validator;
43
44 const int kDefaultSendQuotaLowWaterMark = 1 << 16;
45 const int kDefaultSendQuotaHighWaterMark = 1 << 17;
46 const size_t kWebSocketCloseCodeLength = 2;
47 // This timeout is based on TCPMaximumSegmentLifetime * 2 from
48 // MainThreadWebSocketChannel.cpp in Blink.
49 const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
50
51 typedef WebSocketEventInterface::ChannelState ChannelState;
52 const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
53 const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
54
55 // Maximum close reason length = max control frame payload -
56 //                               status code length
57 //                             = 125 - 2
58 const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
59
60 // Check a close status code for strict compliance with RFC6455. This is only
61 // used for close codes received from a renderer that we are intending to send
62 // out over the network. See ParseClose() for the restrictions on incoming close
63 // codes. The |code| parameter is type int for convenience of implementation;
64 // the real type is uint16. Code 1005 is treated specially; it cannot be set
65 // explicitly by Javascript but the renderer uses it to indicate we should send
66 // a Close frame with no payload.
67 bool IsStrictlyValidCloseStatusCode(int code) {
68   static const int kInvalidRanges[] = {
69       // [BAD, OK)
70       0,    1000,   // 1000 is the first valid code
71       1006, 1007,   // 1006 MUST NOT be set.
72       1014, 3000,   // 1014 unassigned; 1015 up to 2999 are reserved.
73       5000, 65536,  // Codes above 5000 are invalid.
74   };
75   const int* const kInvalidRangesEnd =
76       kInvalidRanges + arraysize(kInvalidRanges);
77
78   DCHECK_GE(code, 0);
79   DCHECK_LT(code, 65536);
80   const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
81   DCHECK_NE(kInvalidRangesEnd, upper);
82   DCHECK_GT(upper, kInvalidRanges);
83   DCHECK_GT(*upper, code);
84   DCHECK_LE(*(upper - 1), code);
85   return ((upper - kInvalidRanges) % 2) == 0;
86 }
87
88 // This function avoids a bunch of boilerplate code.
89 void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
90
91 // Sets |name| to the name of the frame type for the given |opcode|. Note that
92 // for all of Text, Binary and Continuation opcode, this method returns
93 // "Data frame".
94 void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
95                            std::string* name) {
96   switch (opcode) {
97     case WebSocketFrameHeader::kOpCodeText:    // fall-thru
98     case WebSocketFrameHeader::kOpCodeBinary:  // fall-thru
99     case WebSocketFrameHeader::kOpCodeContinuation:
100       *name = "Data frame";
101       break;
102
103     case WebSocketFrameHeader::kOpCodePing:
104       *name = "Ping";
105       break;
106
107     case WebSocketFrameHeader::kOpCodePong:
108       *name = "Pong";
109       break;
110
111     case WebSocketFrameHeader::kOpCodeClose:
112       *name = "Close";
113       break;
114
115     default:
116       *name = "Unknown frame type";
117       break;
118   }
119
120   return;
121 }
122
123 }  // namespace
124
125 // A class to encapsulate a set of frames and information about the size of
126 // those frames.
127 class WebSocketChannel::SendBuffer {
128  public:
129   SendBuffer() : total_bytes_(0) {}
130
131   // Add a WebSocketFrame to the buffer and increase total_bytes_.
132   void AddFrame(scoped_ptr<WebSocketFrame> chunk);
133
134   // Return a pointer to the frames_ for write purposes.
135   ScopedVector<WebSocketFrame>* frames() { return &frames_; }
136
137  private:
138   // The frames_ that will be sent in the next call to WriteFrames().
139   ScopedVector<WebSocketFrame> frames_;
140
141   // The total size of the payload data in |frames_|. This will be used to
142   // measure the throughput of the link.
143   // TODO(ricea): Measure the throughput of the link.
144   size_t total_bytes_;
145 };
146
147 void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
148   total_bytes_ += frame->header.payload_length;
149   frames_.push_back(frame.release());
150 }
151
152 // Implementation of WebSocketStream::ConnectDelegate that simply forwards the
153 // calls on to the WebSocketChannel that created it.
154 class WebSocketChannel::ConnectDelegate
155     : public WebSocketStream::ConnectDelegate {
156  public:
157   explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
158
159   virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
160     creator_->OnConnectSuccess(stream.Pass());
161     // |this| may have been deleted.
162   }
163
164   virtual void OnFailure(const std::string& message) OVERRIDE {
165     creator_->OnConnectFailure(message);
166     // |this| has been deleted.
167   }
168
169   virtual void OnStartOpeningHandshake(
170       scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
171     creator_->OnStartOpeningHandshake(request.Pass());
172   }
173
174   virtual void OnFinishOpeningHandshake(
175       scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
176     creator_->OnFinishOpeningHandshake(response.Pass());
177   }
178
179  private:
180   // A pointer to the WebSocketChannel that created this object. There is no
181   // danger of this pointer being stale, because deleting the WebSocketChannel
182   // cancels the connect process, deleting this object and preventing its
183   // callbacks from being called.
184   WebSocketChannel* const creator_;
185
186   DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
187 };
188
189 class WebSocketChannel::HandshakeNotificationSender
190     : public base::SupportsWeakPtr<HandshakeNotificationSender> {
191  public:
192   explicit HandshakeNotificationSender(WebSocketChannel* channel);
193   ~HandshakeNotificationSender();
194
195   static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
196
197   ChannelState SendImmediately(WebSocketEventInterface* event_interface);
198
199   const WebSocketHandshakeRequestInfo* handshake_request_info() const {
200     return handshake_request_info_.get();
201   }
202
203   void set_handshake_request_info(
204       scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
205     handshake_request_info_ = request_info.Pass();
206   }
207
208   const WebSocketHandshakeResponseInfo* handshake_response_info() const {
209     return handshake_response_info_.get();
210   }
211
212   void set_handshake_response_info(
213       scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
214     handshake_response_info_ = response_info.Pass();
215   }
216
217  private:
218   WebSocketChannel* owner_;
219   scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
220   scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
221 };
222
223 WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
224     WebSocketChannel* channel)
225     : owner_(channel) {}
226
227 WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
228
229 void WebSocketChannel::HandshakeNotificationSender::Send(
230     base::WeakPtr<HandshakeNotificationSender> sender) {
231   // Do nothing if |sender| is already destructed.
232   if (sender) {
233     WebSocketChannel* channel = sender->owner_;
234     AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
235   }
236 }
237
238 ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
239     WebSocketEventInterface* event_interface) {
240
241   if (handshake_request_info_.get()) {
242     if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
243                                handshake_request_info_.Pass()))
244       return CHANNEL_DELETED;
245   }
246
247   if (handshake_response_info_.get()) {
248     if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
249                                handshake_response_info_.Pass()))
250       return CHANNEL_DELETED;
251
252     // TODO(yhirano): We can release |this| to save memory because
253     // there will be no more opening handshake notification.
254   }
255
256   return CHANNEL_ALIVE;
257 }
258
259 WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
260     bool final,
261     WebSocketFrameHeader::OpCode opcode,
262     const scoped_refptr<IOBuffer>& data,
263     size_t offset,
264     size_t size)
265     : final_(final),
266       opcode_(opcode),
267       data_(data),
268       offset_(offset),
269       size_(size) {}
270
271 WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
272
273 void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
274   DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
275   opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
276 }
277
278 void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
279   DCHECK_LE(offset_, size_);
280   DCHECK_LE(bytes, size_ - offset_);
281   offset_ += bytes;
282 }
283
284 WebSocketChannel::WebSocketChannel(
285     scoped_ptr<WebSocketEventInterface> event_interface,
286     URLRequestContext* url_request_context)
287     : event_interface_(event_interface.Pass()),
288       url_request_context_(url_request_context),
289       send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
290       send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
291       current_send_quota_(0),
292       current_receive_quota_(0),
293       timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
294       received_close_code_(0),
295       state_(FRESHLY_CONSTRUCTED),
296       notification_sender_(new HandshakeNotificationSender(this)),
297       sending_text_message_(false),
298       receiving_text_message_(false),
299       expecting_to_handle_continuation_(false),
300       initial_frame_forwarded_(false) {}
301
302 WebSocketChannel::~WebSocketChannel() {
303   // The stream may hold a pointer to read_frames_, and so it needs to be
304   // destroyed first.
305   stream_.reset();
306   // The timer may have a callback pointing back to us, so stop it just in case
307   // someone decides to run the event loop from their destructor.
308   timer_.Stop();
309 }
310
311 void WebSocketChannel::SendAddChannelRequest(
312     const GURL& socket_url,
313     const std::vector<std::string>& requested_subprotocols,
314     const url::Origin& origin) {
315   // Delegate to the tested version.
316   SendAddChannelRequestWithSuppliedCreator(
317       socket_url,
318       requested_subprotocols,
319       origin,
320       base::Bind(&WebSocketStream::CreateAndConnectStream));
321 }
322
323 void WebSocketChannel::SetState(State new_state) {
324   DCHECK_NE(state_, new_state);
325
326   if (new_state == CONNECTED)
327     established_on_ = base::TimeTicks::Now();
328   if (state_ == CONNECTED && !established_on_.is_null()) {
329     UMA_HISTOGRAM_LONG_TIMES(
330         "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
331   }
332
333   state_ = new_state;
334 }
335
336 bool WebSocketChannel::InClosingState() const {
337   // The state RECV_CLOSED is not supported here, because it is only used in one
338   // code path and should not leak into the code in general.
339   DCHECK_NE(RECV_CLOSED, state_)
340       << "InClosingState called with state_ == RECV_CLOSED";
341   return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
342 }
343
344 void WebSocketChannel::SendFrame(bool fin,
345                                  WebSocketFrameHeader::OpCode op_code,
346                                  const std::vector<char>& data) {
347   if (data.size() > INT_MAX) {
348     NOTREACHED() << "Frame size sanity check failed";
349     return;
350   }
351   if (stream_ == NULL) {
352     LOG(DFATAL) << "Got SendFrame without a connection established; "
353                 << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
354                 << " data.size()=" << data.size();
355     return;
356   }
357   if (InClosingState()) {
358     DVLOG(1) << "SendFrame called in state " << state_
359              << ". This may be a bug, or a harmless race.";
360     return;
361   }
362   if (state_ != CONNECTED) {
363     NOTREACHED() << "SendFrame() called in state " << state_;
364     return;
365   }
366   if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
367     // TODO(ricea): Kill renderer.
368     AllowUnused(
369         FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
370     // |this| has been deleted.
371     return;
372   }
373   if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
374     LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
375                 << "; misbehaving renderer? fin=" << fin
376                 << " data.size()=" << data.size();
377     return;
378   }
379   if (op_code == WebSocketFrameHeader::kOpCodeText ||
380       (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
381        sending_text_message_)) {
382     StreamingUtf8Validator::State state =
383         outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
384     if (state == StreamingUtf8Validator::INVALID ||
385         (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
386       // TODO(ricea): Kill renderer.
387       AllowUnused(
388           FailChannel("Browser sent a text frame containing invalid UTF-8",
389                       kWebSocketErrorGoingAway,
390                       ""));
391       // |this| has been deleted.
392       return;
393     }
394     sending_text_message_ = !fin;
395     DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
396   }
397   current_send_quota_ -= data.size();
398   // TODO(ricea): If current_send_quota_ has dropped below
399   // send_quota_low_water_mark_, it might be good to increase the "low
400   // water mark" and "high water mark", but only if the link to the WebSocket
401   // server is not saturated.
402   scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
403   std::copy(data.begin(), data.end(), buffer->data());
404   AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
405   // |this| may have been deleted.
406 }
407
408 void WebSocketChannel::SendFlowControl(int64 quota) {
409   DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
410          state_ == CLOSE_WAIT);
411   // TODO(ricea): Kill the renderer if it tries to send us a negative quota
412   // value or > INT_MAX.
413   DCHECK_GE(quota, 0);
414   DCHECK_LE(quota, INT_MAX);
415   if (!pending_received_frames_.empty()) {
416     DCHECK_EQ(0, current_receive_quota_);
417   }
418   while (!pending_received_frames_.empty() && quota > 0) {
419     PendingReceivedFrame& front = pending_received_frames_.front();
420     const size_t data_size = front.size() - front.offset();
421     const size_t bytes_to_send =
422         std::min(base::checked_cast<size_t>(quota), data_size);
423     const bool final = front.final() && data_size == bytes_to_send;
424     const char* data = front.data() ?
425         front.data()->data() + front.offset() : NULL;
426     DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
427     const std::vector<char> data_vector(data, data + bytes_to_send);
428     DVLOG(3) << "Sending frame previously split due to quota to the "
429              << "renderer: quota=" << quota << " data_size=" << data_size
430              << " bytes_to_send=" << bytes_to_send;
431     if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
432         CHANNEL_DELETED)
433       return;
434     if (bytes_to_send < data_size) {
435       front.DidConsume(bytes_to_send);
436       front.ResetOpcode();
437       return;
438     }
439     const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
440     DCHECK_GE(quota, signed_bytes_to_send);
441     quota -= signed_bytes_to_send;
442
443     pending_received_frames_.pop();
444   }
445   // If current_receive_quota_ == 0 then there is no pending ReadFrames()
446   // operation.
447   const bool start_read =
448       current_receive_quota_ == 0 && quota > 0 &&
449       (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
450   current_receive_quota_ += base::checked_cast<int>(quota);
451   if (start_read)
452     AllowUnused(ReadFrames());
453   // |this| may have been deleted.
454 }
455
456 void WebSocketChannel::StartClosingHandshake(uint16 code,
457                                              const std::string& reason) {
458   if (InClosingState()) {
459     DVLOG(1) << "StartClosingHandshake called in state " << state_
460              << ". This may be a bug, or a harmless race.";
461     return;
462   }
463   if (state_ == CONNECTING) {
464     // Abort the in-progress handshake and drop the connection immediately.
465     stream_request_.reset();
466     SetState(CLOSED);
467     AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
468     return;
469   }
470   if (state_ != CONNECTED) {
471     NOTREACHED() << "StartClosingHandshake() called in state " << state_;
472     return;
473   }
474   // Javascript actually only permits 1000 and 3000-4999, but the implementation
475   // itself may produce different codes. The length of |reason| is also checked
476   // by Javascript.
477   if (!IsStrictlyValidCloseStatusCode(code) ||
478       reason.size() > kMaximumCloseReasonLength) {
479     // "InternalServerError" is actually used for errors from any endpoint, per
480     // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
481     // reason it must be malfunctioning in some way, and based on that we
482     // interpret this as an internal error.
483     if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
484       DCHECK_EQ(CONNECTED, state_);
485       SetState(SEND_CLOSED);
486     }
487     return;
488   }
489   if (SendClose(
490           code,
491           StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
492       CHANNEL_DELETED)
493     return;
494   DCHECK_EQ(CONNECTED, state_);
495   SetState(SEND_CLOSED);
496 }
497
498 void WebSocketChannel::SendAddChannelRequestForTesting(
499     const GURL& socket_url,
500     const std::vector<std::string>& requested_subprotocols,
501     const url::Origin& origin,
502     const WebSocketStreamCreator& creator) {
503   SendAddChannelRequestWithSuppliedCreator(
504       socket_url, requested_subprotocols, origin, creator);
505 }
506
507 void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
508     base::TimeDelta delay) {
509   timeout_ = delay;
510 }
511
512 void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
513     const GURL& socket_url,
514     const std::vector<std::string>& requested_subprotocols,
515     const url::Origin& origin,
516     const WebSocketStreamCreator& creator) {
517   DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
518   if (!socket_url.SchemeIsWSOrWSS()) {
519     // TODO(ricea): Kill the renderer (this error should have been caught by
520     // Javascript).
521     AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
522     // |this| is deleted here.
523     return;
524   }
525   socket_url_ = socket_url;
526   scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
527       new ConnectDelegate(this));
528   stream_request_ = creator.Run(socket_url_,
529                                 requested_subprotocols,
530                                 origin,
531                                 url_request_context_,
532                                 BoundNetLog(),
533                                 connect_delegate.Pass());
534   SetState(CONNECTING);
535 }
536
537 void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
538   DCHECK(stream);
539   DCHECK_EQ(CONNECTING, state_);
540
541   stream_ = stream.Pass();
542
543   SetState(CONNECTED);
544
545   if (event_interface_->OnAddChannelResponse(
546           false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
547       CHANNEL_DELETED)
548     return;
549
550   // TODO(ricea): Get flow control information from the WebSocketStream once we
551   // have a multiplexing WebSocketStream.
552   current_send_quota_ = send_quota_high_water_mark_;
553   if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
554       CHANNEL_DELETED)
555     return;
556
557   // |stream_request_| is not used once the connection has succeeded.
558   stream_request_.reset();
559
560   AllowUnused(ReadFrames());
561   // |this| may have been deleted.
562 }
563
564 void WebSocketChannel::OnConnectFailure(const std::string& message) {
565   DCHECK_EQ(CONNECTING, state_);
566
567   SetState(CLOSED);
568   stream_request_.reset();
569
570   if (CHANNEL_DELETED ==
571       notification_sender_->SendImmediately(event_interface_.get())) {
572     // |this| has been deleted.
573     return;
574   }
575   AllowUnused(event_interface_->OnFailChannel(message));
576   // |this| has been deleted.
577 }
578
579 void WebSocketChannel::OnStartOpeningHandshake(
580     scoped_ptr<WebSocketHandshakeRequestInfo> request) {
581   DCHECK(!notification_sender_->handshake_request_info());
582
583   // Because it is hard to handle an IPC error synchronously is difficult,
584   // we asynchronously notify the information.
585   notification_sender_->set_handshake_request_info(request.Pass());
586   ScheduleOpeningHandshakeNotification();
587 }
588
589 void WebSocketChannel::OnFinishOpeningHandshake(
590     scoped_ptr<WebSocketHandshakeResponseInfo> response) {
591   DCHECK(!notification_sender_->handshake_response_info());
592
593   // Because it is hard to handle an IPC error synchronously is difficult,
594   // we asynchronously notify the information.
595   notification_sender_->set_handshake_response_info(response.Pass());
596   ScheduleOpeningHandshakeNotification();
597 }
598
599 void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
600   base::MessageLoop::current()->PostTask(
601       FROM_HERE,
602       base::Bind(HandshakeNotificationSender::Send,
603                  notification_sender_->AsWeakPtr()));
604 }
605
606 ChannelState WebSocketChannel::WriteFrames() {
607   int result = OK;
608   do {
609     // This use of base::Unretained is safe because this object owns the
610     // WebSocketStream and destroying it cancels all callbacks.
611     result = stream_->WriteFrames(
612         data_being_sent_->frames(),
613         base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
614                    base::Unretained(this),
615                    false));
616     if (result != ERR_IO_PENDING) {
617       if (OnWriteDone(true, result) == CHANNEL_DELETED)
618         return CHANNEL_DELETED;
619       // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
620       // guaranteed to be the same as before OnWriteDone() call.
621     }
622   } while (result == OK && data_being_sent_);
623   return CHANNEL_ALIVE;
624 }
625
626 ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
627   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
628   DCHECK_NE(CONNECTING, state_);
629   DCHECK_NE(ERR_IO_PENDING, result);
630   DCHECK(data_being_sent_);
631   switch (result) {
632     case OK:
633       if (data_to_send_next_) {
634         data_being_sent_ = data_to_send_next_.Pass();
635         if (!synchronous)
636           return WriteFrames();
637       } else {
638         data_being_sent_.reset();
639         if (current_send_quota_ < send_quota_low_water_mark_) {
640           // TODO(ricea): Increase low_water_mark and high_water_mark if
641           // throughput is high, reduce them if throughput is low.  Low water
642           // mark needs to be >= the bandwidth delay product *of the IPC
643           // channel*. Because factors like context-switch time, thread wake-up
644           // time, and bus speed come into play it is complex and probably needs
645           // to be determined empirically.
646           DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
647           // TODO(ricea): Truncate quota by the quota specified by the remote
648           // server, if the protocol in use supports quota.
649           int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
650           current_send_quota_ += fresh_quota;
651           return event_interface_->OnFlowControl(fresh_quota);
652         }
653       }
654       return CHANNEL_ALIVE;
655
656     // If a recoverable error condition existed, it would go here.
657
658     default:
659       DCHECK_LT(result, 0)
660           << "WriteFrames() should only return OK or ERR_ codes";
661
662       stream_->Close();
663       SetState(CLOSED);
664       return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
665   }
666 }
667
668 ChannelState WebSocketChannel::ReadFrames() {
669   int result = OK;
670   while (result == OK && current_receive_quota_ > 0) {
671     // This use of base::Unretained is safe because this object owns the
672     // WebSocketStream, and any pending reads will be cancelled when it is
673     // destroyed.
674     result = stream_->ReadFrames(
675         &read_frames_,
676         base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
677                    base::Unretained(this),
678                    false));
679     if (result != ERR_IO_PENDING) {
680       if (OnReadDone(true, result) == CHANNEL_DELETED)
681         return CHANNEL_DELETED;
682     }
683     DCHECK_NE(CLOSED, state_);
684   }
685   return CHANNEL_ALIVE;
686 }
687
688 ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
689   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
690   DCHECK_NE(CONNECTING, state_);
691   DCHECK_NE(ERR_IO_PENDING, result);
692   switch (result) {
693     case OK:
694       // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
695       // with no data read, not an empty response.
696       DCHECK(!read_frames_.empty())
697           << "ReadFrames() returned OK, but nothing was read.";
698       for (size_t i = 0; i < read_frames_.size(); ++i) {
699         scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
700         read_frames_[i] = NULL;
701         if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
702           return CHANNEL_DELETED;
703       }
704       read_frames_.clear();
705       // There should always be a call to ReadFrames pending.
706       // TODO(ricea): Unless we are out of quota.
707       DCHECK_NE(CLOSED, state_);
708       if (!synchronous)
709         return ReadFrames();
710       return CHANNEL_ALIVE;
711
712     case ERR_WS_PROTOCOL_ERROR:
713       // This could be kWebSocketErrorProtocolError (specifically, non-minimal
714       // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
715       // extension-specific error.
716       return FailChannel("Invalid frame header",
717                          kWebSocketErrorProtocolError,
718                          "WebSocket Protocol Error");
719
720     default:
721       DCHECK_LT(result, 0)
722           << "ReadFrames() should only return OK or ERR_ codes";
723
724       stream_->Close();
725       SetState(CLOSED);
726
727       uint16 code = kWebSocketErrorAbnormalClosure;
728       std::string reason = "";
729       bool was_clean = false;
730       if (received_close_code_ != 0) {
731         code = received_close_code_;
732         reason = received_close_reason_;
733         was_clean = (result == ERR_CONNECTION_CLOSED);
734       }
735
736       return DoDropChannel(was_clean, code, reason);
737   }
738 }
739
740 ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
741   if (frame->header.masked) {
742     // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
743     // masked frame."
744     return FailChannel(
745         "A server must not mask any frames that it sends to the "
746         "client.",
747         kWebSocketErrorProtocolError,
748         "Masked frame from server");
749   }
750   const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
751   DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
752          frame->header.final);
753   if (frame->header.reserved1 || frame->header.reserved2 ||
754       frame->header.reserved3) {
755     return FailChannel(base::StringPrintf(
756                            "One or more reserved bits are on: reserved1 = %d, "
757                            "reserved2 = %d, reserved3 = %d",
758                            static_cast<int>(frame->header.reserved1),
759                            static_cast<int>(frame->header.reserved2),
760                            static_cast<int>(frame->header.reserved3)),
761                        kWebSocketErrorProtocolError,
762                        "Invalid reserved bit");
763   }
764
765   // Respond to the frame appropriately to its type.
766   return HandleFrameByState(
767       opcode, frame->header.final, frame->data, frame->header.payload_length);
768 }
769
770 ChannelState WebSocketChannel::HandleFrameByState(
771     const WebSocketFrameHeader::OpCode opcode,
772     bool final,
773     const scoped_refptr<IOBuffer>& data_buffer,
774     size_t size) {
775   DCHECK_NE(RECV_CLOSED, state_)
776       << "HandleFrame() does not support being called re-entrantly from within "
777          "SendClose()";
778   DCHECK_NE(CLOSED, state_);
779   if (state_ == CLOSE_WAIT) {
780     std::string frame_name;
781     GetFrameTypeForOpcode(opcode, &frame_name);
782
783     // FailChannel() won't send another Close frame.
784     return FailChannel(
785         frame_name + " received after close", kWebSocketErrorProtocolError, "");
786   }
787   switch (opcode) {
788     case WebSocketFrameHeader::kOpCodeText:  // fall-thru
789     case WebSocketFrameHeader::kOpCodeBinary:
790     case WebSocketFrameHeader::kOpCodeContinuation:
791       return HandleDataFrame(opcode, final, data_buffer, size);
792
793     case WebSocketFrameHeader::kOpCodePing:
794       DVLOG(1) << "Got Ping of size " << size;
795       if (state_ == CONNECTED)
796         return SendFrameFromIOBuffer(
797             true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
798       DVLOG(3) << "Ignored ping in state " << state_;
799       return CHANNEL_ALIVE;
800
801     case WebSocketFrameHeader::kOpCodePong:
802       DVLOG(1) << "Got Pong of size " << size;
803       // There is no need to do anything with pong messages.
804       return CHANNEL_ALIVE;
805
806     case WebSocketFrameHeader::kOpCodeClose: {
807       // TODO(ricea): If there is a message which is queued for transmission to
808       // the renderer, then the renderer should not receive an
809       // OnClosingHandshake or OnDropChannel IPC until the queued message has
810       // been completedly transmitted.
811       uint16 code = kWebSocketNormalClosure;
812       std::string reason;
813       std::string message;
814       if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
815         return FailChannel(message, code, reason);
816       }
817       // TODO(ricea): Find a way to safely log the message from the close
818       // message (escape control codes and so on).
819       DVLOG(1) << "Got Close with code " << code;
820       switch (state_) {
821         case CONNECTED:
822           SetState(RECV_CLOSED);
823           if (SendClose(code, reason) == CHANNEL_DELETED)
824             return CHANNEL_DELETED;
825           DCHECK_EQ(RECV_CLOSED, state_);
826           SetState(CLOSE_WAIT);
827
828           if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
829             return CHANNEL_DELETED;
830           received_close_code_ = code;
831           received_close_reason_ = reason;
832           break;
833
834         case SEND_CLOSED:
835           SetState(CLOSE_WAIT);
836           // From RFC6455 section 7.1.5: "Each endpoint
837           // will see the status code sent by the other end as _The WebSocket
838           // Connection Close Code_."
839           received_close_code_ = code;
840           received_close_reason_ = reason;
841           break;
842
843         default:
844           LOG(DFATAL) << "Got Close in unexpected state " << state_;
845           break;
846       }
847       return CHANNEL_ALIVE;
848     }
849
850     default:
851       return FailChannel(
852           base::StringPrintf("Unrecognized frame opcode: %d", opcode),
853           kWebSocketErrorProtocolError,
854           "Unknown opcode");
855   }
856 }
857
858 ChannelState WebSocketChannel::HandleDataFrame(
859     WebSocketFrameHeader::OpCode opcode,
860     bool final,
861     const scoped_refptr<IOBuffer>& data_buffer,
862     size_t size) {
863   if (state_ != CONNECTED) {
864     DVLOG(3) << "Ignored data packet received in state " << state_;
865     return CHANNEL_ALIVE;
866   }
867   DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
868          opcode == WebSocketFrameHeader::kOpCodeText ||
869          opcode == WebSocketFrameHeader::kOpCodeBinary);
870   const bool got_continuation =
871       (opcode == WebSocketFrameHeader::kOpCodeContinuation);
872   if (got_continuation != expecting_to_handle_continuation_) {
873     const std::string console_log = got_continuation
874         ? "Received unexpected continuation frame."
875         : "Received start of new message but previous message is unfinished.";
876     const std::string reason = got_continuation
877         ? "Unexpected continuation"
878         : "Previous data frame unfinished";
879     return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
880   }
881   expecting_to_handle_continuation_ = !final;
882   WebSocketFrameHeader::OpCode opcode_to_send = opcode;
883   if (!initial_frame_forwarded_ &&
884       opcode == WebSocketFrameHeader::kOpCodeContinuation) {
885     opcode_to_send = receiving_text_message_
886                          ? WebSocketFrameHeader::kOpCodeText
887                          : WebSocketFrameHeader::kOpCodeBinary;
888   }
889   if (opcode == WebSocketFrameHeader::kOpCodeText ||
890       (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
891        receiving_text_message_)) {
892     // This call is not redundant when size == 0 because it tells us what
893     // the current state is.
894     StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
895         size ? data_buffer->data() : NULL, size);
896     if (state == StreamingUtf8Validator::INVALID ||
897         (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
898       return FailChannel("Could not decode a text frame as UTF-8.",
899                          kWebSocketErrorProtocolError,
900                          "Invalid UTF-8 in text frame");
901     }
902     receiving_text_message_ = !final;
903     DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
904   }
905   if (size == 0U && !final)
906     return CHANNEL_ALIVE;
907
908   initial_frame_forwarded_ = !final;
909   if (size > base::checked_cast<size_t>(current_receive_quota_) ||
910       !pending_received_frames_.empty()) {
911     const bool no_quota = (current_receive_quota_ == 0);
912     DCHECK(no_quota || pending_received_frames_.empty());
913     DVLOG(3) << "Queueing frame to renderer due to quota. quota="
914              << current_receive_quota_ << " size=" << size;
915     WebSocketFrameHeader::OpCode opcode_to_queue =
916         no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
917     pending_received_frames_.push(PendingReceivedFrame(
918         final, opcode_to_queue, data_buffer, current_receive_quota_, size));
919     if (no_quota)
920       return CHANNEL_ALIVE;
921     size = current_receive_quota_;
922     final = false;
923   }
924
925   // TODO(ricea): Can this copy be eliminated?
926   const char* const data_begin = size ? data_buffer->data() : NULL;
927   const char* const data_end = data_begin + size;
928   const std::vector<char> data(data_begin, data_end);
929   current_receive_quota_ -= size;
930   DCHECK_GE(current_receive_quota_, 0);
931
932   // Sends the received frame to the renderer process.
933   return event_interface_->OnDataFrame(final, opcode_to_send, data);
934 }
935
936 ChannelState WebSocketChannel::SendFrameFromIOBuffer(
937     bool fin,
938     WebSocketFrameHeader::OpCode op_code,
939     const scoped_refptr<IOBuffer>& buffer,
940     size_t size) {
941   DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
942   DCHECK(stream_);
943
944   scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
945   WebSocketFrameHeader& header = frame->header;
946   header.final = fin;
947   header.masked = true;
948   header.payload_length = size;
949   frame->data = buffer;
950
951   if (data_being_sent_) {
952     // Either the link to the WebSocket server is saturated, or several messages
953     // are being sent in a batch.
954     // TODO(ricea): Keep some statistics to work out the situation and adjust
955     // quota appropriately.
956     if (!data_to_send_next_)
957       data_to_send_next_.reset(new SendBuffer);
958     data_to_send_next_->AddFrame(frame.Pass());
959     return CHANNEL_ALIVE;
960   }
961
962   data_being_sent_.reset(new SendBuffer);
963   data_being_sent_->AddFrame(frame.Pass());
964   return WriteFrames();
965 }
966
967 ChannelState WebSocketChannel::FailChannel(const std::string& message,
968                                            uint16 code,
969                                            const std::string& reason) {
970   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
971   DCHECK_NE(CONNECTING, state_);
972   DCHECK_NE(CLOSED, state_);
973
974   // TODO(ricea): Logging.
975   if (state_ == CONNECTED) {
976     if (SendClose(code, reason) == CHANNEL_DELETED)
977       return CHANNEL_DELETED;
978   }
979
980   // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
981   // should close the connection itself without waiting for the closing
982   // handshake.
983   stream_->Close();
984   SetState(CLOSED);
985   return event_interface_->OnFailChannel(message);
986 }
987
988 ChannelState WebSocketChannel::SendClose(uint16 code,
989                                          const std::string& reason) {
990   DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
991   DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
992   scoped_refptr<IOBuffer> body;
993   size_t size = 0;
994   if (code == kWebSocketErrorNoStatusReceived) {
995     // Special case: translate kWebSocketErrorNoStatusReceived into a Close
996     // frame with no payload.
997     DCHECK(reason.empty());
998     body = new IOBuffer(0);
999   } else {
1000     const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
1001     body = new IOBuffer(payload_length);
1002     size = payload_length;
1003     base::WriteBigEndian(body->data(), code);
1004     COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
1005                    they_should_both_be_two);
1006     std::copy(
1007         reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
1008   }
1009   // This use of base::Unretained() is safe because we stop the timer in the
1010   // destructor.
1011   timer_.Start(
1012       FROM_HERE,
1013       timeout_,
1014       base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
1015   if (SendFrameFromIOBuffer(
1016           true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
1017       CHANNEL_DELETED)
1018     return CHANNEL_DELETED;
1019   return CHANNEL_ALIVE;
1020 }
1021
1022 bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
1023                                   size_t size,
1024                                   uint16* code,
1025                                   std::string* reason,
1026                                   std::string* message) {
1027   reason->clear();
1028   if (size < kWebSocketCloseCodeLength) {
1029     if (size == 0U) {
1030       *code = kWebSocketErrorNoStatusReceived;
1031       return true;
1032     }
1033
1034     DVLOG(1) << "Close frame with payload size " << size << " received "
1035              << "(the first byte is " << std::hex
1036              << static_cast<int>(buffer->data()[0]) << ")";
1037     *code = kWebSocketErrorProtocolError;
1038     *message =
1039         "Received a broken close frame containing an invalid size body.";
1040     return false;
1041   }
1042
1043   const char* data = buffer->data();
1044   uint16 unchecked_code = 0;
1045   base::ReadBigEndian(data, &unchecked_code);
1046   COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
1047                  they_should_both_be_two_bytes);
1048
1049   switch (unchecked_code) {
1050     case kWebSocketErrorNoStatusReceived:
1051     case kWebSocketErrorAbnormalClosure:
1052     case kWebSocketErrorTlsHandshake:
1053       *code = kWebSocketErrorProtocolError;
1054       *message =
1055           "Received a broken close frame containing a reserved status code.";
1056       return false;
1057
1058     default:
1059       *code = unchecked_code;
1060       break;
1061   }
1062
1063   std::string text(data + kWebSocketCloseCodeLength, data + size);
1064   if (StreamingUtf8Validator::Validate(text)) {
1065     reason->swap(text);
1066     return true;
1067   }
1068
1069   *code = kWebSocketErrorProtocolError;
1070   *reason = "Invalid UTF-8 in Close frame";
1071   *message = "Received a broken close frame containing invalid UTF-8.";
1072   return false;
1073 }
1074
1075 ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
1076                                              uint16 code,
1077                                              const std::string& reason) {
1078   if (CHANNEL_DELETED ==
1079       notification_sender_->SendImmediately(event_interface_.get()))
1080     return CHANNEL_DELETED;
1081   ChannelState result =
1082       event_interface_->OnDropChannel(was_clean, code, reason);
1083   DCHECK_EQ(CHANNEL_DELETED, result);
1084   return result;
1085 }
1086
1087 void WebSocketChannel::CloseTimeout() {
1088   stream_->Close();
1089   SetState(CLOSED);
1090   AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
1091   // |this| has been deleted.
1092 }
1093
1094 }  // namespace net