webrtc_sendrecv.py: Add support for using H264 encoding
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 #               2022 Nirbheek Chauhan <nirbheek@centricular.com>
5 #
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
8
9 import random
10 import ssl
11 import websockets
12 import asyncio
13 import os
14 import sys
15 import json
16 import argparse
17
18 import gi
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
25
26 # Ensure that gst-python is installed
27 try:
28     from gi.overrides import Gst as _
29 except ImportError:
30     print('gstreamer-python binding overrides aren\'t available, please install them')
31     raise
32
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
34 PIPELINE_DESC_VP8 = '''
35 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
36  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37   vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38   queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
39  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
41 '''
42 PIPELINE_DESC_H264 = '''
43 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
44  videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
45   x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
46   queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
47  audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
48   queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
49 '''
50
51 PIPELINE_DESC = {
52     'H264': PIPELINE_DESC_H264,
53     'VP8': PIPELINE_DESC_VP8,
54 }
55
56 from websockets.version import version as wsv
57
58
59 def print_status(msg):
60     print(f'--- {msg}')
61
62
63 def print_error(msg):
64     print(f'!!! {msg}', file=sys.stderr)
65
66
67 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
68     '''
69     Find the payload types for the specified video and audio encoding.
70
71     Very simplistically finds the first payload type matching the encoding
72     name. More complex applications will want to match caps on
73     profile-level-id, packetization-mode, etc.
74     '''
75     video_pt = None
76     audio_pt = None
77     for i in range(0, sdpmsg.medias_len()):
78         media = sdpmsg.get_media(i)
79         for j in range(0, media.formats_len()):
80             fmt = media.get_format(j)
81             if fmt == 'webrtc-datachannel':
82                 continue
83             pt = int(fmt)
84             caps = media.get_caps_from_media(pt)
85             s = caps.get_structure(0)
86             encoding_name = s['encoding-name']
87             if video_pt is None and encoding_name == video_encoding:
88                 video_pt = pt
89             elif audio_pt is None and encoding_name == audio_encoding:
90                 audio_pt = pt
91     return {video_encoding: video_pt, audio_encoding: audio_pt}
92
93
94 class WebRTCClient:
95     def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
96         self.conn = None
97         self.pipe = None
98         self.webrtc = None
99         self.event_loop = loop
100         self.server = server
101         # An optional user-specified ID we can use to register
102         self.our_id = our_id
103         # The actual ID we used to register
104         self.id_ = None
105         # An optional peer ID we should connect to
106         self.peer_id = peer_id
107         # Whether we will send the offer or the remote peer will
108         self.remote_is_offerer = remote_is_offerer
109         # Video encoding: VP8, H264, etc
110         self.video_encoding = video_encoding.upper()
111
112     async def send(self, msg):
113         assert self.conn
114         print(f'>>> {msg}')
115         await self.conn.send(msg)
116
117     async def connect(self):
118         self.conn = await websockets.connect(self.server)
119         if self.our_id is None:
120             self.id_ = str(random.randrange(10, 10000))
121         else:
122             self.id_ = self.our_id
123         await self.send(f'HELLO {self.id_}')
124
125     async def setup_call(self):
126         assert self.peer_id
127         await self.send(f'SESSION {self.peer_id}')
128
129     def send_soon(self, msg):
130         asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
131
132     def send_sdp(self, offer):
133         text = offer.sdp.as_text()
134         if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
135             print_status('Sending offer:\n%s' % text)
136             msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
137         elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
138             print_status('Sending answer:\n%s' % text)
139             msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
140         else:
141             raise AssertionError(offer.type)
142         self.send_soon(msg)
143
144     def on_offer_created(self, promise, _, __):
145         assert(promise.wait() == Gst.PromiseResult.REPLIED)
146         reply = promise.get_reply()
147         offer = reply['offer']
148         promise = Gst.Promise.new()
149         print_status('Offer created, setting local description')
150         self.webrtc.emit('set-local-description', offer, promise)
151         promise.interrupt()  # we don't care about the result, discard it
152         self.send_sdp(offer)
153
154     def on_negotiation_needed(self, _, create_offer):
155         if create_offer:
156             print_status('Call was connected: creating offer')
157             promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
158             self.webrtc.emit('create-offer', None, promise)
159
160     def send_ice_candidate_message(self, _, mlineindex, candidate):
161         icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
162         self.send_soon(icemsg)
163
164     def on_incoming_decodebin_stream(self, _, pad):
165         if not pad.has_current_caps():
166             print_error(pad, 'has no caps, ignoring')
167             return
168
169         caps = pad.get_current_caps()
170         assert (len(caps))
171         s = caps[0]
172         name = s.get_name()
173         if name.startswith('video'):
174             q = Gst.ElementFactory.make('queue')
175             conv = Gst.ElementFactory.make('videoconvert')
176             sink = Gst.ElementFactory.make('autovideosink')
177             self.pipe.add(q, conv, sink)
178             self.pipe.sync_children_states()
179             pad.link(q.get_static_pad('sink'))
180             q.link(conv)
181             conv.link(sink)
182         elif name.startswith('audio'):
183             q = Gst.ElementFactory.make('queue')
184             conv = Gst.ElementFactory.make('audioconvert')
185             resample = Gst.ElementFactory.make('audioresample')
186             sink = Gst.ElementFactory.make('autoaudiosink')
187             self.pipe.add(q, conv, resample, sink)
188             self.pipe.sync_children_states()
189             pad.link(q.get_static_pad('sink'))
190             q.link(conv)
191             conv.link(resample)
192             resample.link(sink)
193
194     def on_ice_gathering_state_notify(self, pspec, _):
195         state = self.webrtc.get_property('ice-gathering-state')
196         print_status(f'ICE gathering state changed to {state}')
197
198     def on_incoming_stream(self, _, pad):
199         if pad.direction != Gst.PadDirection.SRC:
200             return
201
202         decodebin = Gst.ElementFactory.make('decodebin')
203         decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
204         self.pipe.add(decodebin)
205         decodebin.sync_state_with_parent()
206         pad.link(decodebin.get_static_pad('sink'))
207
208     def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
209         print_status(f'Creating pipeline, create_offer: {create_offer}')
210         self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
211         self.webrtc = self.pipe.get_by_name('sendrecv')
212         self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
213         self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
214         self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
215         self.webrtc.connect('pad-added', self.on_incoming_stream)
216         self.pipe.set_state(Gst.State.PLAYING)
217
218     def on_answer_created(self, promise, _, __):
219         assert(promise.wait() == Gst.PromiseResult.REPLIED)
220         reply = promise.get_reply()
221         answer = reply['answer']
222         promise = Gst.Promise.new()
223         self.webrtc.emit('set-local-description', answer, promise)
224         promise.interrupt()  # we don't care about the result, discard it
225         self.send_sdp(answer)
226
227     def on_offer_set(self, promise, _, __):
228         assert(promise.wait() == Gst.PromiseResult.REPLIED)
229         promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
230         self.webrtc.emit('create-answer', None, promise)
231
232     def handle_json(self, message):
233         try:
234             msg = json.loads(message)
235         except json.decoder.JSONDecoderError:
236             print_error('Failed to parse JSON message, this might be a bug')
237             raise
238         if 'sdp' in msg:
239             sdp = msg['sdp']['sdp']
240             if msg['sdp']['type'] == 'answer':
241                 print_status('Received answer:\n%s' % sdp)
242                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
243                 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
244                 promise = Gst.Promise.new()
245                 self.webrtc.emit('set-remote-description', answer, promise)
246                 promise.interrupt()  # we don't care about the result, discard it
247             else:
248                 print_status('Received offer:\n%s' % sdp)
249                 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
250
251                 if not self.webrtc:
252                     print_status('Incoming call: received an offer, creating pipeline')
253                     pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
254                     assert(self.video_encoding in pts)
255                     assert('OPUS' in pts)
256                     self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
257
258                 assert(self.webrtc)
259
260                 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
261                 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
262                 self.webrtc.emit('set-remote-description', offer, promise)
263         elif 'ice' in msg:
264             assert(self.webrtc)
265             ice = msg['ice']
266             candidate = ice['candidate']
267             sdpmlineindex = ice['sdpMLineIndex']
268             self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
269         else:
270             print_error('Unknown JSON message')
271
272     def close_pipeline(self):
273         if self.pipe:
274             self.pipe.set_state(Gst.State.NULL)
275             self.pipe = None
276         self.webrtc = None
277
278     async def loop(self):
279         assert self.conn
280         async for message in self.conn:
281             print(f'<<< {message}')
282             if message == 'HELLO':
283                 assert self.id_
284                 # If a peer ID is specified, we want to connect to it. If not,
285                 # we wait for an incoming call.
286                 if not self.peer_id:
287                     print_status(f'Waiting for incoming call: ID is {self.id_}')
288                 else:
289                     if self.remote_is_offerer:
290                         print_status('Have peer ID: initiating call (will request remote peer to create offer)')
291                     else:
292                         print_status('Have peer ID: initiating call (will create offer)')
293                     await self.setup_call()
294             elif message == 'SESSION_OK':
295                 if self.remote_is_offerer:
296                     # We are initiating the call, but we want the remote peer to create the offer
297                     print_status('Call was connected: requesting remote peer for offer')
298                     await self.send('OFFER_REQUEST')
299                 else:
300                     self.start_pipeline()
301             elif message == 'OFFER_REQUEST':
302                 print_status('Incoming call: we have been asked to create the offer')
303                 self.start_pipeline()
304             elif message.startswith('ERROR'):
305                 print_error(message)
306                 self.close_pipeline()
307                 return 1
308             else:
309                 self.handle_json(message)
310         self.close_pipeline()
311         return 0
312
313     async def stop(self):
314         if self.conn:
315             await self.conn.close()
316         self.conn = None
317
318
319 def check_plugins():
320     needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
321               "rtpmanager", "videotestsrc", "audiotestsrc"]
322     missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
323     if len(missing):
324         print_error('Missing gstreamer plugins:', missing)
325         return False
326     return True
327
328
329 if __name__ == '__main__':
330     Gst.init(None)
331     if not check_plugins():
332         sys.exit(1)
333     parser = argparse.ArgumentParser()
334     parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
335                         help='Video encoding to negotiate')
336     parser.add_argument('--peer-id', help='String ID of the peer to connect to')
337     parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
338     parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
339                         help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
340     parser.add_argument('--remote-offerer', default=False, action='store_true',
341                         dest='remote_is_offerer',
342                         help='Request that the peer generate the offer and we\'ll answer')
343     args = parser.parse_args()
344     if not args.peer_id and not args.our_id:
345         print('You must pass either --peer-id or --our-id')
346         sys.exit(1)
347     loop = asyncio.new_event_loop()
348     c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
349     loop.run_until_complete(c.connect())
350     res = loop.run_until_complete(c.loop())
351     sys.exit(res)