From d687ff3d9130cc297482b34a8f578f6627e65458 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Sat, 28 Oct 2017 19:00:42 +0530 Subject: [PATCH] simple-server: Add support for multi-party rooms Also add a new room-client.py to test the protocol which is documented in Protocol.md --- webrtc/signalling/Protocol.md | 20 +++++- webrtc/signalling/room-client.py | 107 +++++++++++++++++++++++++++++ webrtc/signalling/simple-server.py | 135 ++++++++++++++++++++++++++++++------- 3 files changed, 237 insertions(+), 25 deletions(-) create mode 100755 webrtc/signalling/room-client.py diff --git a/webrtc/signalling/Protocol.md b/webrtc/signalling/Protocol.md index 18ef5fa..0aad2b8 100644 --- a/webrtc/signalling/Protocol.md +++ b/webrtc/signalling/Protocol.md @@ -20,7 +20,7 @@ Basic websockets server implemented in Python that manages the peers list and sh This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser. -### Peer registration and calling +### Peer registration Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call. @@ -31,13 +31,29 @@ This protocol builds upon https://github.com/shanet/WebRTC-Example/ * Receive `HELLO` * Any other message starting with `ERROR` is an error. -* To connect to a peer, send `SESSION ` where `` identifies the peer to connect to, and receive `SESSION_OK` +### 1-1 calls with a 'session' + +* To connect to a single peer, send `SESSION ` where `` identifies the peer to connect to, and receive `SESSION_OK` * All further messages will be forwarded to the peer * The call negotiation with the peer can be started by sending JSON encoded SDP and ICE * Closure of the server connection means the call has ended; either because the other peer ended it or went away * To end the call, disconnect from the server. You may reconnect again whenever you wish. +### Multi-party calls with a 'room' + +* To create a multi-party call, you must first register (or join) a room. Send `ROOM ` where `` is a unique room name +* Receive `ROOM_OK ` from the server if this is a new room, or `ROOM_OK ...` where `` are unique identifiers for the peers already in the room +* To send messages to a specific peer within the room for call negotiation (or any other purpose, use `ROOM_PEER_MSG ` +* When a new peer joins the room, you will receive a `ROOM_PEER_JOINED ` message + - For the purposes of convention and to avoid overwhelming newly-joined peers, offers must only be sent by the newly-joined peer +* When a peer leaves the room, you will receive a `ROOM_PEER_LEFT ` message + - You should stop sending/receiving media from/to this peer +* To get a list of all peers currently in the room, send `ROOM_PEER_LIST` and receive `ROOM_PEER_LIST ...` + - This list will never contain your own `` + - In theory you should never need to use this since you are guaranteed to receive JOINED and LEFT messages for all peers in a room +* You may stay connected to a room for as long as you like + ### Negotiation Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other. diff --git a/webrtc/signalling/room-client.py b/webrtc/signalling/room-client.py new file mode 100755 index 0000000..e82a3f2 --- /dev/null +++ b/webrtc/signalling/room-client.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +# +# Test client for simple room-based multi-peer p2p calling +# +# Copyright (C) 2017 Centricular Ltd. +# +# Author: Nirbheek Chauhan +# + +import sys +import ssl +import json +import uuid +import asyncio +import websockets +import argparse + +parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to') +parser.add_argument('--room', default=None, help='the room to join') + +options = parser.parse_args(sys.argv[1:]) + +SERVER_ADDR = options.url +PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6] +ROOM_ID = options.room +if ROOM_ID is None: + print('--room argument is required') + sys.exit(1) + +sslctx = False +if SERVER_ADDR.startswith(('wss://', 'https://')): + sslctx = ssl.create_default_context() + # FIXME + sslctx.check_hostname = False + sslctx.verify_mode = ssl.CERT_NONE + +def get_answer_sdp(offer, peer_id): + # Here we'd parse the incoming JSON message for ICE and SDP candidates + print("Got: " + offer) + sdp = json.dumps({'sdp': 'reply sdp'}) + answer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp) + print("Sent: " + answer) + return answer + +def get_offer_sdp(peer_id): + sdp = json.dumps({'sdp': 'initial sdp'}) + offer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp) + print("Sent: " + offer) + return offer + +async def hello(): + async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws: + await ws.send('HELLO ' + PEER_ID) + assert(await ws.recv() == 'HELLO') + + await ws.send('ROOM {}'.format(ROOM_ID)) + + sent_offers = set() + # Receive messages + while True: + msg = await ws.recv() + if msg.startswith('ERROR'): + # On error, we bring down the webrtc pipeline, etc + print('{!r}, exiting'.format(msg)) + return + if msg.startswith('ROOM_OK'): + print('Got ROOM_OK for room {!r}'.format(ROOM_ID)) + _, *room_peers = msg.split() + for peer_id in room_peers: + print('Sending offer to {!r}'.format(peer_id)) + # Create a peer connection for each peer and start + # exchanging SDP and ICE candidates + await ws.send(get_offer_sdp(peer_id)) + sent_offers.add(peer_id) + continue + elif msg.startswith('ROOM_PEER'): + if msg.startswith('ROOM_PEER_JOINED'): + _, peer_id = msg.split(maxsplit=1) + print('Peer {!r} joined the room'.format(peer_id)) + # Peer will send us an offer + continue + if msg.startswith('ROOM_PEER_LEFT'): + _, peer_id = msg.split(maxsplit=1) + print('Peer {!r} left the room'.format(peer_id)) + continue + elif msg.startswith('ROOM_PEER_MSG'): + _, peer_id, msg = msg.split(maxsplit=2) + if peer_id in sent_offers: + print('Got answer from {!r}: {}'.format(peer_id, msg)) + continue + print('Got offer from {!r}, replying'.format(peer_id)) + await ws.send(get_answer_sdp(msg, peer_id)) + continue + print('Unknown msg: {!r}, exiting'.format(msg)) + return + +print('Our uid is {!r}'.format(PEER_ID)) + +try: + asyncio.get_event_loop().run_until_complete(hello()) +except websockets.exceptions.InvalidHandshake: + print('Invalid handshake: are you sure this is a websockets server?\n') + raise +except ssl.SSLError: + print('SSL Error: are you sure the server is using TLS?\n') + raise diff --git a/webrtc/signalling/simple-server.py b/webrtc/signalling/simple-server.py index 34af863..9714213 100755 --- a/webrtc/signalling/simple-server.py +++ b/webrtc/signalling/simple-server.py @@ -28,12 +28,19 @@ options = parser.parse_args(sys.argv[1:]) ADDR_PORT = (options.addr, options.port) KEEPALIVE_TIMEOUT = options.keepalive_timeout -# Format: {uid: (Peer WebSocketServerProtocol, remote_address)} +############### Global data ############### + +# Format: {uid: (Peer WebSocketServerProtocol, +# remote_address, +# <'session'|room_id|None>)} peers = dict() # Format: {caller_uid: callee_uid, # callee_uid: caller_uid} # Bidirectional mapping between the two peers sessions = dict() +# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}} +# Room dict with a set of peers in each room +rooms = dict() ############### Helper functions ############### @@ -65,7 +72,7 @@ async def disconnect(ws, peer_id): # Don't care about errors asyncio.ensure_future(ws.close(reason='hangup')) -async def remove_peer(uid): +async def cleanup_session(uid): if uid in sessions: other_id = sessions[uid] del sessions[uid] @@ -77,47 +84,129 @@ async def remove_peer(uid): # close the connection to reset its state. if other_id in peers: print("Closing connection to {}".format(other_id)) - wso, oaddr = peers[other_id] + wso, oaddr, _ = peers[other_id] del peers[other_id] await wso.close() + +async def cleanup_room(uid, room_id): + room_peers = rooms[room_id] + if uid not in room_peers: + return + room_peers.remove(uid) + for pid in room_peers: + wsp, paddr, _ = peers[pid] + msg = 'ROOM_PEER_LEFT {}'.format(uid) + print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg)) + await wsp.send(msg) + +async def remove_peer(uid): + await cleanup_session(uid) if uid in peers: - ws, raddr = peers[uid] + ws, raddr, status = peers[uid] + if status and status != 'session': + await cleanup_room(uid, status) del peers[uid] await ws.close() print("Disconnected from peer {!r} at {!r}".format(uid, raddr)) ############### Handler functions ############### -async def connection_handler(ws, peer_id): - global peers, sessions +async def connection_handler(ws, uid): + global peers, sessions, rooms raddr = ws.remote_address - peers[peer_id] = (ws, raddr) - print("Registered peer {!r} at {!r}".format(peer_id, raddr)) + peer_status = None + peers[uid] = [ws, raddr, peer_status] + print("Registered peer {!r} at {!r}".format(uid, raddr)) while True: # Receive command, wait forever if necessary msg = await recv_msg_ping(ws, raddr) - if msg.startswith('SESSION'): - print("{!r} command {!r}".format(peer_id, msg)) + # Update current status + peer_status = peers[uid][2] + # We are in a session or a room, messages must be relayed + if peer_status is not None: + # We're in a session, route message to connected peer + if peer_status == 'session': + other_id = sessions[uid] + wso, oaddr, status = peers[other_id] + assert(status == 'session') + print("{} -> {}: {}".format(uid, other_id, msg)) + await wso.send(msg) + # We're in a room, accept room-specific commands + elif peer_status: + # ROOM_PEER_MSG peer_id MSG + if msg.startswith('ROOM_PEER_MSG'): + _, other_id, msg = msg.split(maxsplit=2) + if other_id not in peers: + await ws.send('ERROR peer {!r} not found' + ''.format(other_id)) + continue + wso, oaddr, status = peers[other_id] + if status != room_id: + await ws.send('ERROR peer {!r} is not in the room' + ''.format(other_id)) + continue + msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg) + print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg)) + await wso.send(msg) + elif msg == 'ROOM_PEER_LIST': + room_id = peers[peer_id][2] + room_peers = ' '.join([pid for pid in rooms[room_id] if pid != peer_id]) + msg = 'ROOM_PEER_LIST {}'.format(room_peers) + print('room {}: -> {}: {}'.format(room_id, uid, msg)) + await ws.send(msg) + else: + await ws.send('ERROR invalid msg, already in room') + continue + else: + raise AssertionError('Unknown peer status {!r}'.format(peer_status)) + # Requested a session with a specific peer + elif msg.startswith('SESSION'): + print("{!r} command {!r}".format(uid, msg)) _, callee_id = msg.split(maxsplit=1) if callee_id not in peers: await ws.send('ERROR peer {!r} not found'.format(callee_id)) continue - if callee_id in sessions: + if peer_status is not None: await ws.send('ERROR peer {!r} busy'.format(callee_id)) continue await ws.send('SESSION_OK') wsc = peers[callee_id][0] - print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id, - wsc.remote_address)) - # Register call - sessions[peer_id] = callee_id - sessions[callee_id] = peer_id - # We're in a session, route message to connected peer - elif peer_id in sessions: - other_id = sessions[peer_id] - wso, oaddr = peers[other_id] - print("{} -> {}: {}".format(peer_id, other_id, msg)) - await wso.send(msg) + print('Session from {!r} ({!r}) to {!r} ({!r})' + ''.format(uid, raddr, callee_id, wsc.remote_address)) + # Register session + peers[uid][2] = peer_status = 'session' + sessions[uid] = callee_id + peers[callee_id][2] = 'session' + sessions[callee_id] = uid + # Requested joining or creation of a room + elif msg.startswith('ROOM'): + print('{!r} command {!r}'.format(uid, msg)) + _, room_id = msg.split(maxsplit=1) + # Room name cannot be 'session', empty, or contain whitespace + if room_id == 'session' or room_id.split() != [room_id]: + await ws.send('ERROR invalid room id {!r}'.format(room_id)) + continue + if room_id in rooms: + if uid in rooms[room_id]: + raise AssertionError('How did we accept a ROOM command ' + 'despite already being in a room?') + else: + # Create room if required + rooms[room_id] = set() + room_peers = ' '.join([pid for pid in rooms[room_id]]) + await ws.send('ROOM_OK {}'.format(room_peers)) + # Enter room + peers[uid][2] = peer_status = room_id + rooms[room_id].add(uid) + for pid in rooms[room_id]: + if pid == uid: + continue + wsp, paddr, _ = peers[pid] + msg = 'ROOM_PEER_JOINED {}'.format(uid) + print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg)) + await wsp.send(msg) + else: + print('Ignoring unknown message {!r} from {!r}'.format(msg, uid)) async def hello_peer(ws): ''' @@ -129,7 +218,7 @@ async def hello_peer(ws): if hello != 'HELLO': await ws.close(code=1002, reason='invalid protocol') raise Exception("Invalid hello from {!r}".format(raddr)) - if not uid or uid in peers: + if not uid or uid in peers or uid.split() != [uid]: # no whitespace await ws.close(code=1002, reason='invalid peer uid') raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr)) # Send back a HELLO -- 2.7.4