sendrecv: python version
authorMathieu Duponchelle <mathieu@centricular.com>
Mon, 11 Jun 2018 16:49:53 +0000 (18:49 +0200)
committerMathieu Duponchelle <mathieu@centricular.com>
Mon, 11 Jun 2018 16:49:53 +0000 (18:49 +0200)
webrtc/README.md
webrtc/sendrecv/gst/webrtc-sendrecv.py [new file with mode: 0644]

index 5e8ac67..5456c90 100644 (file)
@@ -56,17 +56,31 @@ http://blog.nirbheek.in/2018/02/gstreamer-webrtc.html
 
 * Serve the `js/` directory on the root of your website, or open https://webrtc.nirbheek.in
   - The JS code assumes the signalling server is on port 8443 of the same server serving the HTML
+
+* Open the website in a browser and ensure that the status is "Registered with server, waiting for call", and note the `id` too.
+
+#### Running the C version
+
 * Build the sources in the `gst/` directory on your machine
 
 ```console
 $ gcc webrtc-sendrecv.c $(pkg-config --cflags --libs gstreamer-webrtc-1.0 gstreamer-sdp-1.0 libsoup-2.4 json-glib-1.0) -o webrtc-sendrecv
 ```
 
-* Open the website in a browser and ensure that the status is "Registered with server, waiting for call", and note the `id` too.
 * Run `webrtc-sendrecv --peer-id=ID` with the `id` from the browser. You will see state changes and an SDP exchange.
-* You will see a bouncing ball + hear red noise in the browser, and your browser's webcam + mic in the gst app
 
-TODO: Port to Python and Rust.
+#### Running the Python version
+
+* python3 -m pip install --user websockets
+* run `python3 sendrecv/gst/webrtc-sendrecv.py ID` with the `id` from the browser. You will see state changes and an SDP exchange.
+
+With all versions, you will see a bouncing ball + hear red noise in the browser, and your browser's webcam + mic in the gst app.
+
+You can pass a --server argument to all versions, for example `--server=wss://127.0.0.1:8443`.
+
+<!---
+TODO: Port to Rust.
+-->
 
 ### multiparty-sendrecv: Multiparty audio conference with N peers
 
diff --git a/webrtc/sendrecv/gst/webrtc-sendrecv.py b/webrtc/sendrecv/gst/webrtc-sendrecv.py
new file mode 100644 (file)
index 0000000..dee4800
--- /dev/null
@@ -0,0 +1,160 @@
+import random
+import ssl
+import websockets
+import asyncio
+import os
+import sys
+import json
+import argparse
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+gi.require_version('GstWebRTC', '1.0')
+from gi.repository import GstWebRTC
+gi.require_version('GstSdp', '1.0')
+from gi.repository import GstSdp
+
+PIPELINE_DESC = '''
+webrtcbin name=sendrecv
+ videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
+ queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
+ audiotestsrc wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
+ queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
+'''
+
+class WebRTCClient:
+    def __init__(self, id_, peer_id, server):
+        self.id_ = id_
+        self.conn = None
+        self.pipe = None
+        self.webrtc = None
+        self.peer_id = peer_id
+        self.server = server or 'wss://webrtc.nirbheek.in:8443'
+
+    async def connect(self):
+        sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
+        self.conn = await websockets.connect(self.server, ssl=sslctx)
+        await self.conn.send('HELLO %d' % our_id)
+
+    async def setup_call(self):
+        await self.conn.send('SESSION {}'.format(self.peer_id))
+
+    def send_sdp_offer(self, offer):
+        text = offer.sdp.as_text()
+        print ('Sending offer:\n%s', text)
+        msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
+        loop = asyncio.new_event_loop()
+        loop.run_until_complete(self.conn.send(msg))
+
+    def on_offer_created(self, promise, _, __):
+        promise.wait()
+        reply = promise.get_reply()
+        offer = reply['offer']
+        promise = Gst.Promise.new()
+        self.webrtc.emit('set-local-description', offer, promise)
+        promise.interrupt()
+        self.send_sdp_offer(offer)
+
+    def on_negotiation_needed(self, element):
+        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
+        element.emit('create-offer', None, promise)
+
+    def send_ice_candidate_message(self, _, mlineindex, candidate):
+        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
+        loop = asyncio.new_event_loop()
+        loop.run_until_complete(self.conn.send(icemsg))
+
+    def on_incoming_decodebin_stream(self, _, pad):
+        if not pad.has_current_caps():
+            print (pad, 'has no caps, ignoring')
+            return
+
+        caps = pad.get_current_caps()
+        assert (len(caps))
+        s = caps[0]
+        name = s.get_name()
+        if name.startswith('video'):
+            q = Gst.ElementFactory.make('queue')
+            conv = Gst.ElementFactory.make('videoconvert')
+            sink = Gst.ElementFactory.make('autovideosink')
+            self.pipe.add(q, conv, sink)
+            self.pipe.sync_children_states()
+            pad.link(q.get_static_pad('sink'))
+            q.link(conv)
+            conv.link(sink)
+        elif name.startswith('audio'):
+            q = Gst.ElementFactory.make('queue')
+            conv = Gst.ElementFactory.make('audioconvert')
+            resample = Gst.ElementFactory.make('audioresample')
+            sink = Gst.ElementFactory.make('autoaudiosink')
+            self.pipe.add(q, conv, resample, sink)
+            self.pipe.sync_children_states()
+            pad.link(q.get_static_pad('sink'))
+            q.link(conv)
+            conv.link(resample)
+            resample.link(sink)
+
+    def on_incoming_stream(self, _, pad):
+        if pad.direction != Gst.PadDirection.SRC:
+            return
+
+        decodebin = Gst.ElementFactory.make('decodebin')
+        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
+        self.pipe.add(decodebin)
+        decodebin.sync_state_with_parent()
+        self.webrtc.link(decodebin)
+
+    def start_pipeline(self):
+        self.pipe = Gst.parse_launch(PIPELINE_DESC)
+        self.webrtc = self.pipe.get_by_name('sendrecv')
+        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
+        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
+        self.webrtc.connect('pad-added', self.on_incoming_stream)
+        self.pipe.set_state(Gst.State.PLAYING)
+
+    async def handle_sdp(self, message):
+        assert (self.webrtc)
+        msg = json.loads(message)
+        if 'sdp' in msg:
+            sdp = msg['sdp']
+            assert(sdp['type'] == 'answer')
+            sdp = sdp['sdp']
+            print ('Received answer:\n%s', sdp)
+            res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
+            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
+            promise = Gst.Promise.new()
+            self.webrtc.emit('set-remote-description', answer, promise)
+            promise.interrupt()
+        elif 'ice' in msg:
+            ice = msg['ice']
+            candidate = ice['candidate']
+            sdpmlineindex = ice['sdpMLineIndex']
+            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
+
+    async def loop(self):
+        assert self.conn
+        async for message in self.conn:
+            if message == 'HELLO':
+                await self.setup_call()
+            elif message == 'SESSION_OK':
+                self.start_pipeline()
+            elif message.startswith('ERROR'):
+                print (message)
+                return 1
+            else:
+                await self.handle_sdp(message)
+        return 0
+
+
+if __name__=='__main__':
+    Gst.init(None)
+    parser = argparse.ArgumentParser()
+    parser.add_argument('peerid', help='String ID of the peer to connect to')
+    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
+    args = parser.parse_args()
+    our_id = random.randrange(10, 10000)
+    c = WebRTCClient(our_id, args.peerid, args.server)
+    asyncio.get_event_loop().run_until_complete(c.connect())
+    res = asyncio.get_event_loop().run_until_complete(c.loop())
+    sys.exit(res)