webrtc examples: Use webrtc.gstreamer.net
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
index 81f84d3..46c1b7d 100755 (executable)
@@ -6,6 +6,10 @@
 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
 # with a browser JS app, implemented in Python.
 
+from websockets.version import version as wsv
+from gi.repository import GstSdp
+from gi.repository import GstWebRTC
+from gi.repository import Gst
 import random
 import ssl
 import websockets
@@ -17,11 +21,8 @@ import argparse
 
 import gi
 gi.require_version('Gst', '1.0')
-from gi.repository import Gst
 gi.require_version('GstWebRTC', '1.0')
-from gi.repository import GstWebRTC
 gi.require_version('GstSdp', '1.0')
-from gi.repository import GstSdp
 
 # Ensure that gst-python is installed
 try:
@@ -31,16 +32,27 @@ except ImportError:
     raise
 
 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
-PIPELINE_DESC = '''
-webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
+PIPELINE_DESC_VP8 = '''
+webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
-  vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay !
-  queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
- audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
-  queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
+  vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
+  queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
+ audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
+  queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
+'''
+PIPELINE_DESC_H264 = '''
+webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
+ videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
+  x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
+  queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
+ audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
+  queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
 '''
 
-from websockets.version import version as wsv
+PIPELINE_DESC = {
+    'H264': PIPELINE_DESC_H264,
+    'VP8': PIPELINE_DESC_VP8,
+}
 
 
 def print_status(msg):
@@ -51,8 +63,35 @@ def print_error(msg):
     print(f'!!! {msg}', file=sys.stderr)
 
 
+def get_payload_types(sdpmsg, video_encoding, audio_encoding):
+    '''
+    Find the payload types for the specified video and audio encoding.
+
+    Very simplistically finds the first payload type matching the encoding
+    name. More complex applications will want to match caps on
+    profile-level-id, packetization-mode, etc.
+    '''
+    video_pt = None
+    audio_pt = None
+    for i in range(0, sdpmsg.medias_len()):
+        media = sdpmsg.get_media(i)
+        for j in range(0, media.formats_len()):
+            fmt = media.get_format(j)
+            if fmt == 'webrtc-datachannel':
+                continue
+            pt = int(fmt)
+            caps = media.get_caps_from_media(pt)
+            s = caps.get_structure(0)
+            encoding_name = s['encoding-name']
+            if video_pt is None and encoding_name == video_encoding:
+                video_pt = pt
+            elif audio_pt is None and encoding_name == audio_encoding:
+                audio_pt = pt
+    return {video_encoding: video_pt, audio_encoding: audio_pt}
+
+
 class WebRTCClient:
-    def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
+    def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
         self.conn = None
         self.pipe = None
         self.webrtc = None
@@ -66,6 +105,8 @@ class WebRTCClient:
         self.peer_id = peer_id
         # Whether we will send the offer or the remote peer will
         self.remote_is_offerer = remote_is_offerer
+        # Video encoding: VP8, H264, etc
+        self.video_encoding = video_encoding.upper()
 
     async def send(self, msg):
         assert self.conn
@@ -87,6 +128,23 @@ class WebRTCClient:
     def send_soon(self, msg):
         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
 
+    def on_bus_poll_cb(self, bus):
+        def remove_bus_poll():
+            self.event_loop.remove_reader(bus.get_pollfd().fd)
+            self.event_loop.stop()
+        while bus.peek():
+            msg = bus.pop()
+            if msg.type == Gst.MessageType.ERROR:
+                err = msg.parse_error()
+                print("ERROR:", err.gerror, err.debug)
+                remove_bus_poll()
+                break
+            elif msg.type == Gst.MessageType.EOS:
+                remove_bus_poll()
+                break
+            elif msg.type == Gst.MessageType.LATENCY:
+                self.pipe.recalculate_latency()
+
     def send_sdp(self, offer):
         text = offer.sdp.as_text()
         if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
@@ -100,7 +158,7 @@ class WebRTCClient:
         self.send_soon(msg)
 
     def on_offer_created(self, promise, _, __):
-        assert(promise.wait() == Gst.PromiseResult.REPLIED)
+        assert promise.wait() == Gst.PromiseResult.REPLIED
         reply = promise.get_reply()
         offer = reply['offer']
         promise = Gst.Promise.new()
@@ -114,10 +172,6 @@ class WebRTCClient:
             print_status('Call was connected: creating offer')
             promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
             self.webrtc.emit('create-offer', None, promise)
-        elif self.remote_is_offerer:
-            # We are initiating the call, but we want the remote peer to create the offer
-            print_status('Call was connected: requesting remote peer for offer')
-            self.send_soon('OFFER_REQUEST')
 
     def send_ice_candidate_message(self, _, mlineindex, candidate):
         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
@@ -165,11 +219,13 @@ class WebRTCClient:
         decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
         self.pipe.add(decodebin)
         decodebin.sync_state_with_parent()
-        self.webrtc.link(decodebin)
+        pad.link(decodebin.get_static_pad('sink'))
 
-    def start_pipeline(self, create_offer=True):
+    def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
         print_status(f'Creating pipeline, create_offer: {create_offer}')
-        self.pipe = Gst.parse_launch(PIPELINE_DESC)
+        self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
+        bus = self.pipe.get_bus()
+        self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
         self.webrtc = self.pipe.get_by_name('sendrecv')
         self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
@@ -178,7 +234,7 @@ class WebRTCClient:
         self.pipe.set_state(Gst.State.PLAYING)
 
     def on_answer_created(self, promise, _, __):
-        assert(promise.wait() == Gst.PromiseResult.REPLIED)
+        assert promise.wait() == Gst.PromiseResult.REPLIED
         reply = promise.get_reply()
         answer = reply['answer']
         promise = Gst.Promise.new()
@@ -187,12 +243,11 @@ class WebRTCClient:
         self.send_sdp(answer)
 
     def on_offer_set(self, promise, _, __):
-        assert(promise.wait() == Gst.PromiseResult.REPLIED)
+        assert promise.wait() == Gst.PromiseResult.REPLIED
         promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
         self.webrtc.emit('create-answer', None, promise)
 
     def handle_json(self, message):
-        assert (self.webrtc)
         try:
             msg = json.loads(message)
         except json.decoder.JSONDecoderError:
@@ -202,24 +257,35 @@ class WebRTCClient:
             sdp = msg['sdp']['sdp']
             if msg['sdp']['type'] == 'answer':
                 print_status('Received answer:\n%s' % sdp)
-                res, sdpmsg = GstSdp.SDPMessage.new()
-                GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
+                res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
                 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
                 promise = Gst.Promise.new()
                 self.webrtc.emit('set-remote-description', answer, promise)
                 promise.interrupt()  # we don't care about the result, discard it
             else:
                 print_status('Received offer:\n%s' % sdp)
-                res, sdpmsg = GstSdp.SDPMessage.new()
-                GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
+                res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
+
+                if not self.webrtc:
+                    print_status('Incoming call: received an offer, creating pipeline')
+                    pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
+                    assert self.video_encoding in pts
+                    assert 'OPUS' in pts
+                    self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
+
+                assert self.webrtc
+
                 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
                 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
                 self.webrtc.emit('set-remote-description', offer, promise)
         elif 'ice' in msg:
+            assert self.webrtc
             ice = msg['ice']
             candidate = ice['candidate']
             sdpmlineindex = ice['sdpMLineIndex']
             self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
+        else:
+            print_error('Unknown JSON message')
 
     def close_pipeline(self):
         if self.pipe:
@@ -227,13 +293,6 @@ class WebRTCClient:
             self.pipe = None
         self.webrtc = None
 
-    def is_incoming_offer(self, msg):
-        if self.webrtc:
-            return False
-        if self.remote_is_offerer:
-            return True
-        return True
-
     async def loop(self):
         assert self.conn
         async for message in self.conn:
@@ -252,7 +311,9 @@ class WebRTCClient:
                     await self.setup_call()
             elif message == 'SESSION_OK':
                 if self.remote_is_offerer:
-                    self.start_pipeline(create_offer=False)
+                    # We are initiating the call, but we want the remote peer to create the offer
+                    print_status('Call was connected: requesting remote peer for offer')
+                    await self.send('OFFER_REQUEST')
                 else:
                     self.start_pipeline()
             elif message == 'OFFER_REQUEST':
@@ -263,9 +324,6 @@ class WebRTCClient:
                 self.close_pipeline()
                 return 1
             else:
-                if self.is_incoming_offer(message):
-                    print_status('Incoming call: received an offer, creating pipeline')
-                    self.start_pipeline(create_offer=False)
                 self.handle_json(message)
         self.close_pipeline()
         return 0
@@ -291,9 +349,11 @@ if __name__ == '__main__':
     if not check_plugins():
         sys.exit(1)
     parser = argparse.ArgumentParser()
+    parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
+                        help='Video encoding to negotiate')
     parser.add_argument('--peer-id', help='String ID of the peer to connect to')
     parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
-    parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
+    parser.add_argument('--server', default='wss://webrtc.gstreamer.net:8443',
                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
     parser.add_argument('--remote-offerer', default=False, action='store_true',
                         dest='remote_is_offerer',
@@ -303,7 +363,7 @@ if __name__ == '__main__':
         print('You must pass either --peer-id or --our-id')
         sys.exit(1)
     loop = asyncio.new_event_loop()
-    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
+    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
     loop.run_until_complete(c.connect())
     res = loop.run_until_complete(c.loop())
     sys.exit(res)