--- /dev/null
+# Terminology
+
+### Client
+
+A GStreamer-based application
+
+### Browser
+
+A JS application that runs in the browser and uses built-in browser webrtc APIs
+
+### Peer
+
+Any webrtc-using application that can participate in a call
+
+### Signalling server
+
+Basic websockets server implemented in Python that manages the peers list and shovels data between peers
+
+# Overview
+
+This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
+
+# Peer registration and calling
+
+Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
+
+This protocol builds upon https://github.com/shanet/WebRTC-Example/
+
+* Connect to the websocket server
+* Send `HELLO <uid>` where `<uid>` is a string which will uniquely identify this peer
+* Receive `HELLO`
+* Any other message starting with `ERROR` is an error.
+
+* To connect to a peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
+* All further messages will be forwarded to the peer
+* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
+
+* Closure of the server connection means the call has ended; either because the other peer ended it or went away
+* To end the call, disconnect from the server. You may reconnect again whenever you wish.
+
+# Negotiation
+
+Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
+
+The calling side must create an SDP offer and send it to the peer as a JSON object:
+
+```json
+{
+ "sdp": {
+ "sdp": "o=- [....]",
+ "type": "offer"
+ }
+}
+```
+
+The callee must then reply with an answer:
+
+```json
+{
+ "sdp": {
+ "sdp": "o=- [....]",
+ "type": "answer"
+ }
+}
+```
+
+ICE candidates must be exchanged similarly by exchanging JSON objects:
+
+
+```json
+{
+ "ice": {
+ "candidate": ...,
+ "sdpMLineIndex": ...,
+ ...
+ }
+}
+```
+
+Note that the structure of these is the same as that specified by the WebRTC spec.
--- /dev/null
+#!/usr/bin/env python3
+#
+# Test client for simple 1-1 call signalling server
+#
+# Copyright (C) 2017 Centricular Ltd.
+#
+# Author: Nirbheek Chauhan <nirbheek@centricular.com>
+#
+
+import sys
+import ssl
+import json
+import uuid
+import asyncio
+import websockets
+import argparse
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
+parser.add_argument('--call', default=None, help='uid of peer to call')
+
+options = parser.parse_args(sys.argv[1:])
+
+SERVER_ADDR = options.url
+CALLEE_ID = options.call
+PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
+
+sslctx = False
+if SERVER_ADDR.startswith(('wss://', 'https://')):
+ sslctx = ssl.create_default_context()
+ # FIXME
+ sslctx.check_hostname = False
+ sslctx.verify_mode = ssl.CERT_NONE
+
+def reply_sdp_ice(msg):
+ # Here we'd parse the incoming JSON message for ICE and SDP candidates
+ print("Got: " + msg)
+ reply = json.dumps({'sdp': 'reply sdp'})
+ print("Sent: " + reply)
+ return reply
+
+def send_sdp_ice():
+ reply = json.dumps({'sdp': 'initial sdp'})
+ print("Sent: " + reply)
+ return reply
+
+async def hello():
+ async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
+ await ws.send('HELLO ' + PEER_ID)
+ assert(await ws.recv() == 'HELLO')
+
+ # Initiate call if requested
+ if CALLEE_ID:
+ await ws.send('CALL {}'.format(CALLEE_ID))
+
+ # Receive messages
+ while True:
+ msg = await ws.recv()
+ if msg.startswith('ERROR'):
+ # On error, we bring down the webrtc pipeline, etc
+ print('{!r}, exiting'.format(msg))
+ return
+ if CALLEE_ID:
+ if msg == 'CALL_OK':
+ await ws.send(send_sdp_ice())
+ # Return so we don't have an infinite loop
+ return
+ else:
+ print('Unknown reply: {!r}, exiting'.format(msg))
+ return
+ else:
+ await ws.send(reply_sdp_ice(msg))
+ # Return so we don't have an infinite loop
+ return
+
+print('Our uid is {!r}'.format(PEER_ID))
+
+try:
+ asyncio.get_event_loop().run_until_complete(hello())
+except websockets.exceptions.InvalidHandshake:
+ print('Invalid handshake: are you sure this is a websockets server?\n')
+ raise
+except ssl.SSLError:
+ print('SSL Error: are you sure the server is using TLS?\n')
+ raise
--- /dev/null
+#!/usr/bin/env python3
+#
+# Example 1-1 call signalling server
+#
+# Copyright (C) 2017 Centricular Ltd.
+#
+# Author: Nirbheek Chauhan <nirbheek@centricular.com>
+#
+
+import os
+import sys
+import ssl
+import logging
+import asyncio
+import websockets
+import argparse
+
+from concurrent.futures._base import TimeoutError
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on')
+parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
+parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
+parser.add_argument('--cert-path', default=os.path.dirname(__file__))
+
+options = parser.parse_args(sys.argv[1:])
+
+ADDR_PORT = (options.addr, options.port)
+KEEPALIVE_TIMEOUT = options.keepalive_timeout
+
+# Format: {uid: (Peer WebSocketServerProtocol, remote_address)}
+peers = dict()
+# Format: {caller_uid: callee_uid,
+# callee_uid: caller_uid}
+# Bidirectional mapping between the two peers
+sessions = dict()
+
+############### Helper functions ###############
+
+async def recv_msg_ping(ws, raddr):
+ '''
+ Wait for a message forever, and send a regular ping to prevent bad routers
+ from closing the connection.
+ '''
+ msg = None
+ while msg is None:
+ try:
+ msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
+ except TimeoutError:
+ print('Sending keepalive ping to {!r} in recv'.format(raddr))
+ await ws.ping()
+ return msg
+
+async def disconnect(ws, peer_id):
+ '''
+ Remove @peer_id from the list of sessions and close our connection to it.
+ This informs the peer that the session and all calls have ended, and it
+ must reconnect.
+ '''
+ global sessions
+ if peer_id in sessions:
+ del sessions[peer_id]
+ # Close connection
+ if ws and ws.open:
+ # Don't care about errors
+ asyncio.ensure_future(ws.close(reason='hangup'))
+
+async def remove_peer(uid):
+ if uid in sessions:
+ other_id = sessions[uid]
+ del sessions[uid]
+ print("Cleaned up {} session".format(uid))
+ if other_id in sessions:
+ del sessions[other_id]
+ print("Also cleaned up {} session".format(other_id))
+ # If there was a session with this peer, also
+ # close the connection to reset its state.
+ if other_id in peers:
+ print("Closing connection to {}".format(other_id))
+ wso, oaddr = peers[other_id]
+ del peers[other_id]
+ await wso.close()
+ if uid in peers:
+ ws, raddr = peers[uid]
+ del peers[uid]
+ await ws.close()
+ print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
+
+############### Handler functions ###############
+
+async def connection_handler(ws, peer_id):
+ global peers, sessions
+ raddr = ws.remote_address
+ peers[peer_id] = (ws, raddr)
+ print("Registered peer {!r} at {!r}".format(peer_id, raddr))
+ while True:
+ # Receive command, wait forever if necessary
+ msg = await recv_msg_ping(ws, raddr)
+ if msg.startswith('SESSION'):
+ print("{!r} command {!r}".format(peer_id, msg))
+ _, callee_id = msg.split(maxsplit=1)
+ if callee_id not in peers:
+ await ws.send('ERROR peer {!r} not found'.format(callee_id))
+ continue
+ if callee_id in sessions:
+ await ws.send('ERROR peer {!r} busy'.format(callee_id))
+ continue
+ await ws.send('SESSION_OK')
+ wsc = peers[callee_id][0]
+ print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id,
+ wsc.remote_address))
+ # Register call
+ sessions[peer_id] = callee_id
+ sessions[callee_id] = peer_id
+ # We're in a session, route message to connected peer
+ elif peer_id in sessions:
+ other_id = sessions[peer_id]
+ wso, oaddr = peers[other_id]
+ print("{} -> {}: {}".format(peer_id, other_id, msg))
+ await wso.send(msg)
+
+async def hello_peer(ws):
+ '''
+ Exchange hello, register peer
+ '''
+ raddr = ws.remote_address
+ hello = await ws.recv()
+ hello, uid = hello.split(maxsplit=1)
+ if hello != 'HELLO':
+ await ws.close(code=1002, reason='invalid protocol')
+ raise Exception("Invalid hello from {!r}".format(raddr))
+ if not uid or uid in peers:
+ await ws.close(code=1002, reason='invalid peer uid')
+ raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
+ # Send back a HELLO
+ await ws.send('HELLO')
+ return uid
+
+async def handler(ws, path):
+ '''
+ All incoming messages are handled here. @path is unused.
+ '''
+ raddr = ws.remote_address
+ print("Connected to {!r}".format(raddr))
+ peer_id = await hello_peer(ws)
+ try:
+ await connection_handler(ws, peer_id)
+ except websockets.ConnectionClosed:
+ print("Connection to peer {!r} closed, exiting handler".format(raddr))
+ finally:
+ await remove_peer(peer_id)
+
+# Create an SSL context to be used by the websocket server
+certpath = options.cert_path
+print('Using TLS with keys in {!r}'.format(certpath))
+if 'letsencrypt' in certpath:
+ chain_pem = os.path.join(certpath, 'fullchain.pem')
+ key_pem = os.path.join(certpath, 'privkey.pem')
+else:
+ chain_pem = os.path.join(certpath, 'cert.pem')
+ key_pem = os.path.join(certpath, 'key.pem')
+
+sslctx = ssl.create_default_context()
+try:
+ sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
+except FileNotFoundError:
+ print("Certificates not found, did you run generate_cert.sh?")
+ sys.exit(1)
+# FIXME
+sslctx.check_hostname = False
+sslctx.verify_mode = ssl.CERT_NONE
+
+print("Listening on https://{}:{}".format(*ADDR_PORT))
+# Websocket server
+wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx,
+ # Maximum number of messages that websockets will pop
+ # off the asyncio and OS buffers per connection. See:
+ # https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
+ max_queue=16)
+
+logger = logging.getLogger('websockets.server')
+
+logger.setLevel(logging.ERROR)
+logger.addHandler(logging.StreamHandler())
+
+asyncio.get_event_loop().run_until_complete(wsd)
+asyncio.get_event_loop().run_forever()