1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
13 from telemetry.core.backends.chrome import websocket
16 # Minimal handler for a local websocket server.
17 class _FakeWebSocketHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# HTTP request handler that replies to a WebSocket opening handshake: it
# derives an accept token from the client's key and sends a 101 response
# carrying that token plus the upgrade/connection headers.
# NOTE(review): the statements below use `self`, so they belong to a handler
# method whose `def` line is not visible in this view of the file (the
# embedded line numbers skip) -- confirm the method name and whatever follows
# the last send_header call against the full file.
#
# Read the handshake key the client sent in its request headers.
19 key = self.headers.getheader('Sec-WebSocket-Key')
# Accept token = base64(SHA-1(key + fixed WebSocket handshake GUID)), with
# surrounding whitespace stripped and the result lowercased.
# NOTE(review): `key` is concatenated without a None check, so a request
# lacking the header would fail here.
# NOTE(review): base64 text is case-sensitive, so the trailing .lower()
# presumably only works because the client lowercases both sides before
# comparing -- confirm against the websocket client in use.
21 value = key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
22 hashed = base64.encodestring(hashlib.sha1(value).digest()).strip().lower()
# Status line for the reply: 101.
24 self.send_response(101)
# Response headers: the computed accept token, then the upgrade/connection
# pair announcing the switch to the websocket protocol.
26 self.send_header('Sec-Websocket-Accept', hashed)
27 self.send_header('upgrade', 'websocket')
28 self.send_header('connection', 'upgrade')
34 class TestWebSocket(unittest.TestCase):
# Unit tests for the `websocket` module imported from
# telemetry.core.backends.chrome.
# NOTE(review): this view of the file has gaps (the embedded line numbers
# skip), so some statements below appear without their opening line.
#
# testExports: the module must expose create_connection, WebSocketException
# and WebSocketTimeoutException, each compared against None.
35 def testExports(self):
36 self.assertNotEqual(websocket.create_connection, None)
37 self.assertNotEqual(websocket.WebSocketException, None)
38 self.assertNotEqual(websocket.WebSocketTimeoutException, None)
# testSockOpts: connects twice to a local server built on
# _FakeWebSocketHandler and reads socket options from the connection's
# underlying socket (`ws.sock`).
40 def testSockOpts(self):
# Bind to port 0; the actual port is read back from httpd.server_port below.
41 httpd = BaseHTTPServer.HTTPServer(('', 0), _FakeWebSocketHandler)
# Serve a single request on a background thread while this thread connects.
42 threading.Thread(target=httpd.handle_request).start()
43 ws_url = 'ws://127.0.0.1:%d' % httpd.server_port
# First connection: no explicit socket options passed.
44 ws = websocket.create_connection(ws_url)
# Tail of a call that pairs the socket's SO_REUSEADDR value with 0.
# NOTE(review): the opening line of this statement is not visible here --
# presumably an assertion; confirm whether it checks equal or not-equal.
46 ws.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR), 0)
# A second single-request server thread for the second connection.
48 threading.Thread(target=httpd.handle_request).start()
# Second connection: passes sockopt asking for TCP_NODELAY=1.
# NOTE(review): a line between the call opener and `sockopt=` is not visible
# (presumably the URL argument) -- confirm.
49 ws = websocket.create_connection(
51 sockopt=[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)])
# Tails of two further calls pairing SO_REUSEADDR and TCP_NODELAY with 0.
# NOTE(review): opening lines not visible, as above.
53 ws.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR), 0)
55 ws.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)