webrtc_sendrecv.py: Link pads instead of elements
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 #               2022 Nirbheek Chauhan <nirbheek@centricular.com>
5 #
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
8
9 import random
10 import ssl
11 import websockets
12 import asyncio
13 import os
14 import sys
15 import json
16 import argparse
17
18 import gi
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
25
26 # Ensure that gst-python is installed
27 try:
28     from gi.overrides import Gst as _
29 except ImportError:
30     print('gstreamer-python binding overrides aren\'t available, please install them')
31     raise
32
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
34 PIPELINE_DESC = '''
35 webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
36  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37   vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38   queue ! application/x-rtp,media=video,encoding-name=VP8,payload={vp8_pt} ! sendrecv.
39  audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={opus_pt} ! sendrecv.
41 '''
42
43 from websockets.version import version as wsv
44
45
46 def print_status(msg):
47     print(f'--- {msg}')
48
49
50 def print_error(msg):
51     print(f'!!! {msg}', file=sys.stderr)
52
53
54 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
55     '''
56     Find the payload types for the specified video and audio encoding.
57
58     Very simplistically finds the first payload type matching the encoding
59     name. More complex applications will want to match caps on
60     profile-level-id, packetization-mode, etc.
61     '''
62     video_pt = None
63     audio_pt = None
64     for i in range(0, sdpmsg.medias_len()):
65         media = sdpmsg.get_media(i)
66         for j in range(0, media.formats_len()):
67             fmt = media.get_format(j)
68             if fmt == 'webrtc-datachannel':
69                 continue
70             pt = int(fmt)
71             caps = media.get_caps_from_media(pt)
72             s = caps.get_structure(0)
73             encoding_name = s['encoding-name']
74             if video_pt is None and encoding_name == video_encoding:
75                 video_pt = pt
76             elif audio_pt is None and encoding_name == audio_encoding:
77                 audio_pt = pt
78     return {video_encoding: video_pt, audio_encoding: audio_pt}
79
80
81 class WebRTCClient:
82     def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
83         self.conn = None
84         self.pipe = None
85         self.webrtc = None
86         self.event_loop = loop
87         self.server = server
88         # An optional user-specified ID we can use to register
89         self.our_id = our_id
90         # The actual ID we used to register
91         self.id_ = None
92         # An optional peer ID we should connect to
93         self.peer_id = peer_id
94         # Whether we will send the offer or the remote peer will
95         self.remote_is_offerer = remote_is_offerer
96
97     async def send(self, msg):
98         assert self.conn
99         print(f'>>> {msg}')
100         await self.conn.send(msg)
101
102     async def connect(self):
103         self.conn = await websockets.connect(self.server)
104         if self.our_id is None:
105             self.id_ = str(random.randrange(10, 10000))
106         else:
107             self.id_ = self.our_id
108         await self.send(f'HELLO {self.id_}')
109
110     async def setup_call(self):
111         assert self.peer_id
112         await self.send(f'SESSION {self.peer_id}')
113
114     def send_soon(self, msg):
115         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
116
117     def send_sdp(self, offer):
118         text = offer.sdp.as_text()
119         if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
120             print_status('Sending offer:\n%s' % text)
121             msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
122         elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
123             print_status('Sending answer:\n%s' % text)
124             msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
125         else:
126             raise AssertionError(offer.type)
127         self.send_soon(msg)
128
129     def on_offer_created(self, promise, _, __):
130         assert(promise.wait() == Gst.PromiseResult.REPLIED)
131         reply = promise.get_reply()
132         offer = reply['offer']
133         promise = Gst.Promise.new()
134         print_status('Offer created, setting local description')
135         self.webrtc.emit('set-local-description', offer, promise)
136         promise.interrupt()  # we don't care about the result, discard it
137         self.send_sdp(offer)
138
139     def on_negotiation_needed(self, _, create_offer):
140         if create_offer:
141             print_status('Call was connected: creating offer')
142             promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
143             self.webrtc.emit('create-offer', None, promise)
144
145     def send_ice_candidate_message(self, _, mlineindex, candidate):
146         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
147         self.send_soon(icemsg)
148
149     def on_incoming_decodebin_stream(self, _, pad):
150         if not pad.has_current_caps():
151             print_error(pad, 'has no caps, ignoring')
152             return
153
154         caps = pad.get_current_caps()
155         assert (len(caps))
156         s = caps[0]
157         name = s.get_name()
158         if name.startswith('video'):
159             q = Gst.ElementFactory.make('queue')
160             conv = Gst.ElementFactory.make('videoconvert')
161             sink = Gst.ElementFactory.make('autovideosink')
162             self.pipe.add(q, conv, sink)
163             self.pipe.sync_children_states()
164             pad.link(q.get_static_pad('sink'))
165             q.link(conv)
166             conv.link(sink)
167         elif name.startswith('audio'):
168             q = Gst.ElementFactory.make('queue')
169             conv = Gst.ElementFactory.make('audioconvert')
170             resample = Gst.ElementFactory.make('audioresample')
171             sink = Gst.ElementFactory.make('autoaudiosink')
172             self.pipe.add(q, conv, resample, sink)
173             self.pipe.sync_children_states()
174             pad.link(q.get_static_pad('sink'))
175             q.link(conv)
176             conv.link(resample)
177             resample.link(sink)
178
179     def on_ice_gathering_state_notify(self, pspec, _):
180         state = self.webrtc.get_property('ice-gathering-state')
181         print_status(f'ICE gathering state changed to {state}')
182
183     def on_incoming_stream(self, _, pad):
184         if pad.direction != Gst.PadDirection.SRC:
185             return
186
187         decodebin = Gst.ElementFactory.make('decodebin')
188         decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
189         self.pipe.add(decodebin)
190         decodebin.sync_state_with_parent()
191         pad.link(decodebin.get_static_pad('sink'))
192
193     def start_pipeline(self, create_offer=True, opus_pt=96, vp8_pt=97):
194         print_status(f'Creating pipeline, create_offer: {create_offer}')
195         self.pipe = Gst.parse_launch(PIPELINE_DESC.format(vp8_pt=vp8_pt, opus_pt=opus_pt))
196         self.webrtc = self.pipe.get_by_name('sendrecv')
197         self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
198         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
199         self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
200         self.webrtc.connect('pad-added', self.on_incoming_stream)
201         self.pipe.set_state(Gst.State.PLAYING)
202
203     def on_answer_created(self, promise, _, __):
204         assert(promise.wait() == Gst.PromiseResult.REPLIED)
205         reply = promise.get_reply()
206         answer = reply['answer']
207         promise = Gst.Promise.new()
208         self.webrtc.emit('set-local-description', answer, promise)
209         promise.interrupt()  # we don't care about the result, discard it
210         self.send_sdp(answer)
211
212     def on_offer_set(self, promise, _, __):
213         assert(promise.wait() == Gst.PromiseResult.REPLIED)
214         promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
215         self.webrtc.emit('create-answer', None, promise)
216
217     def handle_json(self, message):
218         try:
219             msg = json.loads(message)
220         except json.decoder.JSONDecoderError:
221             print_error('Failed to parse JSON message, this might be a bug')
222             raise
223         if 'sdp' in msg:
224             sdp = msg['sdp']['sdp']
225             if msg['sdp']['type'] == 'answer':
226                 print_status('Received answer:\n%s' % sdp)
227                 res, sdpmsg = GstSdp.SDPMessage.new()
228                 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
229                 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
230                 promise = Gst.Promise.new()
231                 self.webrtc.emit('set-remote-description', answer, promise)
232                 promise.interrupt()  # we don't care about the result, discard it
233             else:
234                 print_status('Received offer:\n%s' % sdp)
235                 res, sdpmsg = GstSdp.SDPMessage.new()
236                 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
237
238                 if not self.webrtc:
239                     print_status('Incoming call: received an offer, creating pipeline')
240                     pts = get_payload_types(sdpmsg, video_encoding='VP8', audio_encoding='OPUS')
241                     assert('VP8' in pts)
242                     assert('OPUS' in pts)
243                     self.start_pipeline(create_offer=False, vp8_pt=pts['VP8'], opus_pt=pts['OPUS'])
244
245                 assert(self.webrtc)
246
247                 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
248                 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
249                 self.webrtc.emit('set-remote-description', offer, promise)
250         elif 'ice' in msg:
251             assert(self.webrtc)
252             ice = msg['ice']
253             candidate = ice['candidate']
254             sdpmlineindex = ice['sdpMLineIndex']
255             self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
256         else:
257             print_error('Unknown JSON message')
258
259     def close_pipeline(self):
260         if self.pipe:
261             self.pipe.set_state(Gst.State.NULL)
262             self.pipe = None
263         self.webrtc = None
264
265     async def loop(self):
266         assert self.conn
267         async for message in self.conn:
268             print(f'<<< {message}')
269             if message == 'HELLO':
270                 assert self.id_
271                 # If a peer ID is specified, we want to connect to it. If not,
272                 # we wait for an incoming call.
273                 if not self.peer_id:
274                     print_status(f'Waiting for incoming call: ID is {self.id_}')
275                 else:
276                     if self.remote_is_offerer:
277                         print_status('Have peer ID: initiating call (will request remote peer to create offer)')
278                     else:
279                         print_status('Have peer ID: initiating call (will create offer)')
280                     await self.setup_call()
281             elif message == 'SESSION_OK':
282                 if self.remote_is_offerer:
283                     # We are initiating the call, but we want the remote peer to create the offer
284                     print_status('Call was connected: requesting remote peer for offer')
285                     await self.send('OFFER_REQUEST')
286                 else:
287                     self.start_pipeline()
288             elif message == 'OFFER_REQUEST':
289                 print_status('Incoming call: we have been asked to create the offer')
290                 self.start_pipeline()
291             elif message.startswith('ERROR'):
292                 print_error(message)
293                 self.close_pipeline()
294                 return 1
295             else:
296                 self.handle_json(message)
297         self.close_pipeline()
298         return 0
299
300     async def stop(self):
301         if self.conn:
302             await self.conn.close()
303         self.conn = None
304
305
306 def check_plugins():
307     needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
308               "rtpmanager", "videotestsrc", "audiotestsrc"]
309     missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
310     if len(missing):
311         print_error('Missing gstreamer plugins:', missing)
312         return False
313     return True
314
315
316 if __name__ == '__main__':
317     Gst.init(None)
318     if not check_plugins():
319         sys.exit(1)
320     parser = argparse.ArgumentParser()
321     parser.add_argument('--peer-id', help='String ID of the peer to connect to')
322     parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
323     parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
324                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
325     parser.add_argument('--remote-offerer', default=False, action='store_true',
326                         dest='remote_is_offerer',
327                         help='Request that the peer generate the offer and we\'ll answer')
328     args = parser.parse_args()
329     if not args.peer_id and not args.our_id:
330         print('You must pass either --peer-id or --our-id')
331         sys.exit(1)
332     loop = asyncio.new_event_loop()
333     c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
334     loop.run_until_complete(c.connect())
335     res = loop.run_until_complete(c.loop())
336     sys.exit(res)