1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
12 from telemetry.core.backends.chrome import websocket
15 # Minimal handler for a local websocket server.
# Answers a client's websocket opening handshake: it derives an accept token
# from the request's Sec-WebSocket-Key header and replies with status 101 plus
# the upgrade headers. Nothing beyond the handshake appears in the visible
# lines.
# NOTE(review): the embedded line numbering in this listing skips entries
# (e.g. 17, 19, 22, 24, 28-32). The method header that owns the statements
# below, and whatever finishes the response after the last send_header call,
# are not visible here -- confirm against the full file.
16 class _FakeWebSocketHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# Handshake key supplied by the client in its request headers.
18 key = self.headers.getheader('Sec-WebSocket-Key')
# Concatenate the key with the fixed GUID that RFC 6455 specifies for
# computing the Sec-WebSocket-Accept value.
20 value = key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
# SHA-1 the combined string, base64-encode the digest, strip surrounding
# whitespace, and lower-case the result.
# NOTE(review): base64 text is case-sensitive, so lower() deviates from the
# RFC 6455 accept value; presumably the client under test lower-cases both
# sides before comparing -- confirm against the websocket client code.
21 hashed = base64.encodestring(hashlib.sha1(value).digest()).strip().lower()
# 101 is the HTTP "Switching Protocols" status.
23 self.send_response(101)
# Response headers: the computed accept token plus the upgrade markers.
25 self.send_header('Sec-Websocket-Accept', hashed)
26 self.send_header('upgrade', 'websocket')
27 self.send_header('connection', 'upgrade')
# Tests for the telemetry websocket wrapper module: that it exposes the
# expected names, and that sockets created through create_connection report
# particular socket options.
# NOTE(review): the embedded line numbering in this listing skips entries
# (e.g. 38, 42, 45-46, 48-50, 53, 55-56, 58). Some of the skipped lines held
# code -- see the notes in testSockOpts below -- so confirm details against
# the full file.
33 class TestWebSocket(unittest.TestCase):
# Checks that create_connection, WebSocketException and
# WebSocketTimeoutException are present on the module and are not None.
34 def testExports(self):
35 self.assertNotEqual(websocket.create_connection, None)
36 self.assertNotEqual(websocket.WebSocketException, None)
37 self.assertNotEqual(websocket.WebSocketTimeoutException, None)
# Connects twice to a local fake websocket server and inspects the socket
# options reported by the resulting connection's underlying socket (ws.sock).
39 def testSockOpts(self):
# Bind to 127.0.0.1 with port 0 (presumably so the OS picks a free port) and
# read the actual port back from httpd.server_port to build the ws:// URL.
40 httpd = BaseHTTPServer.HTTPServer(('127.0.0.1', 0), _FakeWebSocketHandler)
41 ws_url = 'ws://127.0.0.1:%d' % httpd.server_port
# First connection: start a background thread running httpd.handle_request,
# then connect with no extra arguments.
# NOTE(review): no thread join or server close is visible in this listing --
# confirm whether cleanup happens on the skipped lines.
43 threading.Thread(target=httpd.handle_request).start()
44 ws = websocket.create_connection(ws_url)
# Reads SO_REUSEADDR from the connection's socket.
# NOTE(review): the call that consumes this value and the trailing 0 is not
# visible here, so whether it asserts equality or inequality is unknown.
47 ws.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR), 0)
# Second connection: another handle_request thread, this time passing a
# sockopt list that sets TCP_NODELAY to 1.
# NOTE(review): an argument line between the opening parenthesis and sockopt=
# is missing from this listing (presumably the URL) -- confirm.
51 threading.Thread(target=httpd.handle_request).start()
52 ws = websocket.create_connection(
54 sockopt=[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)])
# Reads SO_REUSEADDR and TCP_NODELAY from the second connection's socket; as
# above, the calls that consume these values are not visible.
57 ws.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR), 0)
59 ws.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)