Add python Janus videoroom streaming example.
authorJan Schmidt <jan@centricular.com>
Tue, 14 Jan 2020 23:47:27 +0000 (10:47 +1100)
committerJan Schmidt <jan@centricular.com>
Tue, 14 Jan 2020 23:47:27 +0000 (10:47 +1100)
Added with permission and copyright @tobiasfriden and @saket424
on github. See https://github.com/centricular/gstwebrtc-demos/issues/66

webrtc/janus/janusvideoroom.py [new file with mode: 0644]

diff --git a/webrtc/janus/janusvideoroom.py b/webrtc/janus/janusvideoroom.py
new file mode 100644 (file)
index 0000000..80ee47f
--- /dev/null
@@ -0,0 +1,443 @@
+# Janus Videoroom example
+# Copyright @tobiasfriden and @saket424 on github
+# See https://github.com/centricular/gstwebrtc-demos/issues/66
+# Copyright Jan Schmidt <jan@centricular.com> 2020
+import random
+import ssl
+import websockets
+import asyncio
+import os
+import sys
+import json
+import argparse
+import string
+from websockets.exceptions import ConnectionClosed
+
+import attr
+
+@attr.s
+class JanusEvent:
+    sender = attr.ib(validator=attr.validators.instance_of(int))
+
+@attr.s
+class PluginData(JanusEvent):
+    plugin = attr.ib(validator=attr.validators.instance_of(str))
+    data = attr.ib()
+    jsep = attr.ib()
+
+@attr.s
+class WebrtcUp(JanusEvent):
+    pass
+
+@attr.s
+class Media(JanusEvent):
+    receiving = attr.ib(validator=attr.validators.instance_of(bool))
+    kind = attr.ib(validator=attr.validators.in_(["audio", "video"]))
+
+    @kind.validator
+    def validate_kind(self, attribute, kind):
+        if kind not in ["video", "audio"]:
+            raise ValueError("kind must equal video or audio")
+
+@attr.s
+class SlowLink(JanusEvent):
+    uplink = attr.ib(validator=attr.validators.instance_of(bool))
+    lost = attr.ib(validator=attr.validators.instance_of(int))
+
+@attr.s
+class HangUp(JanusEvent):
+    reason = attr.ib(validator=attr.validators.instance_of(str))
+
+@attr.s(cmp=False)
+class Ack:
+    transaction = attr.ib(validator=attr.validators.instance_of(str))
+
+@attr.s
+class Jsep:
+    sdp = attr.ib()
+    type = attr.ib(validator=attr.validators.in_(["offer", "pranswer", "answer", "rollback"]))
+
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+gi.require_version('GstWebRTC', '1.0')
+from gi.repository import GstWebRTC
+gi.require_version('GstSdp', '1.0')
+from gi.repository import GstSdp
+
+DO_VP8 = True
+
+if DO_VP8:
+    ( encoder, payloader, rtp_encoding) = ( "vp8enc target-bitrate=500000", "rtpvp8pay", "VP8" )
+else:
+    ( encoder, payloader, rtp_encoding) = ( "x264enc", "rtph264pay", "H264" )
+
+PIPELINE_DESC = '''
+ webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302
+ videotestsrc pattern=ball ! video/x-raw,width=320,height=240 ! videoconvert ! queue !
+ {} ! {} !  queue ! application/x-rtp,media=video,encoding-name={},payload=96 ! sendrecv.
+'''.format(encoder, payloader, rtp_encoding)
+
+def transaction_id():
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
+
+@attr.s
+class JanusGateway:
+    server = attr.ib(validator=attr.validators.instance_of(str))
+    #secure = attr.ib(default=True)
+    _messages = attr.ib(factory=set)
+    conn = None
+
+    async def connect(self):
+        sslCon=None
+        if self.server.startswith("wss"):
+            sslCon=ssl.SSLContext()
+        self.conn = await websockets.connect(self.server, subprotocols=['janus-protocol'], ssl=sslCon)
+        transaction = transaction_id()
+        await self.conn.send(json.dumps({
+            "janus": "create",
+            "transaction": transaction
+            }))
+        resp = await self.conn.recv()
+        print (resp)
+        parsed = json.loads(resp)
+        assert parsed["janus"] == "success", "Failed creating session"
+        assert parsed["transaction"] == transaction, "Incorrect transaction"
+        self.session = parsed["data"]["id"]
+
+    async def close(self):
+        if self.conn:
+            await self.conn.close()
+
+    async def attach(self, plugin):
+        assert hasattr(self, "session"), "Must connect before attaching to plugin"
+        transaction = transaction_id()
+        await self.conn.send(json.dumps({
+            "janus": "attach",
+            "session_id": self.session,
+            "plugin": plugin,
+            "transaction": transaction
+        }))
+        resp = await self.conn.recv()
+        parsed = json.loads(resp)
+        assert parsed["janus"] == "success", "Failed attaching to {}".format(plugin)
+        assert parsed["transaction"] == transaction, "Incorrect transaction"
+        self.handle = parsed["data"]["id"]
+
+    async def sendtrickle(self, candidate):
+        assert hasattr(self, "session"), "Must connect before sending messages"
+        assert hasattr(self, "handle"), "Must attach before sending messages"
+
+        transaction = transaction_id()
+        janus_message = {
+            "janus": "trickle",
+            "session_id": self.session,
+            "handle_id": self.handle,
+            "transaction": transaction,
+            "candidate": candidate
+        }
+
+        await self.conn.send(json.dumps(janus_message))
+
+        #while True:
+        #    resp = await self._recv_and_parse()
+        #    if isinstance(resp, PluginData):
+        #        return resp
+        #    else:
+        #        self._messages.add(resp)
+#
+    async def sendmessage(self, body, jsep=None):
+        assert hasattr(self, "session"), "Must connect before sending messages"
+        assert hasattr(self, "handle"), "Must attach before sending messages"
+
+        transaction = transaction_id()
+        janus_message = {
+            "janus": "message",
+            "session_id": self.session,
+            "handle_id": self.handle,
+            "transaction": transaction,
+            "body": body
+        }
+        if jsep is not None:
+            janus_message["jsep"] = jsep
+
+        await self.conn.send(json.dumps(janus_message))
+
+        #while True:
+        #    resp = await self._recv_and_parse()
+        #    if isinstance(resp, PluginData):
+        #        if jsep is not None:
+        #            await client.handle_sdp(resp.jsep)
+        #        return resp
+        #    else:
+        #        self._messages.add(resp)
+
+    async def keepalive(self):
+        assert hasattr(self, "session"), "Must connect before sending messages"
+        assert hasattr(self, "handle"), "Must attach before sending messages"
+
+        while True:
+            try:
+                await asyncio.sleep(10)
+                transaction = transaction_id()
+                await self.conn.send(json.dumps({
+                    "janus": "keepalive",
+                    "session_id": self.session,
+                    "handle_id": self.handle,
+                    "transaction": transaction
+                }))
+            except KeyboardInterrupt:
+                return
+
+    async def recv(self):
+        if len(self._messages) > 0:
+            return self._messages.pop()
+        else:
+            return await self._recv_and_parse()
+
+    async def _recv_and_parse(self):
+        raw = json.loads(await self.conn.recv())
+        janus = raw["janus"]
+
+        if janus == "event":
+            return PluginData(
+                sender=raw["sender"],
+                plugin=raw["plugindata"]["plugin"],
+                data=raw["plugindata"]["data"],
+                jsep=raw["jsep"] if "jsep" in raw else None
+            )
+        elif janus == "webrtcup":
+            return WebrtcUp(
+                sender=raw["sender"]
+            )
+        elif janus == "media":
+            return Media(
+                sender=raw["sender"],
+                receiving=raw["receiving"],
+                kind=raw["type"]
+            )
+        elif janus == "slowlink":
+            return SlowLink(
+                sender=raw["sender"],
+                uplink=raw["uplink"],
+                lost=raw["lost"]
+            )
+        elif janus == "hangup":
+            return HangUp(
+                sender=raw["sender"],
+                reason=raw["reason"]
+            )
+        elif janus == "ack":
+            return Ack(
+                transaction=raw["transaction"]
+            )
+        else:
+            return raw
+
+class WebRTCClient:
+    def __init__(self, id_, peer_id, server, signaling):
+        self.id_ = id_
+        self.conn = None
+        self.pipe = None
+        self.webrtc = None
+        self.peer_id = peer_id
+        self.server = server or 'wss://127.0.0.1:8989'
+        self.signaling = signaling
+        self.request = None
+        self.offermsg = None
+
+    def send_sdp_offer(self, offer):
+        text = offer.sdp.as_text()
+        print ('Sending offer:\n%s' % text)
+        # configure media
+        media = {'audio': True, 'video': True}
+        request = {'request': 'publish'}
+        request.update(media)
+        self.request = request
+        self.offermsg = { 'sdp': text, 'trickle': True, 'type': 'offer' }
+        print (self.offermsg)
+        loop = asyncio.new_event_loop()
+        loop.run_until_complete(self.signaling.sendmessage(self.request, self.offermsg))
+
+    def on_offer_created(self, promise, _, __):
+        promise.wait()
+        reply = promise.get_reply()
+        offer = reply.get_value('offer')
+        promise = Gst.Promise.new()
+        self.webrtc.emit('set-local-description', offer, promise)
+        promise.interrupt()
+        self.send_sdp_offer(offer)
+
+    def on_negotiation_needed(self, element):
+        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
+        element.emit('create-offer', None, promise)
+
+    def send_ice_candidate_message(self, _, mlineindex, candidate):
+        icemsg = {'candidate': candidate, 'sdpMLineIndex': mlineindex}
+        print ("Sending ICE", icemsg)
+        loop = asyncio.new_event_loop()
+        loop.run_until_complete(self.signaling.sendtrickle(icemsg))
+
+    def on_incoming_decodebin_stream(self, _, pad):
+        if not pad.has_current_caps():
+            print (pad, 'has no caps, ignoring')
+            return
+
+        caps = pad.get_current_caps()
+        name = caps.to_string()
+        if name.startswith('video'):
+            q = Gst.ElementFactory.make('queue')
+            conv = Gst.ElementFactory.make('videoconvert')
+            sink = Gst.ElementFactory.make('autovideosink')
+            self.pipe.add(q)
+            self.pipe.add(conv)
+            self.pipe.add(sink)
+            self.pipe.sync_children_states()
+            pad.link(q.get_static_pad('sink'))
+            q.link(conv)
+            conv.link(sink)
+        elif name.startswith('audio'):
+            q = Gst.ElementFactory.make('queue')
+            conv = Gst.ElementFactory.make('audioconvert')
+            resample = Gst.ElementFactory.make('audioresample')
+            sink = Gst.ElementFactory.make('autoaudiosink')
+            self.pipe.add(q)
+            self.pipe.add(conv)
+            self.pipe.add(resample)
+            self.pipe.add(sink)
+            self.pipe.sync_children_states()
+            pad.link(q.get_static_pad('sink'))
+            q.link(conv)
+            conv.link(resample)
+            resample.link(sink)
+
+    def on_incoming_stream(self, _, pad):
+        if pad.direction != Gst.PadDirection.SRC:
+            return
+
+        decodebin = Gst.ElementFactory.make('decodebin')
+        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
+        self.pipe.add(decodebin)
+        decodebin.sync_state_with_parent()
+        self.webrtc.link(decodebin)
+
+    def start_pipeline(self):
+        self.pipe = Gst.parse_launch(PIPELINE_DESC)
+        self.webrtc = self.pipe.get_by_name('sendrecv')
+        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
+        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
+        self.webrtc.connect('pad-added', self.on_incoming_stream)
+        self.pipe.set_state(Gst.State.PLAYING)
+
+    def extract_ice_from_sdp(self, sdp):
+        mlineindex = -1
+        for line in sdp.splitlines():
+            if line.startswith("a=candidate"):
+                candidate = line[2:]
+                if mlineindex < 0:
+                    print("Received ice candidate in SDP before any m= line")
+                    continue
+                print ('Received remote ice-candidate mlineindex {}: {}'.format(mlineindex, candidate))
+                self.webrtc.emit('add-ice-candidate', mlineindex, candidate)
+            elif line.startswith("m="):
+                mlineindex += 1
+
+    async def handle_sdp(self, msg):
+        print (msg)
+        if 'sdp' in msg:
+            sdp = msg['sdp']
+            assert(msg['type'] == 'answer')
+            print ('Received answer:\n%s' % sdp)
+            res, sdpmsg = GstSdp.SDPMessage.new()
+            GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
+
+            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
+            promise = Gst.Promise.new()
+            self.webrtc.emit('set-remote-description', answer, promise)
+            promise.interrupt()
+
+            # Extract ICE candidates from the SDP to work around a GStreamer
+            # limitation in (at least) 1.16.2 and below
+            self.extract_ice_from_sdp (sdp)
+
+        elif 'ice' in msg:
+            ice = msg['ice']
+            candidate = ice['candidate']
+            sdpmlineindex = ice['sdpMLineIndex']
+            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
+
+    async def loop(self, signaling):
+        await signaling.connect()
+        await signaling.attach("janus.plugin.videoroom")
+
+        loop = asyncio.get_event_loop()
+        loop.create_task(signaling.keepalive())
+        #asyncio.create_task(self.keepalive())
+
+        joinmessage = { "request": "join", "ptype": "publisher", "room": 1234, "display": self.peer_id }
+        await signaling.sendmessage(joinmessage)
+
+        assert signaling.conn
+        self.start_pipeline()
+
+        while True:
+            try:
+                msg = await signaling.recv()
+                if isinstance(msg, PluginData):
+                    if msg.jsep is not None:
+                        await self.handle_sdp(msg.jsep)
+                elif isinstance(msg, Media):
+                    print (msg)
+                elif isinstance(msg, WebrtcUp):
+                    print (msg)
+                elif isinstance(msg, SlowLink):
+                    print (msg)
+                elif isinstance(msg, HangUp):
+                    print (msg)
+                elif not isinstance(msg, Ack):
+                    if 'candidate' in msg:
+                       ice = msg['candidate']
+                       print (ice)
+                       if 'candidate' in ice:
+                           candidate = ice['candidate']
+                           sdpmlineindex = ice['sdpMLineIndex']
+                           self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
+                    print(msg)
+            except (KeyboardInterrupt, ConnectionClosed):
+                return
+
+        return 0
+
+
+def check_plugins():
+    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
+              "rtpmanager", "videotestsrc", "audiotestsrc"]
+    missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
+    if len(missing):
+        print('Missing gstreamer plugins:', missing)
+        return False
+    return True
+
+
+if __name__=='__main__':
+    Gst.init(None)
+    if not check_plugins():
+        sys.exit(1)
+    parser = argparse.ArgumentParser()
+    parser.add_argument('label', help='videoroom label')
+    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8989"')
+    args = parser.parse_args()
+    our_id = random.randrange(10, 10000)
+    signaling = JanusGateway(args.server)
+    c = WebRTCClient(our_id, args.label, args.server, signaling)
+    loop = asyncio.get_event_loop()
+    try:
+        loop.run_until_complete(
+            c.loop(signaling)
+        )
+    except KeyboardInterrupt:
+        pass
+    finally:
+        print("Interrupted, cleaning up")
+        loop.run_until_complete(signaling.close())