webrtc_sendrecv.py: Implement all negotiation modes
authorNirbheek Chauhan <nirbheek@centricular.com>
Sat, 5 Mar 2022 14:36:37 +0000 (20:06 +0530)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 18 Mar 2022 08:16:46 +0000 (08:16 +0000)
Earlier, the example only supported one negotiation mode:
* Browser client is running, gstreamer starts a call and sends offer

Now these three modes are also supported:
* Browser client is running, gstreamer starts a call and sends an
  offer request
* gstreamer connects and waits for browser client to start a call and
  send an offer
* gstreamer connects and waits for browser client to start a call and
  send an offer request

The following features are still missing:
* Data channel support
* TWCC support + stats logging

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1864>

subprojects/gst-examples/webrtc/sendrecv/gst/webrtc-sendrecv.c
subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py

index 6ba6909..98f2155 100644 (file)
@@ -59,7 +59,7 @@ static GOptionEntry entries[] = {
   {"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id,
       "String ID of the peer to connect to", "ID"},
   {"our-id", 0, 0, G_OPTION_ARG_STRING, &our_id,
-      "String ID of the session that peer can connect to us", "ID"},
+      "String ID that the peer can use to connect to us", "ID"},
   {"server", 0, 0, G_OPTION_ARG_STRING, &server_url,
       "Signalling server to connect to", "URL"},
   {"disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL},
index 6822434..1502eec 100755 (executable)
@@ -37,50 +37,81 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl
 from websockets.version import version as wsv
 
 
+def print_status(msg):
+    print(f'--- {msg}')
+
+
+def print_error(msg):
+    print(f'!!! {msg}', file=sys.stderr)
+
+
 class WebRTCClient:
-    def __init__(self, loop, id_, peer_id, server):
-        self.event_loop = loop
-        self.id_ = id_
+    def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
         self.conn = None
         self.pipe = None
         self.webrtc = None
-        self.peer_id = peer_id
+        self.event_loop = loop
         self.server = server
+        # An optional user-specified ID we can use to register
+        self.our_id = our_id
+        # The actual ID we used to register
+        self.id_ = None
+        # An optional peer ID we should connect to
+        self.peer_id = peer_id
+        # Whether we will send the offer or the remote peer will
+        self.remote_is_offerer = remote_is_offerer
 
     async def send(self, msg):
         assert self.conn
-        print(f'>>> Sending {msg}')
+        print(f'>>> {msg}')
         await self.conn.send(msg)
 
     async def connect(self):
         self.conn = await websockets.connect(self.server)
-        await self.send('HELLO %d' % self.id_)
+        if self.our_id is None:
+            self.id_ = str(random.randrange(10, 10000))
+        else:
+            self.id_ = self.our_id
+        await self.send(f'HELLO {self.id_}')
 
     async def setup_call(self):
-        await self.send('SESSION {}'.format(self.peer_id))
+        assert self.peer_id
+        await self.send(f'SESSION {self.peer_id}')
 
     def send_soon(self, msg):
         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
 
-    def send_sdp_offer(self, offer):
+    def send_sdp(self, offer):
         text = offer.sdp.as_text()
-        print('Sending offer:\n%s' % text)
-        msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
+        if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
+            print_status('Sending offer:\n%s' % text)
+            msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
+        elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
+            print_status('Sending answer:\n%s' % text)
+            msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
+        else:
+            raise AssertionError(offer.type)
         self.send_soon(msg)
 
     def on_offer_created(self, promise, _, __):
-        promise.wait()
+        assert(promise.wait() == Gst.PromiseResult.REPLIED)
         reply = promise.get_reply()
         offer = reply['offer']
         promise = Gst.Promise.new()
-        print('Offer created, setting local description')
+        print_status('Offer created, setting local description')
         self.webrtc.emit('set-local-description', offer, promise)
-        promise.interrupt()
-        self.send_sdp_offer(offer)
+        promise.interrupt()  # we don't care about the result, discard it
+        self.send_sdp(offer)
 
-    def on_negotiation_needed(self, element):
-        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
-        element.emit('create-offer', None, promise)
+    def on_negotiation_needed(self, _, create_offer):
+        if create_offer:
+            print_status('Call was connected: creating offer')
+            promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
+            self.webrtc.emit('create-offer', None, promise)
+        elif self.remote_is_offerer:
+            # We are initiating the call, but we want the remote peer to create the offer
+            print_status('Call was connected: requesting remote peer for offer')
+            self.send_soon('OFFER_REQUEST')
 
     def send_ice_candidate_message(self, _, mlineindex, candidate):
         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
@@ -88,7 +119,7 @@ class WebRTCClient:
 
     def on_incoming_decodebin_stream(self, _, pad):
         if not pad.has_current_caps():
-            print(pad, 'has no caps, ignoring')
+            print_error(pad, 'has no caps, ignoring')
             return
 
         caps = pad.get_current_caps()
@@ -116,6 +147,10 @@ class WebRTCClient:
             conv.link(resample)
             resample.link(sink)
 
+    def on_ice_gathering_state_notify(self, pspec, _):
+        state = self.webrtc.get_property('ice-gathering-state')
+        print_status(f'ICE gathering state changed to {state}')
+
     def on_incoming_stream(self, _, pad):
         if pad.direction != Gst.PadDirection.SRC:
             return
@@ -126,28 +161,54 @@ class WebRTCClient:
         decodebin.sync_state_with_parent()
         self.webrtc.link(decodebin)
 
-    def start_pipeline(self):
+    def start_pipeline(self, create_offer=True):
+        print_status(f'Creating pipeline, create_offer: {create_offer}')
         self.pipe = Gst.parse_launch(PIPELINE_DESC)
         self.webrtc = self.pipe.get_by_name('sendrecv')
-        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
+        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
+        self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
         self.webrtc.connect('pad-added', self.on_incoming_stream)
         self.pipe.set_state(Gst.State.PLAYING)
 
-    def handle_sdp(self, message):
+    def on_answer_created(self, promise, _, __):
+        assert(promise.wait() == Gst.PromiseResult.REPLIED)
+        reply = promise.get_reply()
+        answer = reply['answer']
+        promise = Gst.Promise.new()
+        self.webrtc.emit('set-local-description', answer, promise)
+        promise.interrupt()  # we don't care about the result, discard it
+        self.send_sdp(answer)
+
+    def on_offer_set(self, promise, _, __):
+        assert(promise.wait() == Gst.PromiseResult.REPLIED)
+        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
+        self.webrtc.emit('create-answer', None, promise)
+
+    def handle_json(self, message):
         assert (self.webrtc)
-        msg = json.loads(message)
+        try:
+            msg = json.loads(message)
+        except json.decoder.JSONDecoderError:
+            print_error('Failed to parse JSON message, this might be a bug')
+            raise
         if 'sdp' in msg:
-            sdp = msg['sdp']
-            assert(sdp['type'] == 'answer')
-            sdp = sdp['sdp']
-            print('Received answer:\n%s' % sdp)
-            res, sdpmsg = GstSdp.SDPMessage.new()
-            GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
-            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
-            promise = Gst.Promise.new()
-            self.webrtc.emit('set-remote-description', answer, promise)
-            promise.interrupt()
+            sdp = msg['sdp']['sdp']
+            if msg['sdp']['type'] == 'answer':
+                print_status('Received answer:\n%s' % sdp)
+                res, sdpmsg = GstSdp.SDPMessage.new()
+                GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
+                answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
+                promise = Gst.Promise.new()
+                self.webrtc.emit('set-remote-description', answer, promise)
+                promise.interrupt()  # we don't care about the result, discard it
+            else:
+                print_status('Received offer:\n%s' % sdp)
+                res, sdpmsg = GstSdp.SDPMessage.new()
+                GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
+                offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
+                promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
+                self.webrtc.emit('set-remote-description', offer, promise)
         elif 'ice' in msg:
             ice = msg['ice']
             candidate = ice['candidate']
@@ -160,19 +221,46 @@ class WebRTCClient:
             self.pipe = None
         self.webrtc = None
 
+    def is_incoming_offer(self, msg):
+        if self.webrtc:
+            return False
+        if self.remote_is_offerer:
+            return True
+        return True
+
     async def loop(self):
         assert self.conn
         async for message in self.conn:
+            print(f'<<< {message}')
             if message == 'HELLO':
-                await self.setup_call()
+                assert self.id_
+                # If a peer ID is specified, we want to connect to it. If not,
+                # we wait for an incoming call.
+                if not self.peer_id:
+                    print_status(f'Waiting for incoming call: ID is {self.id_}')
+                else:
+                    if self.remote_is_offerer:
+                        print_status('Have peer ID: initiating call (will request remote peer to create offer)')
+                    else:
+                        print_status('Have peer ID: initiating call (will create offer)')
+                    await self.setup_call()
             elif message == 'SESSION_OK':
+                if self.remote_is_offerer:
+                    self.start_pipeline(create_offer=False)
+                else:
+                    self.start_pipeline()
+            elif message == 'OFFER_REQUEST':
+                print_status('Incoming call: we have been asked to create the offer')
                 self.start_pipeline()
             elif message.startswith('ERROR'):
-                print(message)
+                print_error(message)
                 self.close_pipeline()
                 return 1
             else:
-                self.handle_sdp(message)
+                if self.is_incoming_offer(message):
+                    print_status('Incoming call: received an offer, creating pipeline')
+                    self.start_pipeline(create_offer=False)
+                self.handle_json(message)
         self.close_pipeline()
         return 0
 
@@ -187,7 +275,7 @@ def check_plugins():
               "rtpmanager", "videotestsrc", "audiotestsrc"]
     missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
     if len(missing):
-        print('Missing gstreamer plugins:', missing)
+        print_error('Missing gstreamer plugins:', missing)
         return False
     return True
 
@@ -197,13 +285,19 @@ if __name__ == '__main__':
     if not check_plugins():
         sys.exit(1)
     parser = argparse.ArgumentParser()
-    parser.add_argument('peerid', help='String ID of the peer to connect to')
+    parser.add_argument('--peer-id', help='String ID of the peer to connect to')
+    parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
     parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
+    parser.add_argument('--remote-offerer', default=False, action='store_true',
+                        dest='remote_is_offerer',
+                        help='Request that the peer generate the offer and we\'ll answer')
     args = parser.parse_args()
-    our_id = random.randrange(10, 10000)
+    if not args.peer_id and not args.our_id:
+        print('You must pass either --peer-id or --our-id')
+        sys.exit(1)
     loop = asyncio.new_event_loop()
-    c = WebRTCClient(loop, our_id, args.peerid, args.server)
+    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
     loop.run_until_complete(c.connect())
     res = loop.run_until_complete(c.loop())
     sys.exit(res)