Upstream version 9.37.197.0
[platform/framework/web/crosswalk.git] / src / third_party / trace-viewer / third_party / pywebsocket / src / test / mux_client_for_testing.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2012, Google Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
8 # met:
9 #
10 #     * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 #     * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
15 # distribution.
16 #     * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33 """WebSocket client utility for testing mux extension.
34
35 This code should be independent from mod_pywebsocket. See the comment of
36 client_for_testing.py.
37
38 NOTE: This code is far from robust like client_for_testing.py.
39 """
40
41
42
43 import Queue
44 import base64
45 import collections
46 import email
47 import email.parser
48 import logging
49 import math
50 import os
51 import random
52 import socket
53 import struct
54 import threading
55
56 from mod_pywebsocket import util
57
58 from test import client_for_testing
59
60
61 _CONTROL_CHANNEL_ID = 0
62 _DEFAULT_CHANNEL_ID = 1
63
64 _MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
65 _MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
66 _MUX_OPCODE_FLOW_CONTROL = 2
67 _MUX_OPCODE_DROP_CHANNEL = 3
68 _MUX_OPCODE_NEW_CHANNEL_SLOT = 4
69
70
71 class _ControlBlock:
72     def __init__(self, opcode):
73         self.opcode = opcode
74
75
76 def _parse_handshake_response(response):
77     status_line, header_lines = response.split('\r\n', 1)
78
79     words = status_line.split(' ')
80     if len(words) < 3:
81         raise ValueError('Bad Status-Line syntax %r' % status_line)
82     [version, response_code] = words[:2]
83     if version != 'HTTP/1.1':
84         raise ValueError('Bad response version %r' % version)
85
86     if response_code != '101':
87         raise ValueError('Bad response code %r ' % response_code)
88     headers = email.parser.Parser().parsestr(header_lines)
89     return headers
90
91
92 def _parse_channel_id(data, offset=0):
93     length = len(data)
94     remaining = length - offset
95
96     if remaining <= 0:
97         raise Exception('No channel id found')
98
99     channel_id = ord(data[offset])
100     channel_id_length = 1
101     if channel_id & 0xe0 == 0xe0:
102         if remaining < 4:
103             raise Exception('Invalid channel id format')
104         channel_id = struct.unpack('!L',
105                                    data[offset:offset+4])[0] & 0x1fffffff
106         channel_id_length = 4
107     elif channel_id & 0xc0 == 0xc0:
108         if remaining < 3:
109             raise Exception('Invalid channel id format')
110         channel_id = (((channel_id & 0x1f) << 16) +
111                       struct.unpack('!H', data[offset+1:offset+3])[0])
112         channel_id_length = 3
113     elif channel_id & 0x80 == 0x80:
114         if remaining < 2:
115             raise Exception('Invalid channel id format')
116         channel_id = struct.unpack('!H', data[offset:offset+2])[0] & 0x3fff
117         channel_id_length = 2
118
119     return channel_id, channel_id_length
120
121
122 def _read_number(data, size_of_size, offset=0):
123     if size_of_size == 1:
124         return ord(data[offset])
125     elif size_of_size == 2:
126         return struct.unpack('!H', data[offset:offset+2])[0]
127     elif size_of_size == 3:
128         return ((ord(data[offset]) << 16)
129                 + struct.unpack('!H', data[offset+1:offset+3])[0])
130     elif size_of_size == 4:
131         return struct.unpack('!L', data[offset:offset+4])[0]
132     else:
133         raise Exception('Invalid "size of size" in control block')
134
135
136 def _parse_control_block_specific_data(data, size_of_size, offset=0):
137     remaining = len(data) - offset
138     if remaining < size_of_size:
139         raise Exception('Invalid control block received')
140
141     size = _read_number(data, size_of_size, offset)
142
143     start_position = offset + size_of_size
144     end_position = start_position + size
145     if len(data) < end_position:
146         raise Exception('Invalid size of control block (%d < %d)' % (
147                 len(data), end_position))
148     return data[start_position:end_position], size_of_size + size
149
150
151 def _parse_control_blocks(data):
152     blocks = []
153     length = len(data)
154     pos = 0
155
156     while pos < length:
157         first_byte = ord(data[pos])
158         pos += 1
159         opcode = (first_byte >> 5) & 0x7
160         block = _ControlBlock(opcode)
161
162         # TODO(bashi): Support more opcode
163         if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
164             block.encode = (first_byte >> 2) & 3
165             block.rejected = (first_byte >> 4) & 1
166
167             channel_id, advance = _parse_channel_id(data, pos)
168             block.channel_id = channel_id
169             pos += advance
170
171             size_of_size = (first_byte & 3) + 1
172             encoded_handshake, advance = _parse_control_block_specific_data(
173                 data, size_of_size, pos)
174             block.encoded_handshake = encoded_handshake
175             pos += advance
176             blocks.append(block)
177         elif opcode == _MUX_OPCODE_DROP_CHANNEL:
178             block.mux_error = (first_byte >> 4) & 1
179
180             channel_id, channel_id_length = _parse_channel_id(data, pos)
181             block.channel_id = channel_id
182             pos += channel_id_length
183
184             size_of_size = first_byte & 3
185             reason, size = _parse_control_block_specific_data(
186                 data, size_of_size, pos)
187             block.reason = reason
188             pos += size
189             blocks.append(block)
190         elif opcode == _MUX_OPCODE_FLOW_CONTROL:
191             channel_id, advance = _parse_channel_id(data, pos)
192             block.channel_id = channel_id
193             pos += advance
194             size_of_quota = (first_byte & 3) + 1
195             block.send_quota = _read_number(data, size_of_quota, pos)
196             pos += size_of_quota
197             blocks.append(block)
198         elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
199             size_of_slots = ((first_byte >> 2) & 3) + 1
200             size_of_quota = (first_byte & 3) + 1
201             block.slots = _read_number(data, size_of_slots, pos)
202             pos += size_of_slots
203             block.send_quota = _read_number(data, size_of_quota, pos)
204             pos += size_of_quota
205             blocks.append(block)
206         else:
207             raise Exception(
208                 'Unsupported mux opcode %d received' % opcode)
209
210     return blocks
211
212
213 def _encode_channel_id(channel_id):
214     if channel_id < 0:
215         raise ValueError('Channel id %d must not be negative' % channel_id)
216
217     if channel_id < 2 ** 7:
218         return chr(channel_id)
219     if channel_id < 2 ** 14:
220         return struct.pack('!H', 0x8000 + channel_id)
221     if channel_id < 2 ** 21:
222         first = chr(0xc0 + (channel_id >> 16))
223         return first + struct.pack('!H', channel_id & 0xffff)
224     if channel_id < 2 ** 29:
225         return struct.pack('!L', 0xe0000000 + channel_id)
226
227     raise ValueError('Channel id %d is too large' % channel_id)
228
229
230 def _size_of_number_in_bytes_minus_1(number):
231     # Calculate the minimum number of bytes minus 1 that are required to store
232     # the data.
233     if number < 0:
234         raise ValueError('Invalid number: %d' % number)
235     elif number < 2 ** 8:
236         return 0
237     elif number < 2 ** 16:
238         return 1
239     elif number < 2 ** 24:
240         return 2
241     elif number < 2 ** 32:
242         return 3
243     else:
244         raise ValueError('Invalid number %d' % number)
245
246
247 def _encode_number(number):
248     if number < 2 ** 8:
249         return chr(number)
250     elif number < 2 ** 16:
251         return struct.pack('!H', number)
252     elif number < 2 ** 24:
253         return chr(number >> 16) + struct.pack('!H', number & 0xffff)
254     else:
255         return struct.pack('!L', number)
256
257
258 def _create_add_channel_request(channel_id, encoded_handshake,
259                                 encoding=0):
260     length = len(encoded_handshake)
261     size_of_length = _size_of_number_in_bytes_minus_1(length)
262
263     first_byte = ((_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2) |
264                   size_of_length)
265     encoded_length = _encode_number(length)
266
267     return (chr(first_byte) + _encode_channel_id(channel_id) +
268             encoded_length + encoded_handshake)
269
270
271 def _create_flow_control(channel_id, replenished_quota):
272     size_of_quota = _size_of_number_in_bytes_minus_1(replenished_quota)
273     first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | size_of_quota)
274     return (chr(first_byte) + _encode_channel_id(channel_id) +
275             _encode_number(replenished_quota))
276
277
278 class _MuxReaderThread(threading.Thread):
279     """Mux reader thread.
280
281     Reads frames and passes them to the mux client. This thread accesses
282     private functions/variables of the mux client.
283     """
284
285     def __init__(self, mux):
286         threading.Thread.__init__(self)
287         self.setDaemon(True)
288         self._mux = mux
289         self._stop_requested = False
290
291     def _receive_message(self):
292         first_opcode = None
293         pending_payload = []
294         while not self._stop_requested:
295             fin, rsv1, rsv2, rsv3, opcode, payload_length = (
296                 client_for_testing.read_frame_header(self._mux._socket))
297
298             if not first_opcode:
299                 if opcode == client_for_testing.OPCODE_TEXT:
300                     raise Exception('Received a text message on physical '
301                                     'connection')
302                 if opcode == client_for_testing.OPCODE_CONTINUATION:
303                     raise Exception('Received an intermediate frame but '
304                                     'fragmentation was not started')
305                 if (opcode == client_for_testing.OPCODE_BINARY or
306                     opcode == client_for_testing.OPCODE_PONG or
307                     opcode == client_for_testing.OPCODE_PONG or
308                     opcode == client_for_testing.OPCODE_CLOSE):
309                     first_opcode = opcode
310                 else:
311                     raise Exception('Received an undefined opcode frame: %d' %
312                                     opcode)
313
314             elif opcode != client_for_testing.OPCODE_CONTINUATION:
315                 raise Exception('Received a new opcode before '
316                                 'terminating fragmentation')
317
318             payload = client_for_testing.receive_bytes(
319                 self._mux._socket, payload_length)
320
321             if self._mux._incoming_frame_filter is not None:
322                 payload = self._mux._incoming_frame_filter.filter(payload)
323
324             pending_payload.append(payload)
325
326             if fin:
327                 break
328
329         if self._stop_requested:
330             return None, None
331
332         message = ''.join(pending_payload)
333         return first_opcode, message
334
335     def request_stop(self):
336         self._stop_requested = True
337
338     def run(self):
339         try:
340             while not self._stop_requested:
341                 # opcode is OPCODE_BINARY or control opcodes when a message
342                 # is succesfully received.
343                 opcode, message = self._receive_message()
344                 if not opcode:
345                     return
346                 if opcode == client_for_testing.OPCODE_BINARY:
347                     channel_id, advance = _parse_channel_id(message)
348                     self._mux._dispatch_frame(channel_id, message[advance:])
349                 else:
350                     self._mux._process_control_message(opcode, message)
351         finally:
352             self._mux._notify_reader_done()
353
354
355 class _InnerFrame(object):
356     def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload):
357         self.fin = fin
358         self.rsv1 = rsv1
359         self.rsv2 = rsv2
360         self.rsv3 = rsv3
361         self.opcode = opcode
362         self.payload = payload
363
364
365 class _LogicalChannelData(object):
366     def __init__(self):
367         self.queue = Queue.Queue()
368         self.send_quota = 0
369         self.receive_quota = 0
370
371
372 class MuxClient(object):
373     """WebSocket mux client.
374
375     Note that this class is NOT thread-safe. Do not access an instance of this
376     class from multiple threads at a same time.
377     """
378
379     def __init__(self, options):
380         self._logger = util.get_class_logger(self)
381
382         self._options = options
383         self._options.enable_mux()
384         self._stream = None
385         self._socket = None
386         self._handshake = client_for_testing.WebSocketHandshake(self._options)
387         self._incoming_frame_filter = None
388         self._outgoing_frame_filter = None
389
390         self._is_active = False
391         self._read_thread = None
392         self._control_blocks_condition = threading.Condition()
393         self._control_blocks = []
394         self._channel_slots = collections.deque()
395         self._logical_channels_condition = threading.Condition();
396         self._logical_channels = {}
397         self._timeout = 2
398         self._physical_connection_close_event = None
399         self._physical_connection_close_message = None
400
401     def _parse_inner_frame(self, data):
402         if len(data) == 0:
403             raise Exception('Invalid encapsulated frame received')
404
405         first_byte = ord(data[0])
406         fin = (first_byte << 7) & 1
407         rsv1 = (first_byte << 6) & 1
408         rsv2 = (first_byte << 5) & 1
409         rsv3 = (first_byte << 4) & 1
410         opcode = first_byte & 0xf
411
412         if self._outgoing_frame_filter:
413             payload = self._outgoing_frame_filter.filter(
414                 data[1:])
415         else:
416             payload = data[1:]
417
418         return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload)
419
420     def _process_mux_control_blocks(self):
421         for block in self._control_blocks:
422             if block.opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
423                 # AddChannelResponse will be handled in add_channel().
424                 continue
425             elif block.opcode == _MUX_OPCODE_FLOW_CONTROL:
426                 try:
427                     self._logical_channels_condition.acquire()
428                     if not block.channel_id in self._logical_channels:
429                         raise Exception('Invalid flow control received for '
430                                         'channel id %d' % block.channel_id)
431                     self._logical_channels[block.channel_id].send_quota += (
432                         block.send_quota)
433                     self._logical_channels_condition.notify()
434                 finally:
435                     self._logical_channels_condition.release()
436             elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
437                 self._channel_slots.extend([block.send_quota] * block.slots)
438
439     def _dispatch_frame(self, channel_id, payload):
440         if channel_id == _CONTROL_CHANNEL_ID:
441             try:
442                 self._control_blocks_condition.acquire()
443                 self._control_blocks += _parse_control_blocks(payload)
444                 self._process_mux_control_blocks()
445                 self._control_blocks_condition.notify()
446             finally:
447                 self._control_blocks_condition.release()
448         else:
449             try:
450                 self._logical_channels_condition.acquire()
451                 if not channel_id in self._logical_channels:
452                     raise Exception('Received logical frame on channel id '
453                                     '%d, which is not established' %
454                                     channel_id)
455
456                 inner_frame = self._parse_inner_frame(payload)
457                 self._logical_channels[channel_id].receive_quota -= (
458                         len(inner_frame.payload))
459                 if self._logical_channels[channel_id].receive_quota < 0:
460                     raise Exception('The server violates quota on '
461                                     'channel id %d' % channel_id)
462             finally:
463                 self._logical_channels_condition.release()
464             self._logical_channels[channel_id].queue.put(inner_frame)
465
466     def _process_control_message(self, opcode, message):
467         # Ping/Pong are not supported.
468         if opcode == client_for_testing.OPCODE_CLOSE:
469             self._physical_connection_close_message = message
470             if self._is_active:
471                 self._stream.send_close(
472                     code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='')
473                 self._read_thread.request_stop()
474
475             if self._physical_connection_close_event:
476                 self._physical_connection_close_event.set()
477
478     def _notify_reader_done(self):
479         self._logger.debug('Read thread terminated.')
480         self.close_socket()
481
482     def _assert_channel_slot_available(self):
483         try:
484             self._control_blocks_condition.acquire()
485             if len(self._channel_slots) == 0:
486                 # Wait once
487                 self._control_blocks_condition.wait(timeout=self._timeout)
488         finally:
489             self._control_blocks_condition.release()
490
491         if len(self._channel_slots) == 0:
492             raise Exception('Failed to receive NewChannelSlot')
493
494     def _assert_send_quota_available(self, channel_id):
495         try:
496             self._logical_channels_condition.acquire()
497             if self._logical_channels[channel_id].send_quota == 0:
498                 # Wait once
499                 self._logical_channels_condition.wait(timeout=self._timeout)
500         finally:
501             self._logical_channels_condition.release()
502
503         if self._logical_channels[channel_id].send_quota == 0:
504             raise Exception('Failed to receive FlowControl for channel id %d' %
505                             channel_id)
506
507     def connect(self):
508         self._socket = socket.socket()
509         self._socket.settimeout(self._options.socket_timeout)
510
511         self._socket.connect((self._options.server_host,
512                               self._options.server_port))
513         if self._options.use_tls:
514             self._socket = _TLSSocket(self._socket)
515
516         self._handshake.handshake(self._socket)
517         self._stream = client_for_testing.WebSocketStream(
518             self._socket, self._handshake)
519
520         self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData()
521
522         self._read_thread = _MuxReaderThread(self)
523         self._read_thread.start()
524
525         self._assert_channel_slot_available()
526         self._assert_send_quota_available(_DEFAULT_CHANNEL_ID)
527
528         self._is_active = True
529         self._logger.info('Connection established')
530
531     def add_channel(self, channel_id, options):
532         if not self._is_active:
533             raise Exception('Mux client is not active')
534
535         if channel_id in self._logical_channels:
536             raise Exception('Channel id %d already exists' % channel_id)
537
538         try:
539             send_quota = self._channel_slots.popleft()
540         except IndexError, e:
541             raise Exception('No channel slots: %r' % e)
542
543         # Create AddChannel request
544         request_line = 'GET %s HTTP/1.1\r\n' % options.resource
545         fields = []
546         fields.append('Upgrade: websocket\r\n')
547         fields.append('Connection: Upgrade\r\n')
548         if options.server_port == client_for_testing.DEFAULT_PORT:
549             fields.append('Host: %s\r\n' % options.server_host.lower())
550         else:
551             fields.append('Host: %s:%d\r\n' % (options.server_host.lower(),
552                                                options.server_port))
553         fields.append('Origin: %s\r\n' % options.origin.lower())
554
555         original_key = os.urandom(16)
556         key = base64.b64encode(original_key)
557         fields.append('Sec-WebSocket-Key: %s\r\n' % key)
558
559         fields.append('Sec-WebSocket-Version: 13\r\n')
560
561         if len(options.extensions) > 0:
562             fields.append('Sec-WebSocket-Extensions: %s\r\n' %
563                           ', '.join(options.extensions))
564
565         handshake = request_line + ''.join(fields) + '\r\n'
566         add_channel_request = _create_add_channel_request(
567             channel_id, handshake)
568         payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request
569         self._stream.send_binary(payload)
570
571         # Wait AddChannelResponse
572         self._logger.debug('Waiting AddChannelResponse for the request...')
573         response = None
574         try:
575             self._control_blocks_condition.acquire()
576             while True:
577                 for block in self._control_blocks:
578                     if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
579                         continue
580                     if block.channel_id == channel_id:
581                         response = block
582                         self._control_blocks.remove(response)
583                         break
584                 if response:
585                     break
586                 self._control_blocks_condition.wait(self._timeout)
587                 if not self._is_active:
588                     raise Exception('AddChannelRequest timed out')
589         finally:
590             self._control_blocks_condition.release()
591
592         # Validate AddChannelResponse
593         if response.rejected:
594             raise Exception('The server rejected AddChannelRequest')
595
596         fields = _parse_handshake_response(response.encoded_handshake)
597
598         if not 'upgrade' in fields:
599             raise Exception('No Upgrade header')
600         if fields['upgrade'] != 'websocket':
601             raise Exception('Wrong Upgrade header')
602         if not 'connection' in fields:
603             raise Exception('No Connection header')
604         if fields['connection'] != 'Upgrade':
605             raise Exception('Wrong Connection header')
606         if not 'sec-websocket-accept' in fields:
607             raise Exception('No Sec-WebSocket-Accept header')
608
609         accept = fields['sec-websocket-accept']
610         try:
611             decoded_accept = base64.b64decode(accept)
612         except TypeError, e:
613             raise Exception(
614                 'Illegal value for header Sec-WebSocket-Accept: ' + accept)
615
616         if len(decoded_accept) != 20:
617             raise Exception(
618                 'Decoded value of Sec-WebSocket-Accept is not 20-byte long')
619
620         original_expected_accept = util.sha1_hash(
621             key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest()
622         expected_accept = base64.b64encode(original_expected_accept)
623
624         if accept != expected_accept:
625             raise Exception(
626                 'Invalid Sec-WebSocket-Accept header: %r (expected) != %r '
627                 '(actual)' % (accept, expected_accept))
628
629         self._logical_channels_condition.acquire()
630         self._logical_channels[channel_id] = _LogicalChannelData()
631         self._logical_channels[channel_id].send_quota = send_quota
632         self._logical_channels_condition.release()
633
634         self._logger.debug('Logical channel %d established' % channel_id)
635
636     def _check_logical_channel_is_opened(self, channel_id):
637         if not self._is_active:
638             raise Exception('Mux client is not active')
639
640         if not channel_id in self._logical_channels:
641             raise Exception('Logical channel %d is not established.')
642
643     def drop_channel(self, channel_id):
644         # TODO(bashi): Implement
645         pass
646
647     def send_flow_control(self, channel_id, replenished_quota):
648         self._check_logical_channel_is_opened(channel_id)
649         flow_control = _create_flow_control(channel_id, replenished_quota)
650         payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control
651         # Replenish receive quota
652         try:
653             self._logical_channels_condition.acquire()
654             self._logical_channels[channel_id].receive_quota += (
655                 replenished_quota)
656         finally:
657             self._logical_channels_condition.release()
658         self._stream.send_binary(payload)
659
660     def send_message(self, channel_id, message, end=True, binary=False):
661         self._check_logical_channel_is_opened(channel_id)
662
663         if binary:
664             first_byte = (end << 7) | client_for_testing.OPCODE_BINARY
665         else:
666             first_byte = (end << 7) | client_for_testing.OPCODE_TEXT
667             message = message.encode('utf-8')
668
669         try:
670             self._logical_channels_condition.acquire()
671             if self._logical_channels[channel_id].send_quota < len(message):
672                 raise Exception('Send quota violation: %d < %d' % (
673                         self._logical_channels[channel_id].send_quota,
674                         len(message)))
675
676             self._logical_channels[channel_id].send_quota -= len(message)
677         finally:
678             self._logical_channels_condition.release()
679         payload = _encode_channel_id(channel_id) + chr(first_byte) + message
680         self._stream.send_binary(payload)
681
682     def assert_receive(self, channel_id, payload, binary=False):
683         self._check_logical_channel_is_opened(channel_id)
684
685         try:
686             inner_frame = self._logical_channels[channel_id].queue.get(
687                 timeout=self._timeout)
688         except Queue.Empty, e:
689             raise Exception('Cannot receive message from channel id %d' %
690                             channel_id)
691
692         if binary:
693             opcode = client_for_testing.OPCODE_BINARY
694         else:
695             opcode = client_for_testing.OPCODE_TEXT
696
697         if inner_frame.opcode != opcode:
698             raise Exception('Unexpected opcode received (%r != %r)' %
699                             (expected_opcode, inner_frame.opcode))
700
701         if inner_frame.payload != payload:
702             raise Exception('Unexpected payload received')
703
704     def send_close(self, channel_id, code=None, reason=''):
705         self._check_logical_channel_is_opened(channel_id)
706
707         if code is not None:
708             body = struct.pack('!H', code) + reason.encode('utf-8')
709         else:
710             body = ''
711
712         first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE
713         payload = _encode_channel_id(channel_id) + chr(first_byte) + body
714         self._stream.send_binary(payload)
715
716     def assert_receive_close(self, channel_id):
717         self._check_logical_channel_is_opened(channel_id)
718
719         try:
720             inner_frame = self._logical_channels[channel_id].queue.get(
721                 timeout=self._timeout)
722         except Queue.Empty, e:
723             raise Exception('Cannot receive message from channel id %d' %
724                             channel_id)
725         if inner_frame.opcode != client_for_testing.OPCODE_CLOSE:
726             raise Exception('Didn\'t receive close frame')
727
728     def send_physical_connection_close(self, code=None, reason=''):
729         self._physical_connection_close_event = threading.Event()
730         self._stream.send_close(code, reason)
731
732     # This method can be used only after calling
733     # send_physical_connection_close().
734     def assert_physical_connection_receive_close(
735         self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''):
736         self._physical_connection_close_event.wait(timeout=self._timeout)
737         if (not self._physical_connection_close_event.isSet() or
738             not self._physical_connection_close_message):
739             raise Exception('Didn\'t receive closing handshake')
740
741     def close_socket(self):
742         self._is_active = False
743         self._socket.close()