webrtc_sendrecv.py: Handle LATENCY messages
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 #               2022 Nirbheek Chauhan <nirbheek@centricular.com>
5 #
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
8
9 import random
10 import ssl
11 import websockets
12 import asyncio
13 import os
14 import sys
15 import json
16 import argparse
17
18 import gi
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
25
26 # Ensure that gst-python is installed
27 try:
28     from gi.overrides import Gst as _
29 except ImportError:
30     print('gstreamer-python binding overrides aren\'t available, please install them')
31     raise
32
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
34 PIPELINE_DESC_VP8 = '''
35 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
36  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37   vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38   queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
39  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
41 '''
42 PIPELINE_DESC_H264 = '''
43 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
44  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
45   x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
46   queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
47  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
48   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
49 '''
50
51 PIPELINE_DESC = {
52     'H264': PIPELINE_DESC_H264,
53     'VP8': PIPELINE_DESC_VP8,
54 }
55
56 from websockets.version import version as wsv
57
58
59 def print_status(msg):
60     print(f'--- {msg}')
61
62
63 def print_error(msg):
64     print(f'!!! {msg}', file=sys.stderr)
65
66
67 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
68     '''
69     Find the payload types for the specified video and audio encoding.
70
71     Very simplistically finds the first payload type matching the encoding
72     name. More complex applications will want to match caps on
73     profile-level-id, packetization-mode, etc.
74     '''
75     video_pt = None
76     audio_pt = None
77     for i in range(0, sdpmsg.medias_len()):
78         media = sdpmsg.get_media(i)
79         for j in range(0, media.formats_len()):
80             fmt = media.get_format(j)
81             if fmt == 'webrtc-datachannel':
82                 continue
83             pt = int(fmt)
84             caps = media.get_caps_from_media(pt)
85             s = caps.get_structure(0)
86             encoding_name = s['encoding-name']
87             if video_pt is None and encoding_name == video_encoding:
88                 video_pt = pt
89             elif audio_pt is None and encoding_name == audio_encoding:
90                 audio_pt = pt
91     return {video_encoding: video_pt, audio_encoding: audio_pt}
92
93
94 class WebRTCClient:
95     def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
96         self.conn = None
97         self.pipe = None
98         self.webrtc = None
99         self.event_loop = loop
100         self.server = server
101         # An optional user-specified ID we can use to register
102         self.our_id = our_id
103         # The actual ID we used to register
104         self.id_ = None
105         # An optional peer ID we should connect to
106         self.peer_id = peer_id
107         # Whether we will send the offer or the remote peer will
108         self.remote_is_offerer = remote_is_offerer
109         # Video encoding: VP8, H264, etc
110         self.video_encoding = video_encoding.upper()
111
112     async def send(self, msg):
113         assert self.conn
114         print(f'>>> {msg}')
115         await self.conn.send(msg)
116
117     async def connect(self):
118         self.conn = await websockets.connect(self.server)
119         if self.our_id is None:
120             self.id_ = str(random.randrange(10, 10000))
121         else:
122             self.id_ = self.our_id
123         await self.send(f'HELLO {self.id_}')
124
125     async def setup_call(self):
126         assert self.peer_id
127         await self.send(f'SESSION {self.peer_id}')
128
129     def send_soon(self, msg):
130         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
131
132     def on_bus_poll_cb(self, bus):
133         def remove_bus_poll():
134             self.event_loop.remove_reader(bus.get_pollfd().fd)
135             self.event_loop.stop()
136         while bus.peek():
137             msg = bus.pop()
138             if msg.type == Gst.MessageType.ERROR:
139                 err = msg.parse_error()
140                 print("ERROR:", err.gerror, err.debug)
141                 remove_bus_poll()
142                 break
143             elif msg.type == Gst.MessageType.EOS:
144                 remove_bus_poll()
145                 break
146             elif msg.type == Gst.MessageType.LATENCY:
147                 self.pipe.recalculate_latency()
148
149     def send_sdp(self, offer):
150         text = offer.sdp.as_text()
151         if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
152             print_status('Sending offer:\n%s' % text)
153             msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
154         elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
155             print_status('Sending answer:\n%s' % text)
156             msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
157         else:
158             raise AssertionError(offer.type)
159         self.send_soon(msg)
160
161     def on_offer_created(self, promise, _, __):
162         assert(promise.wait() == Gst.PromiseResult.REPLIED)
163         reply = promise.get_reply()
164         offer = reply['offer']
165         promise = Gst.Promise.new()
166         print_status('Offer created, setting local description')
167         self.webrtc.emit('set-local-description', offer, promise)
168         promise.interrupt()  # we don't care about the result, discard it
169         self.send_sdp(offer)
170
171     def on_negotiation_needed(self, _, create_offer):
172         if create_offer:
173             print_status('Call was connected: creating offer')
174             promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
175             self.webrtc.emit('create-offer', None, promise)
176
177     def send_ice_candidate_message(self, _, mlineindex, candidate):
178         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
179         self.send_soon(icemsg)
180
181     def on_incoming_decodebin_stream(self, _, pad):
182         if not pad.has_current_caps():
183             print_error(pad, 'has no caps, ignoring')
184             return
185
186         caps = pad.get_current_caps()
187         assert (len(caps))
188         s = caps[0]
189         name = s.get_name()
190         if name.startswith('video'):
191             q = Gst.ElementFactory.make('queue')
192             conv = Gst.ElementFactory.make('videoconvert')
193             sink = Gst.ElementFactory.make('autovideosink')
194             self.pipe.add(q, conv, sink)
195             self.pipe.sync_children_states()
196             pad.link(q.get_static_pad('sink'))
197             q.link(conv)
198             conv.link(sink)
199         elif name.startswith('audio'):
200             q = Gst.ElementFactory.make('queue')
201             conv = Gst.ElementFactory.make('audioconvert')
202             resample = Gst.ElementFactory.make('audioresample')
203             sink = Gst.ElementFactory.make('autoaudiosink')
204             self.pipe.add(q, conv, resample, sink)
205             self.pipe.sync_children_states()
206             pad.link(q.get_static_pad('sink'))
207             q.link(conv)
208             conv.link(resample)
209             resample.link(sink)
210
211     def on_ice_gathering_state_notify(self, pspec, _):
212         state = self.webrtc.get_property('ice-gathering-state')
213         print_status(f'ICE gathering state changed to {state}')
214
215     def on_incoming_stream(self, _, pad):
216         if pad.direction != Gst.PadDirection.SRC:
217             return
218
219         decodebin = Gst.ElementFactory.make('decodebin')
220         decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
221         self.pipe.add(decodebin)
222         decodebin.sync_state_with_parent()
223         pad.link(decodebin.get_static_pad('sink'))
224
225     def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
226         print_status(f'Creating pipeline, create_offer: {create_offer}')
227         self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
228         bus = self.pipe.get_bus()
229         self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
230         self.webrtc = self.pipe.get_by_name('sendrecv')
231         self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
232         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
233         self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
234         self.webrtc.connect('pad-added', self.on_incoming_stream)
235         self.pipe.set_state(Gst.State.PLAYING)
236
237     def on_answer_created(self, promise, _, __):
238         assert(promise.wait() == Gst.PromiseResult.REPLIED)
239         reply = promise.get_reply()
240         answer = reply['answer']
241         promise = Gst.Promise.new()
242         self.webrtc.emit('set-local-description', answer, promise)
243         promise.interrupt()  # we don't care about the result, discard it
244         self.send_sdp(answer)
245
246     def on_offer_set(self, promise, _, __):
247         assert(promise.wait() == Gst.PromiseResult.REPLIED)
248         promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
249         self.webrtc.emit('create-answer', None, promise)
250
251     def handle_json(self, message):
252         try:
253             msg = json.loads(message)
254         except json.decoder.JSONDecoderError:
255             print_error('Failed to parse JSON message, this might be a bug')
256             raise
257         if 'sdp' in msg:
258             sdp = msg['sdp']['sdp']
259             if msg['sdp']['type'] == 'answer':
260                 print_status('Received answer:\n%s' % sdp)
261                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
262                 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
263                 promise = Gst.Promise.new()
264                 self.webrtc.emit('set-remote-description', answer, promise)
265                 promise.interrupt()  # we don't care about the result, discard it
266             else:
267                 print_status('Received offer:\n%s' % sdp)
268                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
269
270                 if not self.webrtc:
271                     print_status('Incoming call: received an offer, creating pipeline')
272                     pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
273                     assert(self.video_encoding in pts)
274                     assert('OPUS' in pts)
275                     self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
276
277                 assert(self.webrtc)
278
279                 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
280                 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
281                 self.webrtc.emit('set-remote-description', offer, promise)
282         elif 'ice' in msg:
283             assert(self.webrtc)
284             ice = msg['ice']
285             candidate = ice['candidate']
286             sdpmlineindex = ice['sdpMLineIndex']
287             self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
288         else:
289             print_error('Unknown JSON message')
290
291     def close_pipeline(self):
292         if self.pipe:
293             self.pipe.set_state(Gst.State.NULL)
294             self.pipe = None
295         self.webrtc = None
296
297     async def loop(self):
298         assert self.conn
299         async for message in self.conn:
300             print(f'<<< {message}')
301             if message == 'HELLO':
302                 assert self.id_
303                 # If a peer ID is specified, we want to connect to it. If not,
304                 # we wait for an incoming call.
305                 if not self.peer_id:
306                     print_status(f'Waiting for incoming call: ID is {self.id_}')
307                 else:
308                     if self.remote_is_offerer:
309                         print_status('Have peer ID: initiating call (will request remote peer to create offer)')
310                     else:
311                         print_status('Have peer ID: initiating call (will create offer)')
312                     await self.setup_call()
313             elif message == 'SESSION_OK':
314                 if self.remote_is_offerer:
315                     # We are initiating the call, but we want the remote peer to create the offer
316                     print_status('Call was connected: requesting remote peer for offer')
317                     await self.send('OFFER_REQUEST')
318                 else:
319                     self.start_pipeline()
320             elif message == 'OFFER_REQUEST':
321                 print_status('Incoming call: we have been asked to create the offer')
322                 self.start_pipeline()
323             elif message.startswith('ERROR'):
324                 print_error(message)
325                 self.close_pipeline()
326                 return 1
327             else:
328                 self.handle_json(message)
329         self.close_pipeline()
330         return 0
331
332     async def stop(self):
333         if self.conn:
334             await self.conn.close()
335         self.conn = None
336
337
338 def check_plugins():
339     needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
340               "rtpmanager", "videotestsrc", "audiotestsrc"]
341     missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
342     if len(missing):
343         print_error('Missing gstreamer plugins:', missing)
344         return False
345     return True
346
347
348 if __name__ == '__main__':
349     Gst.init(None)
350     if not check_plugins():
351         sys.exit(1)
352     parser = argparse.ArgumentParser()
353     parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
354                         help='Video encoding to negotiate')
355     parser.add_argument('--peer-id', help='String ID of the peer to connect to')
356     parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
357     parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
358                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
359     parser.add_argument('--remote-offerer', default=False, action='store_true',
360                         dest='remote_is_offerer',
361                         help='Request that the peer generate the offer and we\'ll answer')
362     args = parser.parse_args()
363     if not args.peer_id and not args.our_id:
364         print('You must pass either --peer-id or --our-id')
365         sys.exit(1)
366     loop = asyncio.new_event_loop()
367     c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
368     loop.run_until_complete(c.connect())
369     res = loop.run_until_complete(c.loop())
370     sys.exit(res)