webrtc_sendrecv.py: Fix event loop usage for messages
authorNirbheek Chauhan <nirbheek@centricular.com>
Tue, 1 Mar 2022 18:33:29 +0000 (00:03 +0530)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 18 Mar 2022 08:16:45 +0000 (08:16 +0000)
Instead of creating a new loop, we should just be fetching the running
loop, then doing a blocking network call inside the callback, schedule
it on the event loop. This is what the C example does too.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1864>

subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py

index ec31e94..bfb9903 100644 (file)
@@ -36,7 +36,8 @@ from websockets.version import version as wsv
 
 
 class WebRTCClient:
-    def __init__(self, id_, peer_id, server):
+    def __init__(self, loop, id_, peer_id, server):
+        self.event_loop = loop
         self.id_ = id_
         self.conn = None
         self.pipe = None
@@ -44,26 +45,33 @@ class WebRTCClient:
         self.peer_id = peer_id
         self.server = server
 
+    async def send(self, msg):
+        assert self.conn
+        print(f'>>> Sending {msg}')
+        await self.conn.send(msg)
+
     async def connect(self):
         self.conn = await websockets.connect(self.server)
-        await self.conn.send('HELLO %d' % self.id_)
+        await self.send('HELLO %d' % self.id_)
 
     async def setup_call(self):
-        await self.conn.send('SESSION {}'.format(self.peer_id))
+        await self.send('SESSION {}'.format(self.peer_id))
+
+    def send_soon(self, msg):
+        asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
 
     def send_sdp_offer(self, offer):
         text = offer.sdp.as_text()
         print('Sending offer:\n%s' % text)
         msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
-        loop = asyncio.new_event_loop()
-        loop.run_until_complete(self.conn.send(msg))
-        loop.close()
+        self.send_soon(msg)
 
     def on_offer_created(self, promise, _, __):
         promise.wait()
         reply = promise.get_reply()
         offer = reply['offer']
         promise = Gst.Promise.new()
+        print('Offer created, setting local description')
         self.webrtc.emit('set-local-description', offer, promise)
         promise.interrupt()
         self.send_sdp_offer(offer)
@@ -74,9 +82,7 @@ class WebRTCClient:
 
     def send_ice_candidate_message(self, _, mlineindex, candidate):
         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
-        loop = asyncio.new_event_loop()
-        loop.run_until_complete(self.conn.send(icemsg))
-        loop.close()
+        self.send_soon(icemsg)
 
     def on_incoming_decodebin_stream(self, _, pad):
         if not pad.has_current_caps():
@@ -194,8 +200,8 @@ if __name__ == '__main__':
                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
     args = parser.parse_args()
     our_id = random.randrange(10, 10000)
-    c = WebRTCClient(our_id, args.peerid, args.server)
     loop = asyncio.new_event_loop()
+    c = WebRTCClient(loop, our_id, args.peerid, args.server)
     loop.run_until_complete(c.connect())
     res = loop.run_until_complete(c.loop())
     sys.exit(res)