Add a simple python3 webrtc signalling server
authorNirbheek Chauhan <nirbheek@centricular.com>
Sat, 21 Oct 2017 14:26:52 +0000 (19:56 +0530)
committerNirbheek Chauhan <nirbheek@centricular.com>
Sat, 21 Oct 2017 14:26:52 +0000 (19:56 +0530)
+ client for testing + protocol documentation

webrtc/signalling/Protocol.md [new file with mode: 0644]
webrtc/signalling/README.md [new file with mode: 0644]
webrtc/signalling/client.py [new file with mode: 0755]
webrtc/signalling/generate_cert.sh [new file with mode: 0755]
webrtc/signalling/simple-server.py [new file with mode: 0755]

diff --git a/webrtc/signalling/Protocol.md b/webrtc/signalling/Protocol.md
new file mode 100644 (file)
index 0000000..1af892e
--- /dev/null
@@ -0,0 +1,80 @@
+# Terminology
+
+### Client
+
+A GStreamer-based application
+
+### Browser
+
+A JS application that runs in the browser and uses built-in browser webrtc APIs
+
+### Peer
+Any webrtc-using application that can participate in a call
+
+### Signalling server
+
+Basic websockets server implemented in Python that manages the peers list and shovels data between peers
+
+# Overview
+
+This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
+
+# Peer registration and calling
+
+Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
+
+This protocol builds upon https://github.com/shanet/WebRTC-Example/
+
+* Connect to the websocket server
+* Send `HELLO <uid>` where `<uid>` is a string which will uniquely identify this peer
+* Receive `HELLO`
+* Any other message starting with `ERROR` is an error.
+
+* To connect to a peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
+* All further messages will be forwarded to the peer
+* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
+
+* Closure of the server connection means the call has ended; either because the other peer ended it or went away
+* To end the call, disconnect from the server. You may reconnect again whenever you wish.
+
+# Negotiation
+
+Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
+
+The calling side must create an SDP offer and send it to the peer as a JSON object:
+
+```json
+{
+    "sdp": {
+                "sdp": "o=- [....]",
+                "type": "offer"
+    }
+}
+```
+
+The callee must then reply with an answer:
+
+```json
+{
+    "sdp": {
+                "sdp": "o=- [....]",
+                "type": "answer"
+    }
+}
+```
+
+ICE candidates must be exchanged similarly by exchanging JSON objects:
+
+
+```json
+{
+    "ice": {
+                "candidate": ...,
+                "sdpMLineIndex": ...,
+                ...
+    }
+}
+```
+
+Note that the structure of these is the same as that specified by the WebRTC spec.
diff --git a/webrtc/signalling/README.md b/webrtc/signalling/README.md
new file mode 100644 (file)
index 0000000..8981782
--- /dev/null
@@ -0,0 +1,26 @@
+## Overview
+
+Read Protocol.md
+
+## Dependencies
+
+* Python 3
+* pip3 install --user websockets
+
+## Example usage
+
+In three separate tabs, run consecutively:
+
+```console
+$ ./generate_certs.sh
+$ ./simple-server.py
+```
+
+```console
+$ ./client.py
+Our uid is 'ws-test-client-8f63b9'
+```
+
+```console
+$ ./client.py --call ws-test-client-8f63b9
+```
diff --git a/webrtc/signalling/client.py b/webrtc/signalling/client.py
new file mode 100755 (executable)
index 0000000..0fa8227
--- /dev/null
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+#
+# Test client for simple 1-1 call signalling server
+#
+# Copyright (C) 2017 Centricular Ltd.
+#
+#  Author: Nirbheek Chauhan <nirbheek@centricular.com>
+#
+
+import sys
+import ssl
+import json
+import uuid
+import asyncio
+import websockets
+import argparse
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
+parser.add_argument('--call', default=None, help='uid of peer to call')
+
+options = parser.parse_args(sys.argv[1:])
+
+SERVER_ADDR = options.url
+CALLEE_ID = options.call
+PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
+
+sslctx = False
+if SERVER_ADDR.startswith(('wss://', 'https://')):
+    sslctx = ssl.create_default_context()
+    # FIXME
+    sslctx.check_hostname = False
+    sslctx.verify_mode = ssl.CERT_NONE
+
+def reply_sdp_ice(msg):
+    # Here we'd parse the incoming JSON message for ICE and SDP candidates
+    print("Got: " + msg)
+    reply = json.dumps({'sdp': 'reply sdp'})
+    print("Sent: " + reply)
+    return reply
+
+def send_sdp_ice():
+    reply = json.dumps({'sdp': 'initial sdp'})
+    print("Sent: " + reply)
+    return reply
+
+async def hello():
+    async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
+        await ws.send('HELLO ' + PEER_ID)
+        assert(await ws.recv() == 'HELLO')
+
+        # Initiate call if requested
+        if CALLEE_ID:
+            await ws.send('CALL {}'.format(CALLEE_ID))
+
+        # Receive messages
+        while True:
+            msg = await ws.recv()
+            if msg.startswith('ERROR'):
+                # On error, we bring down the webrtc pipeline, etc
+                print('{!r}, exiting'.format(msg))
+                return
+            if CALLEE_ID:
+                if msg == 'CALL_OK':
+                    await ws.send(send_sdp_ice())
+                    # Return so we don't have an infinite loop
+                    return
+                else:
+                    print('Unknown reply: {!r}, exiting'.format(msg))
+                    return
+            else:
+                await ws.send(reply_sdp_ice(msg))
+                # Return so we don't have an infinite loop
+                return
+
+print('Our uid is {!r}'.format(PEER_ID))
+
+try:
+    asyncio.get_event_loop().run_until_complete(hello())
+except websockets.exceptions.InvalidHandshake:
+    print('Invalid handshake: are you sure this is a websockets server?\n')
+    raise
+except ssl.SSLError:
+    print('SSL Error: are you sure the server is using TLS?\n')
+    raise
diff --git a/webrtc/signalling/generate_cert.sh b/webrtc/signalling/generate_cert.sh
new file mode 100755 (executable)
index 0000000..68a4b96
--- /dev/null
@@ -0,0 +1,3 @@
+#! /bin/bash
+
+openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes
diff --git a/webrtc/signalling/simple-server.py b/webrtc/signalling/simple-server.py
new file mode 100755 (executable)
index 0000000..34af863
--- /dev/null
@@ -0,0 +1,187 @@
+#!/usr/bin/env python3
+#
+# Example 1-1 call signalling server
+#
+# Copyright (C) 2017 Centricular Ltd.
+#
+#  Author: Nirbheek Chauhan <nirbheek@centricular.com>
+#
+
+import os
+import sys
+import ssl
+import logging
+import asyncio
+import websockets
+import argparse
+
+from concurrent.futures._base import TimeoutError
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on')
+parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
+parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
+parser.add_argument('--cert-path', default=os.path.dirname(__file__))
+
+options = parser.parse_args(sys.argv[1:])
+
+ADDR_PORT = (options.addr, options.port)
+KEEPALIVE_TIMEOUT = options.keepalive_timeout
+
+# Format: {uid: (Peer WebSocketServerProtocol, remote_address)}
+peers = dict()
+# Format: {caller_uid: callee_uid,
+#          callee_uid: caller_uid}
+# Bidirectional mapping between the two peers
+sessions = dict()
+
+############### Helper functions ###############
+
+async def recv_msg_ping(ws, raddr):
+    '''
+    Wait for a message forever, and send a regular ping to prevent bad routers
+    from closing the connection.
+    '''
+    msg = None
+    while msg is None:
+        try:
+            msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
+        except TimeoutError:
+            print('Sending keepalive ping to {!r} in recv'.format(raddr))
+            await ws.ping()
+    return msg
+
+async def disconnect(ws, peer_id):
+    '''
+    Remove @peer_id from the list of sessions and close our connection to it.
+    This informs the peer that the session and all calls have ended, and it
+    must reconnect.
+    '''
+    global sessions
+    if peer_id in sessions:
+        del sessions[peer_id]
+    # Close connection
+    if ws and ws.open:
+        # Don't care about errors
+        asyncio.ensure_future(ws.close(reason='hangup'))
+
+async def remove_peer(uid):
+    if uid in sessions:
+        other_id = sessions[uid]
+        del sessions[uid]
+        print("Cleaned up {} session".format(uid))
+        if other_id in sessions:
+            del sessions[other_id]
+            print("Also cleaned up {} session".format(other_id))
+            # If there was a session with this peer, also
+            # close the connection to reset its state.
+            if other_id in peers:
+                print("Closing connection to {}".format(other_id))
+                wso, oaddr = peers[other_id]
+                del peers[other_id]
+                await wso.close()
+    if uid in peers:
+        ws, raddr = peers[uid]
+        del peers[uid]
+        await ws.close()
+        print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
+
+############### Handler functions ###############
+
+async def connection_handler(ws, peer_id):
+    global peers, sessions
+    raddr = ws.remote_address
+    peers[peer_id] = (ws, raddr)
+    print("Registered peer {!r} at {!r}".format(peer_id, raddr))
+    while True:
+        # Receive command, wait forever if necessary
+        msg = await recv_msg_ping(ws, raddr)
+        if msg.startswith('SESSION'):
+            print("{!r} command {!r}".format(peer_id, msg))
+            _, callee_id = msg.split(maxsplit=1)
+            if callee_id not in peers:
+                await ws.send('ERROR peer {!r} not found'.format(callee_id))
+                continue
+            if callee_id in sessions:
+                await ws.send('ERROR peer {!r} busy'.format(callee_id))
+                continue
+            await ws.send('SESSION_OK')
+            wsc = peers[callee_id][0]
+            print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id,
+                                                                    wsc.remote_address))
+            # Register call
+            sessions[peer_id] = callee_id
+            sessions[callee_id] = peer_id
+        # We're in a session, route message to connected peer
+        elif peer_id in sessions:
+            other_id = sessions[peer_id]
+            wso, oaddr = peers[other_id]
+            print("{} -> {}: {}".format(peer_id, other_id, msg))
+            await wso.send(msg)
+
+async def hello_peer(ws):
+    '''
+    Exchange hello, register peer
+    '''
+    raddr = ws.remote_address
+    hello = await ws.recv()
+    hello, uid = hello.split(maxsplit=1)
+    if hello != 'HELLO':
+        await ws.close(code=1002, reason='invalid protocol')
+        raise Exception("Invalid hello from {!r}".format(raddr))
+    if not uid or uid in peers:
+        await ws.close(code=1002, reason='invalid peer uid')
+        raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
+    # Send back a HELLO
+    await ws.send('HELLO')
+    return uid
+
+async def handler(ws, path):
+    '''
+    All incoming messages are handled here. @path is unused.
+    '''
+    raddr = ws.remote_address
+    print("Connected to {!r}".format(raddr))
+    peer_id = await hello_peer(ws)
+    try:
+        await connection_handler(ws, peer_id)
+    except websockets.ConnectionClosed:
+        print("Connection to peer {!r} closed, exiting handler".format(raddr))
+    finally:
+        await remove_peer(peer_id)
+
+# Create an SSL context to be used by the websocket server
+certpath = options.cert_path
+print('Using TLS with keys in {!r}'.format(certpath))
+if 'letsencrypt' in certpath:
+    chain_pem = os.path.join(certpath, 'fullchain.pem')
+    key_pem = os.path.join(certpath, 'privkey.pem')
+else:
+    chain_pem = os.path.join(certpath, 'cert.pem')
+    key_pem = os.path.join(certpath, 'key.pem')
+
+sslctx = ssl.create_default_context()
+try:
+    sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
+except FileNotFoundError:
+    print("Certificates not found, did you run generate_cert.sh?")
+    sys.exit(1)
+# FIXME
+sslctx.check_hostname = False
+sslctx.verify_mode = ssl.CERT_NONE
+
+print("Listening on https://{}:{}".format(*ADDR_PORT))
+# Websocket server
+wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx,
+                       # Maximum number of messages that websockets will pop
+                       # off the asyncio and OS buffers per connection. See:
+                       # https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
+                       max_queue=16)
+
+logger = logging.getLogger('websockets.server')
+
+logger.setLevel(logging.ERROR)
+logger.addHandler(logging.StreamHandler())
+
+asyncio.get_event_loop().run_until_complete(wsd)
+asyncio.get_event_loop().run_forever()