Upstream version 9.37.197.0
[platform/framework/web/crosswalk.git] / src / third_party / trace-viewer / third_party / pywebsocket / src / mod_pywebsocket / _stream_hybi.py
1 # Copyright 2012, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 #     * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 #     * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 #     * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """This file provides classes and helper functions for parsing/building frames
32 of the WebSocket protocol (RFC 6455).
33
34 Specification:
35 http://tools.ietf.org/html/rfc6455
36 """
37
38
39 from collections import deque
40 import logging
41 import os
42 import struct
43 import time
44
45 from mod_pywebsocket import common
46 from mod_pywebsocket import util
47 from mod_pywebsocket._stream_base import BadOperationException
48 from mod_pywebsocket._stream_base import ConnectionTerminatedException
49 from mod_pywebsocket._stream_base import InvalidFrameException
50 from mod_pywebsocket._stream_base import InvalidUTF8Exception
51 from mod_pywebsocket._stream_base import StreamBase
52 from mod_pywebsocket._stream_base import UnsupportedFrameException
53
54
55 _NOOP_MASKER = util.NoopMasker()
56
57
58 class Frame(object):
59
60     def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
61                  opcode=None, payload=''):
62         self.fin = fin
63         self.rsv1 = rsv1
64         self.rsv2 = rsv2
65         self.rsv3 = rsv3
66         self.opcode = opcode
67         self.payload = payload
68
69
70 # Helper functions made public to be used for writing unittests for WebSocket
71 # clients.
72
73
74 def create_length_header(length, mask):
75     """Creates a length header.
76
77     Args:
78         length: Frame length. Must be less than 2^63.
79         mask: Mask bit. Must be boolean.
80
81     Raises:
82         ValueError: when bad data is given.
83     """
84
85     if mask:
86         mask_bit = 1 << 7
87     else:
88         mask_bit = 0
89
90     if length < 0:
91         raise ValueError('length must be non negative integer')
92     elif length <= 125:
93         return chr(mask_bit | length)
94     elif length < (1 << 16):
95         return chr(mask_bit | 126) + struct.pack('!H', length)
96     elif length < (1 << 63):
97         return chr(mask_bit | 127) + struct.pack('!Q', length)
98     else:
99         raise ValueError('Payload is too big for one frame')
100
101
102 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
103     """Creates a frame header.
104
105     Raises:
106         Exception: when bad data is given.
107     """
108
109     if opcode < 0 or 0xf < opcode:
110         raise ValueError('Opcode out of range')
111
112     if payload_length < 0 or (1 << 63) <= payload_length:
113         raise ValueError('payload_length out of range')
114
115     if (fin | rsv1 | rsv2 | rsv3) & ~1:
116         raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
117
118     header = ''
119
120     first_byte = ((fin << 7)
121                   | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
122                   | opcode)
123     header += chr(first_byte)
124     header += create_length_header(payload_length, mask)
125
126     return header
127
128
129 def _build_frame(header, body, mask):
130     if not mask:
131         return header + body
132
133     masking_nonce = os.urandom(4)
134     masker = util.RepeatedXorMasker(masking_nonce)
135
136     return header + masking_nonce + masker.mask(body)
137
138
139 def _filter_and_format_frame_object(frame, mask, frame_filters):
140     for frame_filter in frame_filters:
141         frame_filter.filter(frame)
142
143     header = create_header(
144         frame.opcode, len(frame.payload), frame.fin,
145         frame.rsv1, frame.rsv2, frame.rsv3, mask)
146     return _build_frame(header, frame.payload, mask)
147
148
149 def create_binary_frame(
150     message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
151     """Creates a simple binary frame with no extension, reserved bit."""
152
153     frame = Frame(fin=fin, opcode=opcode, payload=message)
154     return _filter_and_format_frame_object(frame, mask, frame_filters)
155
156
157 def create_text_frame(
158     message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
159     """Creates a simple text frame with no extension, reserved bit."""
160
161     encoded_message = message.encode('utf-8')
162     return create_binary_frame(encoded_message, opcode, fin, mask,
163                                frame_filters)
164
165
166 def parse_frame(receive_bytes, logger=None,
167                 ws_version=common.VERSION_HYBI_LATEST,
168                 unmask_receive=True):
169     """Parses a frame. Returns a tuple containing each header field and
170     payload.
171
172     Args:
173         receive_bytes: a function that reads frame data from a stream or
174             something similar. The function takes length of the bytes to be
175             read. The function must raise ConnectionTerminatedException if
176             there is not enough data to be read.
177         logger: a logging object.
178         ws_version: the version of WebSocket protocol.
179         unmask_receive: unmask received frames. When received unmasked
180             frame, raises InvalidFrameException.
181
182     Raises:
183         ConnectionTerminatedException: when receive_bytes raises it.
184         InvalidFrameException: when the frame contains invalid data.
185     """
186
187     if not logger:
188         logger = logging.getLogger()
189
190     logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
191
192     received = receive_bytes(2)
193
194     first_byte = ord(received[0])
195     fin = (first_byte >> 7) & 1
196     rsv1 = (first_byte >> 6) & 1
197     rsv2 = (first_byte >> 5) & 1
198     rsv3 = (first_byte >> 4) & 1
199     opcode = first_byte & 0xf
200
201     second_byte = ord(received[1])
202     mask = (second_byte >> 7) & 1
203     payload_length = second_byte & 0x7f
204
205     logger.log(common.LOGLEVEL_FINE,
206                'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
207                'Mask=%s, Payload_length=%s',
208                fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)
209
210     if (mask == 1) != unmask_receive:
211         raise InvalidFrameException(
212             'Mask bit on the received frame did\'nt match masking '
213             'configuration for received frames')
214
215     # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
216     # into the 8-octet extended payload length field (or 0x0-0xFD in
217     # 2-octet field).
218     valid_length_encoding = True
219     length_encoding_bytes = 1
220     if payload_length == 127:
221         logger.log(common.LOGLEVEL_FINE,
222                    'Receive 8-octet extended payload length')
223
224         extended_payload_length = receive_bytes(8)
225         payload_length = struct.unpack(
226             '!Q', extended_payload_length)[0]
227         if payload_length > 0x7FFFFFFFFFFFFFFF:
228             raise InvalidFrameException(
229                 'Extended payload length >= 2^63')
230         if ws_version >= 13 and payload_length < 0x10000:
231             valid_length_encoding = False
232             length_encoding_bytes = 8
233
234         logger.log(common.LOGLEVEL_FINE,
235                    'Decoded_payload_length=%s', payload_length)
236     elif payload_length == 126:
237         logger.log(common.LOGLEVEL_FINE,
238                    'Receive 2-octet extended payload length')
239
240         extended_payload_length = receive_bytes(2)
241         payload_length = struct.unpack(
242             '!H', extended_payload_length)[0]
243         if ws_version >= 13 and payload_length < 126:
244             valid_length_encoding = False
245             length_encoding_bytes = 2
246
247         logger.log(common.LOGLEVEL_FINE,
248                    'Decoded_payload_length=%s', payload_length)
249
250     if not valid_length_encoding:
251         logger.warning(
252             'Payload length is not encoded using the minimal number of '
253             'bytes (%d is encoded using %d bytes)',
254             payload_length,
255             length_encoding_bytes)
256
257     if mask == 1:
258         logger.log(common.LOGLEVEL_FINE, 'Receive mask')
259
260         masking_nonce = receive_bytes(4)
261         masker = util.RepeatedXorMasker(masking_nonce)
262
263         logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
264     else:
265         masker = _NOOP_MASKER
266
267     logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
268     if logger.isEnabledFor(common.LOGLEVEL_FINE):
269         receive_start = time.time()
270
271     raw_payload_bytes = receive_bytes(payload_length)
272
273     if logger.isEnabledFor(common.LOGLEVEL_FINE):
274         logger.log(
275             common.LOGLEVEL_FINE,
276             'Done receiving payload data at %s MB/s',
277             payload_length / (time.time() - receive_start) / 1000 / 1000)
278     logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
279
280     if logger.isEnabledFor(common.LOGLEVEL_FINE):
281         unmask_start = time.time()
282
283     bytes = masker.mask(raw_payload_bytes)
284
285     if logger.isEnabledFor(common.LOGLEVEL_FINE):
286         logger.log(
287             common.LOGLEVEL_FINE,
288             'Done unmasking payload data at %s MB/s',
289             payload_length / (time.time() - unmask_start) / 1000 / 1000)
290
291     return opcode, bytes, fin, rsv1, rsv2, rsv3
292
293
294 class FragmentedFrameBuilder(object):
295     """A stateful class to send a message as fragments."""
296
297     def __init__(self, mask, frame_filters=[], encode_utf8=True):
298         """Constructs an instance."""
299
300         self._mask = mask
301         self._frame_filters = frame_filters
302         # This is for skipping UTF-8 encoding when building text type frames
303         # from compressed data.
304         self._encode_utf8 = encode_utf8
305
306         self._started = False
307
308         # Hold opcode of the first frame in messages to verify types of other
309         # frames in the message are all the same.
310         self._opcode = common.OPCODE_TEXT
311
312     def build(self, message, end, binary):
313         if binary:
314             frame_type = common.OPCODE_BINARY
315         else:
316             frame_type = common.OPCODE_TEXT
317         if self._started:
318             if self._opcode != frame_type:
319                 raise ValueError('Message types are different in frames for '
320                                  'the same message')
321             opcode = common.OPCODE_CONTINUATION
322         else:
323             opcode = frame_type
324             self._opcode = frame_type
325
326         if end:
327             self._started = False
328             fin = 1
329         else:
330             self._started = True
331             fin = 0
332
333         if binary or not self._encode_utf8:
334             return create_binary_frame(
335                 message, opcode, fin, self._mask, self._frame_filters)
336         else:
337             return create_text_frame(
338                 message, opcode, fin, self._mask, self._frame_filters)
339
340
341 def _create_control_frame(opcode, body, mask, frame_filters):
342     frame = Frame(opcode=opcode, payload=body)
343
344     for frame_filter in frame_filters:
345         frame_filter.filter(frame)
346
347     if len(frame.payload) > 125:
348         raise BadOperationException(
349             'Payload data size of control frames must be 125 bytes or less')
350
351     header = create_header(
352         frame.opcode, len(frame.payload), frame.fin,
353         frame.rsv1, frame.rsv2, frame.rsv3, mask)
354     return _build_frame(header, frame.payload, mask)
355
356
357 def create_ping_frame(body, mask=False, frame_filters=[]):
358     return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
359
360
361 def create_pong_frame(body, mask=False, frame_filters=[]):
362     return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
363
364
365 def create_close_frame(body, mask=False, frame_filters=[]):
366     return _create_control_frame(
367         common.OPCODE_CLOSE, body, mask, frame_filters)
368
369
370 def create_closing_handshake_body(code, reason):
371     body = ''
372     if code is not None:
373         if (code > common.STATUS_USER_PRIVATE_MAX or
374             code < common.STATUS_NORMAL_CLOSURE):
375             raise BadOperationException('Status code is out of range')
376         if (code == common.STATUS_NO_STATUS_RECEIVED or
377             code == common.STATUS_ABNORMAL_CLOSURE or
378             code == common.STATUS_TLS_HANDSHAKE):
379             raise BadOperationException('Status code is reserved pseudo '
380                 'code')
381         encoded_reason = reason.encode('utf-8')
382         body = struct.pack('!H', code) + encoded_reason
383     return body
384
385
386 class StreamOptions(object):
387     """Holds option values to configure Stream objects."""
388
389     def __init__(self):
390         """Constructs StreamOptions."""
391
392         # Enables deflate-stream extension.
393         self.deflate_stream = False
394
395         # Filters applied to frames.
396         self.outgoing_frame_filters = []
397         self.incoming_frame_filters = []
398
399         # Filters applied to messages. Control frames are not affected by them.
400         self.outgoing_message_filters = []
401         self.incoming_message_filters = []
402
403         self.encode_text_message_to_utf8 = True
404         self.mask_send = False
405         self.unmask_receive = True
406         # RFC6455 disallows fragmented control frames, but mux extension
407         # relaxes the restriction.
408         self.allow_fragmented_control_frame = False
409
410
411 class Stream(StreamBase):
412     """A class for parsing/building frames of the WebSocket protocol
413     (RFC 6455).
414     """
415
416     def __init__(self, request, options):
417         """Constructs an instance.
418
419         Args:
420             request: mod_python request.
421         """
422
423         StreamBase.__init__(self, request)
424
425         self._logger = util.get_class_logger(self)
426
427         self._options = options
428
429         if self._options.deflate_stream:
430             self._logger.debug('Setup filter for deflate-stream')
431             self._request = util.DeflateRequest(self._request)
432
433         self._request.client_terminated = False
434         self._request.server_terminated = False
435
436         # Holds body of received fragments.
437         self._received_fragments = []
438         # Holds the opcode of the first fragment.
439         self._original_opcode = None
440
441         self._writer = FragmentedFrameBuilder(
442             self._options.mask_send, self._options.outgoing_frame_filters,
443             self._options.encode_text_message_to_utf8)
444
445         self._ping_queue = deque()
446
447     def _receive_frame(self):
448         """Receives a frame and return data in the frame as a tuple containing
449         each header field and payload separately.
450
451         Raises:
452             ConnectionTerminatedException: when read returns empty
453                 string.
454             InvalidFrameException: when the frame contains invalid data.
455         """
456
457         def _receive_bytes(length):
458             return self.receive_bytes(length)
459
460         return parse_frame(receive_bytes=_receive_bytes,
461                            logger=self._logger,
462                            ws_version=self._request.ws_version,
463                            unmask_receive=self._options.unmask_receive)
464
465     def _receive_frame_as_frame_object(self):
466         opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
467
468         return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
469                      opcode=opcode, payload=bytes)
470
471     def send_message(self, message, end=True, binary=False):
472         """Send message.
473
474         Args:
475             message: text in unicode or binary in str to send.
476             binary: send message as binary frame.
477
478         Raises:
479             BadOperationException: when called on a server-terminated
480                 connection or called with inconsistent message type or
481                 binary parameter.
482         """
483
484         if self._request.server_terminated:
485             raise BadOperationException(
486                 'Requested send_message after sending out a closing handshake')
487
488         if binary and isinstance(message, unicode):
489             raise BadOperationException(
490                 'Message for binary frame must be instance of str')
491
492         for message_filter in self._options.outgoing_message_filters:
493             message = message_filter.filter(message, end, binary)
494
495         try:
496             self._write(self._writer.build(message, end, binary))
497         except ValueError, e:
498             raise BadOperationException(e)
499
500     def _get_message_from_frame(self, frame):
501         """Gets a message from frame. If the message is composed of fragmented
502         frames and the frame is not the last fragmented frame, this method
503         returns None. The whole message will be returned when the last
504         fragmented frame is passed to this method.
505
506         Raises:
507             InvalidFrameException: when the frame doesn't match defragmentation
508                 context, or the frame contains invalid data.
509         """
510
511         if frame.opcode == common.OPCODE_CONTINUATION:
512             if not self._received_fragments:
513                 if frame.fin:
514                     raise InvalidFrameException(
515                         'Received a termination frame but fragmentation '
516                         'not started')
517                 else:
518                     raise InvalidFrameException(
519                         'Received an intermediate frame but '
520                         'fragmentation not started')
521
522             if frame.fin:
523                 # End of fragmentation frame
524                 self._received_fragments.append(frame.payload)
525                 message = ''.join(self._received_fragments)
526                 self._received_fragments = []
527                 return message
528             else:
529                 # Intermediate frame
530                 self._received_fragments.append(frame.payload)
531                 return None
532         else:
533             if self._received_fragments:
534                 if frame.fin:
535                     raise InvalidFrameException(
536                         'Received an unfragmented frame without '
537                         'terminating existing fragmentation')
538                 else:
539                     raise InvalidFrameException(
540                         'New fragmentation started without terminating '
541                         'existing fragmentation')
542
543             if frame.fin:
544                 # Unfragmented frame
545
546                 self._original_opcode = frame.opcode
547                 return frame.payload
548             else:
549                 # Start of fragmentation frame
550
551                 if (not self._options.allow_fragmented_control_frame and
552                     common.is_control_opcode(frame.opcode)):
553                     raise InvalidFrameException(
554                         'Control frames must not be fragmented')
555
556                 self._original_opcode = frame.opcode
557                 self._received_fragments.append(frame.payload)
558                 return None
559
560     def _process_close_message(self, message):
561         """Processes close message.
562
563         Args:
564             message: close message.
565
566         Raises:
567             InvalidFrameException: when the message is invalid.
568         """
569
570         self._request.client_terminated = True
571
572         # Status code is optional. We can have status reason only if we
573         # have status code. Status reason can be empty string. So,
574         # allowed cases are
575         # - no application data: no code no reason
576         # - 2 octet of application data: has code but no reason
577         # - 3 or more octet of application data: both code and reason
578         if len(message) == 0:
579             self._logger.debug('Received close frame (empty body)')
580             self._request.ws_close_code = (
581                 common.STATUS_NO_STATUS_RECEIVED)
582         elif len(message) == 1:
583             raise InvalidFrameException(
584                 'If a close frame has status code, the length of '
585                 'status code must be 2 octet')
586         elif len(message) >= 2:
587             self._request.ws_close_code = struct.unpack(
588                 '!H', message[0:2])[0]
589             self._request.ws_close_reason = message[2:].decode(
590                 'utf-8', 'replace')
591             self._logger.debug(
592                 'Received close frame (code=%d, reason=%r)',
593                 self._request.ws_close_code,
594                 self._request.ws_close_reason)
595
596         # Drain junk data after the close frame if necessary.
597         self._drain_received_data()
598
599         if self._request.server_terminated:
600             self._logger.debug(
601                 'Received ack for server-initiated closing handshake')
602             return
603
604         self._logger.debug(
605             'Received client-initiated closing handshake')
606
607         code = common.STATUS_NORMAL_CLOSURE
608         reason = ''
609         if hasattr(self._request, '_dispatcher'):
610             dispatcher = self._request._dispatcher
611             code, reason = dispatcher.passive_closing_handshake(
612                 self._request)
613             if code is None and reason is not None and len(reason) > 0:
614                 self._logger.warning(
615                     'Handler specified reason despite code being None')
616                 reason = ''
617             if reason is None:
618                 reason = ''
619         self._send_closing_handshake(code, reason)
620         self._logger.debug(
621             'Sent ack for client-initiated closing handshake '
622             '(code=%r, reason=%r)', code, reason)
623
624     def _process_ping_message(self, message):
625         """Processes ping message.
626
627         Args:
628             message: ping message.
629         """
630
631         try:
632             handler = self._request.on_ping_handler
633             if handler:
634                 handler(self._request, message)
635                 return
636         except AttributeError, e:
637             pass
638         self._send_pong(message)
639
640     def _process_pong_message(self, message):
641         """Processes pong message.
642
643         Args:
644             message: pong message.
645         """
646
647         # TODO(tyoshino): Add ping timeout handling.
648
649         inflight_pings = deque()
650
651         while True:
652             try:
653                 expected_body = self._ping_queue.popleft()
654                 if expected_body == message:
655                     # inflight_pings contains pings ignored by the
656                     # other peer. Just forget them.
657                     self._logger.debug(
658                         'Ping %r is acked (%d pings were ignored)',
659                         expected_body, len(inflight_pings))
660                     break
661                 else:
662                     inflight_pings.append(expected_body)
663             except IndexError, e:
664                 # The received pong was unsolicited pong. Keep the
665                 # ping queue as is.
666                 self._ping_queue = inflight_pings
667                 self._logger.debug('Received a unsolicited pong')
668                 break
669
670         try:
671             handler = self._request.on_pong_handler
672             if handler:
673                 handler(self._request, message)
674         except AttributeError, e:
675             pass
676
677     def receive_message(self):
678         """Receive a WebSocket frame and return its payload as a text in
679         unicode or a binary in str.
680
681         Returns:
682             payload data of the frame
683             - as unicode instance if received text frame
684             - as str instance if received binary frame
685             or None iff received closing handshake.
686         Raises:
687             BadOperationException: when called on a client-terminated
688                 connection.
689             ConnectionTerminatedException: when read returns empty
690                 string.
691             InvalidFrameException: when the frame contains invalid
692                 data.
693             UnsupportedFrameException: when the received frame has
694                 flags, opcode we cannot handle. You can ignore this
695                 exception and continue receiving the next frame.
696         """
697
698         if self._request.client_terminated:
699             raise BadOperationException(
700                 'Requested receive_message after receiving a closing '
701                 'handshake')
702
703         while True:
704             # mp_conn.read will block if no bytes are available.
705             # Timeout is controlled by TimeOut directive of Apache.
706
707             frame = self._receive_frame_as_frame_object()
708
709             # Check the constraint on the payload size for control frames
710             # before extension processes the frame.
711             # See also http://tools.ietf.org/html/rfc6455#section-5.5
712             if (common.is_control_opcode(frame.opcode) and
713                 len(frame.payload) > 125):
714                 raise InvalidFrameException(
715                     'Payload data size of control frames must be 125 bytes or '
716                     'less')
717
718             for frame_filter in self._options.incoming_frame_filters:
719                 frame_filter.filter(frame)
720
721             if frame.rsv1 or frame.rsv2 or frame.rsv3:
722                 raise UnsupportedFrameException(
723                     'Unsupported flag is set (rsv = %d%d%d)' %
724                     (frame.rsv1, frame.rsv2, frame.rsv3))
725
726             message = self._get_message_from_frame(frame)
727             if message is None:
728                 continue
729
730             for message_filter in self._options.incoming_message_filters:
731                 message = message_filter.filter(message)
732
733             if self._original_opcode == common.OPCODE_TEXT:
734                 # The WebSocket protocol section 4.4 specifies that invalid
735                 # characters must be replaced with U+fffd REPLACEMENT
736                 # CHARACTER.
737                 try:
738                     return message.decode('utf-8')
739                 except UnicodeDecodeError, e:
740                     raise InvalidUTF8Exception(e)
741             elif self._original_opcode == common.OPCODE_BINARY:
742                 return message
743             elif self._original_opcode == common.OPCODE_CLOSE:
744                 self._process_close_message(message)
745                 return None
746             elif self._original_opcode == common.OPCODE_PING:
747                 self._process_ping_message(message)
748             elif self._original_opcode == common.OPCODE_PONG:
749                 self._process_pong_message(message)
750             else:
751                 raise UnsupportedFrameException(
752                     'Opcode %d is not supported' % self._original_opcode)
753
754     def _send_closing_handshake(self, code, reason):
755         body = create_closing_handshake_body(code, reason)
756         frame = create_close_frame(
757             body, mask=self._options.mask_send,
758             frame_filters=self._options.outgoing_frame_filters)
759
760         self._request.server_terminated = True
761
762         self._write(frame)
763
764     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
765         """Closes a WebSocket connection.
766
767         Args:
768             code: Status code for close frame. If code is None, a close
769                 frame with empty body will be sent.
770             reason: string representing close reason.
771         Raises:
772             BadOperationException: when reason is specified with code None
773             or reason is not an instance of both str and unicode.
774         """
775
776         if self._request.server_terminated:
777             self._logger.debug(
778                 'Requested close_connection but server is already terminated')
779             return
780
781         if code is None:
782             if reason is not None and len(reason) > 0:
783                 raise BadOperationException(
784                     'close reason must not be specified if code is None')
785             reason = ''
786         else:
787             if not isinstance(reason, str) and not isinstance(reason, unicode):
788                 raise BadOperationException(
789                     'close reason must be an instance of str or unicode')
790
791         self._send_closing_handshake(code, reason)
792         self._logger.debug(
793             'Sent server-initiated closing handshake (code=%r, reason=%r)',
794             code, reason)
795
796         if (code == common.STATUS_GOING_AWAY or
797             code == common.STATUS_PROTOCOL_ERROR):
798             # It doesn't make sense to wait for a close frame if the reason is
799             # protocol error or that the server is going away. For some of
800             # other reasons, it might not make sense to wait for a close frame,
801             # but it's not clear, yet.
802             return
803
804         # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
805         # or until a server-defined timeout expires.
806         #
807         # For now, we expect receiving closing handshake right after sending
808         # out closing handshake.
809         message = self.receive_message()
810         if message is not None:
811             raise ConnectionTerminatedException(
812                 'Didn\'t receive valid ack for closing handshake')
813         # TODO: 3. close the WebSocket connection.
814         # note: mod_python Connection (mp_conn) doesn't have close method.
815
816     def send_ping(self, body=''):
817         frame = create_ping_frame(
818             body,
819             self._options.mask_send,
820             self._options.outgoing_frame_filters)
821         self._write(frame)
822
823         self._ping_queue.append(body)
824
825     def _send_pong(self, body):
826         frame = create_pong_frame(
827             body,
828             self._options.mask_send,
829             self._options.outgoing_frame_filters)
830         self._write(frame)
831
832     def get_last_received_opcode(self):
833         """Returns the opcode of the WebSocket message which the last received
834         frame belongs to. The return value is valid iff immediately after
835         receive_message call.
836         """
837
838         return self._original_opcode
839
840     def _drain_received_data(self):
841         """Drains unread data in the receive buffer to avoid sending out TCP
842         RST packet. This is because when deflate-stream is enabled, some
843         DEFLATE block for flushing data may follow a close frame. If any data
844         remains in the receive buffer of a socket when the socket is closed,
845         it sends out TCP RST packet to the other peer.
846
847         Since mod_python's mp_conn object doesn't support non-blocking read,
848         we perform this only when pywebsocket is running in standalone mode.
849         """
850
851         # If self._options.deflate_stream is true, self._request is
852         # DeflateRequest, so we can get wrapped request object by
853         # self._request._request.
854         #
855         # Only _StandaloneRequest has _drain_received_data method.
856         if (self._options.deflate_stream and
857             ('_drain_received_data' in dir(self._request._request))):
858             self._request._request._drain_received_data()
859
860
861 # vi:sts=4 sw=4 et