simple-server: Add support for multi-party rooms
authorNirbheek Chauhan <nirbheek@centricular.com>
Sat, 28 Oct 2017 13:30:42 +0000 (19:00 +0530)
committerNirbheek Chauhan <nirbheek@centricular.com>
Sat, 28 Oct 2017 13:50:44 +0000 (19:20 +0530)
Also add a new room-client.py to test the protocol which is documented
in Protocol.md

webrtc/signalling/Protocol.md
webrtc/signalling/room-client.py [new file with mode: 0755]
webrtc/signalling/simple-server.py

index 18ef5fa..0aad2b8 100644 (file)
@@ -20,7 +20,7 @@ Basic websockets server implemented in Python that manages the peers list and sh
 
 This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
 
-### Peer registration and calling
+### Peer registration
 
 Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
 
@@ -31,13 +31,29 @@ This protocol builds upon https://github.com/shanet/WebRTC-Example/
 * Receive `HELLO`
 * Any other message starting with `ERROR` is an error.
 
-* To connect to a peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
+### 1-1 calls with a 'session'
+
+* To connect to a single peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
 * All further messages will be forwarded to the peer
 * The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
 
 * Closure of the server connection means the call has ended; either because the other peer ended it or went away
 * To end the call, disconnect from the server. You may reconnect again whenever you wish.
 
+### Multi-party calls with a 'room'
+
+* To create a multi-party call, you must first register (or join) a room. Send `ROOM <room_id>` where `<room_id>` is a unique room name
+* Receive `ROOM_OK ` from the server if this is a new room, or `ROOM_OK <peer1_id> <peer2_id> ...` where `<peerN_id>` are unique identifiers for the peers already in the room
+* To send messages to a specific peer within the room for call negotiation (or any other purpose, use `ROOM_PEER_MSG <peer_id> <msg>`
+* When a new peer joins the room, you will receive a `ROOM_PEER_JOINED <peer_id>` message
+ - For the purposes of convention and to avoid overwhelming newly-joined peers, offers must only be sent by the newly-joined peer
+* When a peer leaves the room, you will receive a `ROOM_PEER_LEFT <peer_id>` message
+ - You should stop sending/receiving media from/to this peer
+* To get a list of all peers currently in the room, send `ROOM_PEER_LIST` and receive `ROOM_PEER_LIST <peer1_id> ...`
+ - This list will never contain your own `<uid>`
+ - In theory you should never need to use this since you are guaranteed to receive JOINED and LEFT messages for all peers in a room
+* You may stay connected to a room for as long as you like
+
 ### Negotiation
 
 Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
diff --git a/webrtc/signalling/room-client.py b/webrtc/signalling/room-client.py
new file mode 100755 (executable)
index 0000000..e82a3f2
--- /dev/null
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+#
+# Test client for simple room-based multi-peer p2p calling
+#
+# Copyright (C) 2017 Centricular Ltd.
+#
+#  Author: Nirbheek Chauhan <nirbheek@centricular.com>
+#
+
+import sys
+import ssl
+import json
+import uuid
+import asyncio
+import websockets
+import argparse
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
+parser.add_argument('--room', default=None, help='the room to join')
+
+options = parser.parse_args(sys.argv[1:])
+
+SERVER_ADDR = options.url
+PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
+ROOM_ID = options.room
+if ROOM_ID is None:
+    print('--room argument is required')
+    sys.exit(1)
+
+sslctx = False
+if SERVER_ADDR.startswith(('wss://', 'https://')):
+    sslctx = ssl.create_default_context()
+    # FIXME
+    sslctx.check_hostname = False
+    sslctx.verify_mode = ssl.CERT_NONE
+
+def get_answer_sdp(offer, peer_id):
+    # Here we'd parse the incoming JSON message for ICE and SDP candidates
+    print("Got: " + offer)
+    sdp = json.dumps({'sdp': 'reply sdp'})
+    answer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
+    print("Sent: " + answer)
+    return answer
+
+def get_offer_sdp(peer_id):
+    sdp = json.dumps({'sdp': 'initial sdp'})
+    offer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
+    print("Sent: " + offer)
+    return offer
+
+async def hello():
+    async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
+        await ws.send('HELLO ' + PEER_ID)
+        assert(await ws.recv() == 'HELLO')
+
+        await ws.send('ROOM {}'.format(ROOM_ID))
+
+        sent_offers = set()
+        # Receive messages
+        while True:
+            msg = await ws.recv()
+            if msg.startswith('ERROR'):
+                # On error, we bring down the webrtc pipeline, etc
+                print('{!r}, exiting'.format(msg))
+                return
+            if msg.startswith('ROOM_OK'):
+                print('Got ROOM_OK for room {!r}'.format(ROOM_ID))
+                _, *room_peers = msg.split()
+                for peer_id in room_peers:
+                    print('Sending offer to {!r}'.format(peer_id))
+                    # Create a peer connection for each peer and start
+                    # exchanging SDP and ICE candidates
+                    await ws.send(get_offer_sdp(peer_id))
+                    sent_offers.add(peer_id)
+                continue
+            elif msg.startswith('ROOM_PEER'):
+                if msg.startswith('ROOM_PEER_JOINED'):
+                    _, peer_id = msg.split(maxsplit=1)
+                    print('Peer {!r} joined the room'.format(peer_id))
+                    # Peer will send us an offer
+                    continue
+                if msg.startswith('ROOM_PEER_LEFT'):
+                    _, peer_id = msg.split(maxsplit=1)
+                    print('Peer {!r} left the room'.format(peer_id))
+                    continue
+                elif msg.startswith('ROOM_PEER_MSG'):
+                    _, peer_id, msg = msg.split(maxsplit=2)
+                    if peer_id in sent_offers:
+                        print('Got answer from {!r}: {}'.format(peer_id, msg))
+                        continue
+                    print('Got offer from {!r}, replying'.format(peer_id))
+                    await ws.send(get_answer_sdp(msg, peer_id))
+                    continue
+            print('Unknown msg: {!r}, exiting'.format(msg))
+            return
+
+print('Our uid is {!r}'.format(PEER_ID))
+
+try:
+    asyncio.get_event_loop().run_until_complete(hello())
+except websockets.exceptions.InvalidHandshake:
+    print('Invalid handshake: are you sure this is a websockets server?\n')
+    raise
+except ssl.SSLError:
+    print('SSL Error: are you sure the server is using TLS?\n')
+    raise
index 34af863..9714213 100755 (executable)
@@ -28,12 +28,19 @@ options = parser.parse_args(sys.argv[1:])
 ADDR_PORT = (options.addr, options.port)
 KEEPALIVE_TIMEOUT = options.keepalive_timeout
 
-# Format: {uid: (Peer WebSocketServerProtocol, remote_address)}
+############### Global data ###############
+
+# Format: {uid: (Peer WebSocketServerProtocol,
+#                remote_address,
+#                <'session'|room_id|None>)}
 peers = dict()
 # Format: {caller_uid: callee_uid,
 #          callee_uid: caller_uid}
 # Bidirectional mapping between the two peers
 sessions = dict()
+# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}}
+# Room dict with a set of peers in each room
+rooms = dict()
 
 ############### Helper functions ###############
 
@@ -65,7 +72,7 @@ async def disconnect(ws, peer_id):
         # Don't care about errors
         asyncio.ensure_future(ws.close(reason='hangup'))
 
-async def remove_peer(uid):
+async def cleanup_session(uid):
     if uid in sessions:
         other_id = sessions[uid]
         del sessions[uid]
@@ -77,47 +84,129 @@ async def remove_peer(uid):
             # close the connection to reset its state.
             if other_id in peers:
                 print("Closing connection to {}".format(other_id))
-                wso, oaddr = peers[other_id]
+                wso, oaddr, _ = peers[other_id]
                 del peers[other_id]
                 await wso.close()
+
+async def cleanup_room(uid, room_id):
+    room_peers = rooms[room_id]
+    if uid not in room_peers:
+        return
+    room_peers.remove(uid)
+    for pid in room_peers:
+        wsp, paddr, _ = peers[pid]
+        msg = 'ROOM_PEER_LEFT {}'.format(uid)
+        print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
+        await wsp.send(msg)
+
+async def remove_peer(uid):
+    await cleanup_session(uid)
     if uid in peers:
-        ws, raddr = peers[uid]
+        ws, raddr, status = peers[uid]
+        if status and status != 'session':
+            await cleanup_room(uid, status)
         del peers[uid]
         await ws.close()
         print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
 
 ############### Handler functions ###############
 
-async def connection_handler(ws, peer_id):
-    global peers, sessions
+async def connection_handler(ws, uid):
+    global peers, sessions, rooms
     raddr = ws.remote_address
-    peers[peer_id] = (ws, raddr)
-    print("Registered peer {!r} at {!r}".format(peer_id, raddr))
+    peer_status = None
+    peers[uid] = [ws, raddr, peer_status]
+    print("Registered peer {!r} at {!r}".format(uid, raddr))
     while True:
         # Receive command, wait forever if necessary
         msg = await recv_msg_ping(ws, raddr)
-        if msg.startswith('SESSION'):
-            print("{!r} command {!r}".format(peer_id, msg))
+        # Update current status
+        peer_status = peers[uid][2]
+        # We are in a session or a room, messages must be relayed
+        if peer_status is not None:
+            # We're in a session, route message to connected peer
+            if peer_status == 'session':
+                other_id = sessions[uid]
+                wso, oaddr, status = peers[other_id]
+                assert(status == 'session')
+                print("{} -> {}: {}".format(uid, other_id, msg))
+                await wso.send(msg)
+            # We're in a room, accept room-specific commands
+            elif peer_status:
+                # ROOM_PEER_MSG peer_id MSG
+                if msg.startswith('ROOM_PEER_MSG'):
+                    _, other_id, msg = msg.split(maxsplit=2)
+                    if other_id not in peers:
+                        await ws.send('ERROR peer {!r} not found'
+                                      ''.format(other_id))
+                        continue
+                    wso, oaddr, status = peers[other_id]
+                    if status != room_id:
+                        await ws.send('ERROR peer {!r} is not in the room'
+                                      ''.format(other_id))
+                        continue
+                    msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg)
+                    print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg))
+                    await wso.send(msg)
+                elif msg == 'ROOM_PEER_LIST':
+                    room_id = peers[peer_id][2]
+                    room_peers = ' '.join([pid for pid in rooms[room_id] if pid != peer_id])
+                    msg = 'ROOM_PEER_LIST {}'.format(room_peers)
+                    print('room {}: -> {}: {}'.format(room_id, uid, msg))
+                    await ws.send(msg)
+                else:
+                    await ws.send('ERROR invalid msg, already in room')
+                    continue
+            else:
+                raise AssertionError('Unknown peer status {!r}'.format(peer_status))
+        # Requested a session with a specific peer
+        elif msg.startswith('SESSION'):
+            print("{!r} command {!r}".format(uid, msg))
             _, callee_id = msg.split(maxsplit=1)
             if callee_id not in peers:
                 await ws.send('ERROR peer {!r} not found'.format(callee_id))
                 continue
-            if callee_id in sessions:
+            if peer_status is not None:
                 await ws.send('ERROR peer {!r} busy'.format(callee_id))
                 continue
             await ws.send('SESSION_OK')
             wsc = peers[callee_id][0]
-            print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id,
-                                                                    wsc.remote_address))
-            # Register call
-            sessions[peer_id] = callee_id
-            sessions[callee_id] = peer_id
-        # We're in a session, route message to connected peer
-        elif peer_id in sessions:
-            other_id = sessions[peer_id]
-            wso, oaddr = peers[other_id]
-            print("{} -> {}: {}".format(peer_id, other_id, msg))
-            await wso.send(msg)
+            print('Session from {!r} ({!r}) to {!r} ({!r})'
+                  ''.format(uid, raddr, callee_id, wsc.remote_address))
+            # Register session
+            peers[uid][2] = peer_status = 'session'
+            sessions[uid] = callee_id
+            peers[callee_id][2] = 'session'
+            sessions[callee_id] = uid
+        # Requested joining or creation of a room
+        elif msg.startswith('ROOM'):
+            print('{!r} command {!r}'.format(uid, msg))
+            _, room_id = msg.split(maxsplit=1)
+            # Room name cannot be 'session', empty, or contain whitespace
+            if room_id == 'session' or room_id.split() != [room_id]:
+                await ws.send('ERROR invalid room id {!r}'.format(room_id))
+                continue
+            if room_id in rooms:
+                if uid in rooms[room_id]:
+                    raise AssertionError('How did we accept a ROOM command '
+                                         'despite already being in a room?')
+            else:
+                # Create room if required
+                rooms[room_id] = set()
+            room_peers = ' '.join([pid for pid in rooms[room_id]])
+            await ws.send('ROOM_OK {}'.format(room_peers))
+            # Enter room
+            peers[uid][2] = peer_status = room_id
+            rooms[room_id].add(uid)
+            for pid in rooms[room_id]:
+                if pid == uid:
+                    continue
+                wsp, paddr, _ = peers[pid]
+                msg = 'ROOM_PEER_JOINED {}'.format(uid)
+                print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
+                await wsp.send(msg)
+        else:
+            print('Ignoring unknown message {!r} from {!r}'.format(msg, uid))
 
 async def hello_peer(ws):
     '''
@@ -129,7 +218,7 @@ async def hello_peer(ws):
     if hello != 'HELLO':
         await ws.close(code=1002, reason='invalid protocol')
         raise Exception("Invalid hello from {!r}".format(raddr))
-    if not uid or uid in peers:
+    if not uid or uid in peers or uid.split() != [uid]: # no whitespace
         await ws.close(code=1002, reason='invalid peer uid')
         raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
     # Send back a HELLO