webrtc examples: Use webrtc.gstreamer.net
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 #               2022 Nirbheek Chauhan <nirbheek@centricular.com>
5 #
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
8
9 from websockets.version import version as wsv
10 from gi.repository import GstSdp
11 from gi.repository import GstWebRTC
12 from gi.repository import Gst
13 import random
14 import ssl
15 import websockets
16 import asyncio
17 import os
18 import sys
19 import json
20 import argparse
21
22 import gi
23 gi.require_version('Gst', '1.0')
24 gi.require_version('GstWebRTC', '1.0')
25 gi.require_version('GstSdp', '1.0')
26
27 # Ensure that gst-python is installed
28 try:
29     from gi.overrides import Gst as _
30 except ImportError:
31     print('gstreamer-python binding overrides aren\'t available, please install them')
32     raise
33
34 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
35 PIPELINE_DESC_VP8 = '''
36 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
37  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
38   vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
39   queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
40  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
41   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
42 '''
43 PIPELINE_DESC_H264 = '''
44 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
45  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
46   x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
47   queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
48  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
49   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
50 '''
51
52 PIPELINE_DESC = {
53     'H264': PIPELINE_DESC_H264,
54     'VP8': PIPELINE_DESC_VP8,
55 }
56
57
58 def print_status(msg):
59     print(f'--- {msg}')
60
61
62 def print_error(msg):
63     print(f'!!! {msg}', file=sys.stderr)
64
65
66 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
67     '''
68     Find the payload types for the specified video and audio encoding.
69
70     Very simplistically finds the first payload type matching the encoding
71     name. More complex applications will want to match caps on
72     profile-level-id, packetization-mode, etc.
73     '''
74     video_pt = None
75     audio_pt = None
76     for i in range(0, sdpmsg.medias_len()):
77         media = sdpmsg.get_media(i)
78         for j in range(0, media.formats_len()):
79             fmt = media.get_format(j)
80             if fmt == 'webrtc-datachannel':
81                 continue
82             pt = int(fmt)
83             caps = media.get_caps_from_media(pt)
84             s = caps.get_structure(0)
85             encoding_name = s['encoding-name']
86             if video_pt is None and encoding_name == video_encoding:
87                 video_pt = pt
88             elif audio_pt is None and encoding_name == audio_encoding:
89                 audio_pt = pt
90     return {video_encoding: video_pt, audio_encoding: audio_pt}
91
92
93 class WebRTCClient:
94     def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
95         self.conn = None
96         self.pipe = None
97         self.webrtc = None
98         self.event_loop = loop
99         self.server = server
100         # An optional user-specified ID we can use to register
101         self.our_id = our_id
102         # The actual ID we used to register
103         self.id_ = None
104         # An optional peer ID we should connect to
105         self.peer_id = peer_id
106         # Whether we will send the offer or the remote peer will
107         self.remote_is_offerer = remote_is_offerer
108         # Video encoding: VP8, H264, etc
109         self.video_encoding = video_encoding.upper()
110
111     async def send(self, msg):
112         assert self.conn
113         print(f'>>> {msg}')
114         await self.conn.send(msg)
115
116     async def connect(self):
117         self.conn = await websockets.connect(self.server)
118         if self.our_id is None:
119             self.id_ = str(random.randrange(10, 10000))
120         else:
121             self.id_ = self.our_id
122         await self.send(f'HELLO {self.id_}')
123
124     async def setup_call(self):
125         assert self.peer_id
126         await self.send(f'SESSION {self.peer_id}')
127
128     def send_soon(self, msg):
129         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
130
131     def on_bus_poll_cb(self, bus):
132         def remove_bus_poll():
133             self.event_loop.remove_reader(bus.get_pollfd().fd)
134             self.event_loop.stop()
135         while bus.peek():
136             msg = bus.pop()
137             if msg.type == Gst.MessageType.ERROR:
138                 err = msg.parse_error()
139                 print("ERROR:", err.gerror, err.debug)
140                 remove_bus_poll()
141                 break
142             elif msg.type == Gst.MessageType.EOS:
143                 remove_bus_poll()
144                 break
145             elif msg.type == Gst.MessageType.LATENCY:
146                 self.pipe.recalculate_latency()
147
148     def send_sdp(self, offer):
149         text = offer.sdp.as_text()
150         if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
151             print_status('Sending offer:\n%s' % text)
152             msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
153         elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
154             print_status('Sending answer:\n%s' % text)
155             msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
156         else:
157             raise AssertionError(offer.type)
158         self.send_soon(msg)
159
160     def on_offer_created(self, promise, _, __):
161         assert promise.wait() == Gst.PromiseResult.REPLIED
162         reply = promise.get_reply()
163         offer = reply['offer']
164         promise = Gst.Promise.new()
165         print_status('Offer created, setting local description')
166         self.webrtc.emit('set-local-description', offer, promise)
167         promise.interrupt()  # we don't care about the result, discard it
168         self.send_sdp(offer)
169
170     def on_negotiation_needed(self, _, create_offer):
171         if create_offer:
172             print_status('Call was connected: creating offer')
173             promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
174             self.webrtc.emit('create-offer', None, promise)
175
176     def send_ice_candidate_message(self, _, mlineindex, candidate):
177         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
178         self.send_soon(icemsg)
179
180     def on_incoming_decodebin_stream(self, _, pad):
181         if not pad.has_current_caps():
182             print_error(pad, 'has no caps, ignoring')
183             return
184
185         caps = pad.get_current_caps()
186         assert (len(caps))
187         s = caps[0]
188         name = s.get_name()
189         if name.startswith('video'):
190             q = Gst.ElementFactory.make('queue')
191             conv = Gst.ElementFactory.make('videoconvert')
192             sink = Gst.ElementFactory.make('autovideosink')
193             self.pipe.add(q, conv, sink)
194             self.pipe.sync_children_states()
195             pad.link(q.get_static_pad('sink'))
196             q.link(conv)
197             conv.link(sink)
198         elif name.startswith('audio'):
199             q = Gst.ElementFactory.make('queue')
200             conv = Gst.ElementFactory.make('audioconvert')
201             resample = Gst.ElementFactory.make('audioresample')
202             sink = Gst.ElementFactory.make('autoaudiosink')
203             self.pipe.add(q, conv, resample, sink)
204             self.pipe.sync_children_states()
205             pad.link(q.get_static_pad('sink'))
206             q.link(conv)
207             conv.link(resample)
208             resample.link(sink)
209
210     def on_ice_gathering_state_notify(self, pspec, _):
211         state = self.webrtc.get_property('ice-gathering-state')
212         print_status(f'ICE gathering state changed to {state}')
213
214     def on_incoming_stream(self, _, pad):
215         if pad.direction != Gst.PadDirection.SRC:
216             return
217
218         decodebin = Gst.ElementFactory.make('decodebin')
219         decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
220         self.pipe.add(decodebin)
221         decodebin.sync_state_with_parent()
222         pad.link(decodebin.get_static_pad('sink'))
223
224     def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
225         print_status(f'Creating pipeline, create_offer: {create_offer}')
226         self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
227         bus = self.pipe.get_bus()
228         self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
229         self.webrtc = self.pipe.get_by_name('sendrecv')
230         self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
231         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
232         self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
233         self.webrtc.connect('pad-added', self.on_incoming_stream)
234         self.pipe.set_state(Gst.State.PLAYING)
235
236     def on_answer_created(self, promise, _, __):
237         assert promise.wait() == Gst.PromiseResult.REPLIED
238         reply = promise.get_reply()
239         answer = reply['answer']
240         promise = Gst.Promise.new()
241         self.webrtc.emit('set-local-description', answer, promise)
242         promise.interrupt()  # we don't care about the result, discard it
243         self.send_sdp(answer)
244
245     def on_offer_set(self, promise, _, __):
246         assert promise.wait() == Gst.PromiseResult.REPLIED
247         promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
248         self.webrtc.emit('create-answer', None, promise)
249
250     def handle_json(self, message):
251         try:
252             msg = json.loads(message)
253         except json.decoder.JSONDecoderError:
254             print_error('Failed to parse JSON message, this might be a bug')
255             raise
256         if 'sdp' in msg:
257             sdp = msg['sdp']['sdp']
258             if msg['sdp']['type'] == 'answer':
259                 print_status('Received answer:\n%s' % sdp)
260                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
261                 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
262                 promise = Gst.Promise.new()
263                 self.webrtc.emit('set-remote-description', answer, promise)
264                 promise.interrupt()  # we don't care about the result, discard it
265             else:
266                 print_status('Received offer:\n%s' % sdp)
267                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
268
269                 if not self.webrtc:
270                     print_status('Incoming call: received an offer, creating pipeline')
271                     pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
272                     assert self.video_encoding in pts
273                     assert 'OPUS' in pts
274                     self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
275
276                 assert self.webrtc
277
278                 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
279                 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
280                 self.webrtc.emit('set-remote-description', offer, promise)
281         elif 'ice' in msg:
282             assert self.webrtc
283             ice = msg['ice']
284             candidate = ice['candidate']
285             sdpmlineindex = ice['sdpMLineIndex']
286             self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
287         else:
288             print_error('Unknown JSON message')
289
290     def close_pipeline(self):
291         if self.pipe:
292             self.pipe.set_state(Gst.State.NULL)
293             self.pipe = None
294         self.webrtc = None
295
296     async def loop(self):
297         assert self.conn
298         async for message in self.conn:
299             print(f'<<< {message}')
300             if message == 'HELLO':
301                 assert self.id_
302                 # If a peer ID is specified, we want to connect to it. If not,
303                 # we wait for an incoming call.
304                 if not self.peer_id:
305                     print_status(f'Waiting for incoming call: ID is {self.id_}')
306                 else:
307                     if self.remote_is_offerer:
308                         print_status('Have peer ID: initiating call (will request remote peer to create offer)')
309                     else:
310                         print_status('Have peer ID: initiating call (will create offer)')
311                     await self.setup_call()
312             elif message == 'SESSION_OK':
313                 if self.remote_is_offerer:
314                     # We are initiating the call, but we want the remote peer to create the offer
315                     print_status('Call was connected: requesting remote peer for offer')
316                     await self.send('OFFER_REQUEST')
317                 else:
318                     self.start_pipeline()
319             elif message == 'OFFER_REQUEST':
320                 print_status('Incoming call: we have been asked to create the offer')
321                 self.start_pipeline()
322             elif message.startswith('ERROR'):
323                 print_error(message)
324                 self.close_pipeline()
325                 return 1
326             else:
327                 self.handle_json(message)
328         self.close_pipeline()
329         return 0
330
331     async def stop(self):
332         if self.conn:
333             await self.conn.close()
334         self.conn = None
335
336
337 def check_plugins():
338     needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
339               "rtpmanager", "videotestsrc", "audiotestsrc"]
340     missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
341     if len(missing):
342         print_error('Missing gstreamer plugins:', missing)
343         return False
344     return True
345
346
347 if __name__ == '__main__':
348     Gst.init(None)
349     if not check_plugins():
350         sys.exit(1)
351     parser = argparse.ArgumentParser()
352     parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
353                         help='Video encoding to negotiate')
354     parser.add_argument('--peer-id', help='String ID of the peer to connect to')
355     parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
356     parser.add_argument('--server', default='wss://webrtc.gstreamer.net:8443',
357                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
358     parser.add_argument('--remote-offerer', default=False, action='store_true',
359                         dest='remote_is_offerer',
360                         help='Request that the peer generate the offer and we\'ll answer')
361     args = parser.parse_args()
362     if not args.peer_id and not args.our_id:
363         print('You must pass either --peer-id or --our-id')
364         sys.exit(1)
365     loop = asyncio.new_event_loop()
366     c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
367     loop.run_until_complete(c.connect())
368     res = loop.run_until_complete(c.loop())
369     sys.exit(res)