1 # Copyright 2012, Google Inc.
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
14 # * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""This file provides classes and helper functions for parsing/building frames
of the WebSocket protocol (RFC 6455).

Specification:
http://tools.ietf.org/html/rfc6455
"""
from collections import deque
import logging
import os
import struct
import time

from mod_pywebsocket import common
from mod_pywebsocket import util
from mod_pywebsocket._stream_base import BadOperationException
from mod_pywebsocket._stream_base import ConnectionTerminatedException
from mod_pywebsocket._stream_base import InvalidFrameException
from mod_pywebsocket._stream_base import InvalidUTF8Exception
from mod_pywebsocket._stream_base import StreamBase
from mod_pywebsocket._stream_base import UnsupportedFrameException
# Shared masker used by parse_frame() for frames received without a mask, so
# that the unmasking step can be written the same way for both cases.
_NOOP_MASKER = util.NoopMasker()
class Frame(object):
    """Holds the header fields and the payload of a single frame."""

    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
                 opcode=None, payload=''):
        """Constructs a frame.

        Args:
            fin: FIN bit.
            rsv1: RSV1 bit.
            rsv2: RSV2 bit.
            rsv3: RSV3 bit.
            opcode: opcode of the frame.
            payload: payload data of the frame.
        """

        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload
# Helper functions made public to be used for writing unittests for WebSocket
# clients.
def create_length_header(length, mask):
    """Creates a length header.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The octet carrying the mask bit and the 7-bit payload length,
        followed by the 2-octet or 8-octet extended payload length when the
        length does not fit in 7 bits.

    Raises:
        ValueError: when bad data is given.
    """

    if mask:
        mask_bit = 1 << 7
    else:
        mask_bit = 0

    # struct.pack is used for every octet (rather than chr()) so that all
    # parts of the result have the same string type.
    if length < 0:
        raise ValueError('length must be non negative integer')
    elif length <= 125:
        return struct.pack('!B', mask_bit | length)
    elif length < (1 << 16):
        # 126 announces a 2-octet extended payload length.
        return struct.pack('!BH', mask_bit | 126, length)
    elif length < (1 << 63):
        # 127 announces an 8-octet extended payload length.
        return struct.pack('!BQ', mask_bit | 127, length)
    else:
        raise ValueError('Payload is too big for one frame')
def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    Args:
        opcode: opcode of the frame. Must be in the range 0x0-0xf.
        payload_length: length of the payload. Must be less than 2^63.
        fin: FIN bit. Must be 0 or 1.
        rsv1: RSV1 bit. Must be 0 or 1.
        rsv2: RSV2 bit. Must be 0 or 1.
        rsv3: RSV3 bit. Must be 0 or 1.
        mask: Mask bit. Must be boolean.

    Returns:
        The first header octet followed by the length header built by
        create_length_header.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one set in any of the flags is an error.
    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)
    return (struct.pack('!B', first_byte)
            + create_length_header(payload_length, mask))
129 def _build_frame(header, body, mask):
133 masking_nonce = os.urandom(4)
134 masker = util.RepeatedXorMasker(masking_nonce)
136 return header + masking_nonce + masker.mask(body)
def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Applies frame_filters to frame in order and serializes the result.

    Args:
        frame: frame object to send. Each filter's filter() method is called
            with it before it is serialized.
        mask: whether to mask the payload.
        frame_filters: filters to apply to the frame.

    Returns:
        The serialized frame.
    """

    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    # The header is built after filtering so that it describes the frame
    # fields as they are once every filter has run.
    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)
def create_binary_frame(
    message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
    """Creates a simple binary frame with no extension, reserved bit.

    Args:
        message: payload data.
        opcode: opcode of the frame.
        fin: FIN bit.
        mask: whether to mask the payload.
        frame_filters: filters applied to the frame before serialization.
            The default list is only iterated here, never modified.

    Returns:
        The serialized frame.
    """

    frame = Frame(fin=fin, opcode=opcode, payload=message)
    return _filter_and_format_frame_object(frame, mask, frame_filters)
def create_text_frame(
    message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
    """Creates a simple text frame with no extension, reserved bit.

    Args:
        message: text to send. It is encoded to UTF-8 to form the payload.
        opcode: opcode of the frame.
        fin: FIN bit.
        mask: whether to mask the payload.
        frame_filters: filters applied to the frame before serialization.
            The default list is only passed on, never modified.

    Returns:
        The serialized frame.
    """

    encoded_message = message.encode('utf-8')
    return create_binary_frame(encoded_message, opcode, fin, mask,
                               frame_filters)
def _megabytes_per_second(num_bytes, start_time):
    """Returns the rate in MB/s for num_bytes processed since start_time."""

    elapsed = time.time() - start_time
    if elapsed <= 0:
        # A coarse clock can report no elapsed time; avoid dividing by zero
        # just to produce a log message.
        return float('inf')
    return num_bytes / elapsed / 1000 / 1000


def parse_frame(receive_bytes, logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object. The root logger is used when omitted.
        ws_version: the version of WebSocket protocol.
        unmask_receive: unmask received frames. When received unmasked
            frame, raises InvalidFrameException.

    Returns:
        A tuple (opcode, payload, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if logger is None:
        logger = logging.getLogger()

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    received = receive_bytes(2)
    first_byte, second_byte = struct.unpack('!BB', received)

    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(common.LOGLEVEL_FINE,
               'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
               'Mask=%s, Payload_length=%s',
               fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0xFD in the
    # 2-octet field). Such frames are only reported, not rejected.
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack(
            '!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException(
                'Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack(
            '!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)',
            payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    # Timestamps are taken only when fine logging is enabled, so the normal
    # path pays nothing for the throughput messages.
    fine_logging = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if fine_logging:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if fine_logging:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done receiving payload data at %s MB/s',
            _megabytes_per_second(payload_length, receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if fine_logging:
        unmask_start = time.time()

    payload = masker.mask(raw_payload_bytes)

    if fine_logging:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done unmasking payload data at %s MB/s',
            _megabytes_per_second(payload_length, unmask_start))

    return opcode, payload, fin, rsv1, rsv2, rsv3
class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""

    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: whether to mask the payload of built frames.
            frame_filters: filters applied to every built frame. A list
                passed by the caller is kept by reference, so filters added
                to it later take effect. When omitted, a new empty list is
                created for this instance.
            encode_utf8: whether text messages are encoded to UTF-8.
        """

        self._mask = mask
        # A default of [] would be one list object shared by every instance
        # constructed without the argument.
        if frame_filters is None:
            frame_filters = []
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, message, end, binary):
        """Builds the next frame of the message being sent.

        Args:
            message: data for this fragment.
            end: true if this fragment is the last one of the message.
            binary: true to send the message as binary, false as text.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when the message type differs from that of the
                fragmented message already in progress.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT

        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(
                message, opcode, fin, self._mask, self._frame_filters)
        else:
            return create_text_frame(
                message, opcode, fin, self._mask, self._frame_filters)
def _create_control_frame(opcode, body, mask, frame_filters):
    """Creates a control frame.

    Args:
        opcode: opcode of the control frame.
        body: payload data.
        mask: whether to mask the payload.
        frame_filters: filters applied to the frame before serialization.

    Returns:
        The serialized frame.

    Raises:
        BadOperationException: when the payload is longer than 125 bytes
            after the filters have been applied.
    """

    frame = Frame(opcode=opcode, payload=body)

    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    # Checked after filtering because the filters may change the payload.
    if len(frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)
def create_ping_frame(body, mask=False, frame_filters=[]):
    """Creates a ping frame carrying body.

    The default frame_filters list is only passed on, never modified.
    """

    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
def create_pong_frame(body, mask=False, frame_filters=[]):
    """Creates a pong frame carrying body.

    The default frame_filters list is only passed on, never modified.
    """

    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
def create_close_frame(body, mask=False, frame_filters=[]):
    """Creates a close frame carrying body.

    The default frame_filters list is only passed on, never modified.
    """

    return _create_control_frame(
        common.OPCODE_CLOSE, body, mask, frame_filters)
def create_closing_handshake_body(code, reason):
    """Creates the body of a close frame.

    Args:
        code: status code, or None to build an empty body.
        reason: close reason. Ignored when code is None.

    Returns:
        An empty string when code is None. Otherwise the status code packed
        as a 2-octet network-order integer followed by the reason encoded
        to UTF-8.

    Raises:
        BadOperationException: when code is out of range or is one of the
            reserved pseudo codes.
    """

    # b'' keeps the return type the same as the struct.pack() result below.
    body = b''
    if code is not None:
        if (code > common.STATUS_USER_PRIVATE_MAX or
            code < common.STATUS_NORMAL_CLOSURE):
            raise BadOperationException('Status code is out of range')
        if (code == common.STATUS_NO_STATUS_RECEIVED or
            code == common.STATUS_ABNORMAL_CLOSURE or
            code == common.STATUS_TLS_HANDSHAKE):
            raise BadOperationException('Status code is reserved pseudo '
                                        'code')
        encoded_reason = reason.encode('utf-8')
        body = struct.pack('!H', code) + encoded_reason
    return body
class StreamOptions(object):
    """Holds option values to configure Stream objects."""

    def __init__(self):
        """Constructs StreamOptions."""

        # Enables deflate-stream extension.
        self.deflate_stream = False

        # Filters applied to frames.
        self.outgoing_frame_filters = []
        self.incoming_frame_filters = []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters = []
        self.incoming_message_filters = []

        # Whether text messages are encoded to UTF-8 when building frames.
        self.encode_text_message_to_utf8 = True
        # Whether frames to send are masked.
        self.mask_send = False
        # Whether received frames are expected to be masked.
        self.unmask_receive = True
        # RFC6455 disallows fragmented control frames, but mux extension
        # relaxes the restriction.
        self.allow_fragmented_control_frame = False
class Stream(StreamBase):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """

    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: StreamOptions object configuring this stream.
        """

        StreamBase.__init__(self, request)

        self._logger = util.get_class_logger(self)

        self._options = options

        if self._options.deflate_stream:
            self._logger.debug('Setup filter for deflate-stream')
            self._request = util.DeflateRequest(self._request)

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        # Bodies of pings sent by send_ping() that have not been acked yet.
        self._ping_queue = deque()

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """

        def _receive_bytes(length):
            return self.receive_bytes(length)

        return parse_frame(receive_bytes=_receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        """Receives a frame and returns it as a Frame object."""

        opcode, payload, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
                     opcode=opcode, payload=payload)

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: true if this is the last fragment of the message.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            self._write(self._writer.build(message, end, binary))
        except ValueError as e:
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = ''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if (not self._options.allow_fragmented_control_frame and
                    common.is_control_opcode(frame.opcode)):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = (
                common.STATUS_NO_STATUS_RECEIVED)
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        elif len(message) >= 2:
            self._request.ws_close_code = struct.unpack(
                '!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug(
                'Received close frame (code=%d, reason=%r)',
                self._request.ws_close_code,
                self._request.ws_close_reason)

        # Drain junk data after the close frame if necessary.
        self._drain_received_data()

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug(
            'Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(
                self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent ack for client-initiated closing handshake '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        If the request has a non-empty on_ping_handler attribute, the ping
        is handed to it; otherwise a pong with the same body is sent.

        Args:
            message: ping message.
        """

        # Only the attribute lookup is allowed to be missing. An
        # AttributeError raised inside the handler itself is not hidden and
        # does not trigger an additional pong.
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while self._ping_queue:
            expected_body = self._ping_queue.popleft()
            if expected_body == message:
                # inflight_pings contains pings ignored by the
                # other peer. Just forget them.
                self._logger.debug(
                    'Ping %r is acked (%d pings were ignored)',
                    expected_body, len(inflight_pings))
                break
            inflight_pings.append(expected_body)
        else:
            # The received pong was unsolicited pong. Keep the
            # ping queue as is.
            self._ping_queue = inflight_pings
            self._logger.debug('Received a unsolicited pong')

        # Only the attribute lookup is allowed to be missing. An
        # AttributeError raised inside the handler itself is not hidden.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.
            # Timeout is controlled by TimeOut directive of Apache.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode) and
                len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                # The message is not complete yet. Read the next frame.
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # Decoding is strict: a payload that is not valid UTF-8 is
                # reported as InvalidUTF8Exception, not silently repaired.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException(
                    'Opcode %d is not supported' % self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        """Sends a close frame and marks the connection server-terminated."""

        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body, mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # The flag is set before writing, so it is set even if the write
        # fails.
        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
        """Closes a WebSocket connection.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
        Raises:
            BadOperationException: when reason is specified with code None
                or reason is neither an instance of str nor of unicode.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, str) and not isinstance(reason, unicode):
                raise BadOperationException(
                    'close reason must be an instance of str or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent server-initiated closing handshake (code=%r, reason=%r)',
            code, reason)

        if (code == common.STATUS_GOING_AWAY or
            code == common.STATUS_PROTOCOL_ERROR):
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body=''):
        """Sends a ping frame and remembers its body until it is acked."""

        frame = create_ping_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame carrying body."""

        frame = create_pong_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode

    def _drain_received_data(self):
        """Drains unread data in the receive buffer to avoid sending out TCP
        RST packet. This is because when deflate-stream is enabled, some
        DEFLATE block for flushing data may follow a close frame. If any data
        remains in the receive buffer of a socket when the socket is closed,
        it sends out TCP RST packet to the other peer.

        Since mod_python's mp_conn object doesn't support non-blocking read,
        we perform this only when pywebsocket is running in standalone mode.
        """

        # If self._options.deflate_stream is true, self._request is
        # DeflateRequest, so we can get wrapped request object by
        # self._request._request.
        #
        # Only _StandaloneRequest has _drain_received_data method.
        if (self._options.deflate_stream and
            ('_drain_received_data' in dir(self._request._request))):
            self._request._request._drain_received_data()