3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 # 2022 Nirbheek Chauhan <nirbheek@centricular.com>
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
9 from websockets.version import version as wsv
10 from gi.repository import GstSdp
11 from gi.repository import GstWebRTC
12 from gi.repository import Gst
23 gi.require_version('Gst', '1.0')
24 gi.require_version('GstWebRTC', '1.0')
25 gi.require_version('GstSdp', '1.0')
27 # Ensure that gst-python is installed
29 from gi.overrides import Gst as _
31 print('gstreamer-python binding overrides aren\'t available, please install them')
34 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
35 PIPELINE_DESC_VP8 = '''
36 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
37 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
38 vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
39 queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
40 audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
41 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
43 PIPELINE_DESC_H264 = '''
44 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
45 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
46 x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
47 queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
48 audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
49 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
53 'H264': PIPELINE_DESC_H264,
54 'VP8': PIPELINE_DESC_VP8,
58 def print_status(msg):
63 print(f'!!! {msg}', file=sys.stderr)
66 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
68 Find the payload types for the specified video and audio encoding.
70 Very simplistically finds the first payload type matching the encoding
71 name. More complex applications will want to match caps on
72 profile-level-id, packetization-mode, etc.
76 for i in range(0, sdpmsg.medias_len()):
77 media = sdpmsg.get_media(i)
78 for j in range(0, media.formats_len()):
79 fmt = media.get_format(j)
80 if fmt == 'webrtc-datachannel':
83 caps = media.get_caps_from_media(pt)
84 s = caps.get_structure(0)
85 encoding_name = s['encoding-name']
86 if video_pt is None and encoding_name == video_encoding:
88 elif audio_pt is None and encoding_name == audio_encoding:
90 return {video_encoding: video_pt, audio_encoding: audio_pt}
94 def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
98 self.event_loop = loop
100 # An optional user-specified ID we can use to register
102 # The actual ID we used to register
104 # An optional peer ID we should connect to
105 self.peer_id = peer_id
106 # Whether we will send the offer or the remote peer will
107 self.remote_is_offerer = remote_is_offerer
108 # Video encoding: VP8, H264, etc
109 self.video_encoding = video_encoding.upper()
111 async def send(self, msg):
114 await self.conn.send(msg)
116 async def connect(self):
117 self.conn = await websockets.connect(self.server)
118 if self.our_id is None:
119 self.id_ = str(random.randrange(10, 10000))
121 self.id_ = self.our_id
122 await self.send(f'HELLO {self.id_}')
124 async def setup_call(self):
126 await self.send(f'SESSION {self.peer_id}')
128 def send_soon(self, msg):
129 asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
131 def on_bus_poll_cb(self, bus):
132 def remove_bus_poll():
133 self.event_loop.remove_reader(bus.get_pollfd().fd)
134 self.event_loop.stop()
137 if msg.type == Gst.MessageType.ERROR:
138 err = msg.parse_error()
139 print("ERROR:", err.gerror, err.debug)
142 elif msg.type == Gst.MessageType.EOS:
145 elif msg.type == Gst.MessageType.LATENCY:
146 self.pipe.recalculate_latency()
148 def send_sdp(self, offer):
149 text = offer.sdp.as_text()
150 if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
151 print_status('Sending offer:\n%s' % text)
152 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
153 elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
154 print_status('Sending answer:\n%s' % text)
155 msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
157 raise AssertionError(offer.type)
160 def on_offer_created(self, promise, _, __):
161 assert promise.wait() == Gst.PromiseResult.REPLIED
162 reply = promise.get_reply()
163 offer = reply['offer']
164 promise = Gst.Promise.new()
165 print_status('Offer created, setting local description')
166 self.webrtc.emit('set-local-description', offer, promise)
167 promise.interrupt() # we don't care about the result, discard it
170 def on_negotiation_needed(self, _, create_offer):
172 print_status('Call was connected: creating offer')
173 promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
174 self.webrtc.emit('create-offer', None, promise)
176 def send_ice_candidate_message(self, _, mlineindex, candidate):
177 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
178 self.send_soon(icemsg)
180 def on_incoming_decodebin_stream(self, _, pad):
181 if not pad.has_current_caps():
182 print_error(pad, 'has no caps, ignoring')
185 caps = pad.get_current_caps()
189 if name.startswith('video'):
190 q = Gst.ElementFactory.make('queue')
191 conv = Gst.ElementFactory.make('videoconvert')
192 sink = Gst.ElementFactory.make('autovideosink')
193 self.pipe.add(q, conv, sink)
194 self.pipe.sync_children_states()
195 pad.link(q.get_static_pad('sink'))
198 elif name.startswith('audio'):
199 q = Gst.ElementFactory.make('queue')
200 conv = Gst.ElementFactory.make('audioconvert')
201 resample = Gst.ElementFactory.make('audioresample')
202 sink = Gst.ElementFactory.make('autoaudiosink')
203 self.pipe.add(q, conv, resample, sink)
204 self.pipe.sync_children_states()
205 pad.link(q.get_static_pad('sink'))
210 def on_ice_gathering_state_notify(self, pspec, _):
211 state = self.webrtc.get_property('ice-gathering-state')
212 print_status(f'ICE gathering state changed to {state}')
214 def on_incoming_stream(self, _, pad):
215 if pad.direction != Gst.PadDirection.SRC:
218 decodebin = Gst.ElementFactory.make('decodebin')
219 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
220 self.pipe.add(decodebin)
221 decodebin.sync_state_with_parent()
222 pad.link(decodebin.get_static_pad('sink'))
224 def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
225 print_status(f'Creating pipeline, create_offer: {create_offer}')
226 self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
227 bus = self.pipe.get_bus()
228 self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
229 self.webrtc = self.pipe.get_by_name('sendrecv')
230 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
231 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
232 self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
233 self.webrtc.connect('pad-added', self.on_incoming_stream)
234 self.pipe.set_state(Gst.State.PLAYING)
236 def on_answer_created(self, promise, _, __):
237 assert promise.wait() == Gst.PromiseResult.REPLIED
238 reply = promise.get_reply()
239 answer = reply['answer']
240 promise = Gst.Promise.new()
241 self.webrtc.emit('set-local-description', answer, promise)
242 promise.interrupt() # we don't care about the result, discard it
243 self.send_sdp(answer)
245 def on_offer_set(self, promise, _, __):
246 assert promise.wait() == Gst.PromiseResult.REPLIED
247 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
248 self.webrtc.emit('create-answer', None, promise)
250 def handle_json(self, message):
252 msg = json.loads(message)
253 except json.decoder.JSONDecoderError:
254 print_error('Failed to parse JSON message, this might be a bug')
257 sdp = msg['sdp']['sdp']
258 if msg['sdp']['type'] == 'answer':
259 print_status('Received answer:\n%s' % sdp)
260 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
261 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
262 promise = Gst.Promise.new()
263 self.webrtc.emit('set-remote-description', answer, promise)
264 promise.interrupt() # we don't care about the result, discard it
266 print_status('Received offer:\n%s' % sdp)
267 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
270 print_status('Incoming call: received an offer, creating pipeline')
271 pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
272 assert self.video_encoding in pts
274 self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
278 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
279 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
280 self.webrtc.emit('set-remote-description', offer, promise)
284 candidate = ice['candidate']
285 sdpmlineindex = ice['sdpMLineIndex']
286 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
288 print_error('Unknown JSON message')
290 def close_pipeline(self):
292 self.pipe.set_state(Gst.State.NULL)
296 async def loop(self):
298 async for message in self.conn:
299 print(f'<<< {message}')
300 if message == 'HELLO':
302 # If a peer ID is specified, we want to connect to it. If not,
303 # we wait for an incoming call.
305 print_status(f'Waiting for incoming call: ID is {self.id_}')
307 if self.remote_is_offerer:
308 print_status('Have peer ID: initiating call (will request remote peer to create offer)')
310 print_status('Have peer ID: initiating call (will create offer)')
311 await self.setup_call()
312 elif message == 'SESSION_OK':
313 if self.remote_is_offerer:
314 # We are initiating the call, but we want the remote peer to create the offer
315 print_status('Call was connected: requesting remote peer for offer')
316 await self.send('OFFER_REQUEST')
318 self.start_pipeline()
319 elif message == 'OFFER_REQUEST':
320 print_status('Incoming call: we have been asked to create the offer')
321 self.start_pipeline()
322 elif message.startswith('ERROR'):
324 self.close_pipeline()
327 self.handle_json(message)
328 self.close_pipeline()
331 async def stop(self):
333 await self.conn.close()
338 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
339 "rtpmanager", "videotestsrc", "audiotestsrc"]
340 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
342 print_error('Missing gstreamer plugins:', missing)
347 if __name__ == '__main__':
349 if not check_plugins():
351 parser = argparse.ArgumentParser()
352 parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
353 help='Video encoding to negotiate')
354 parser.add_argument('--peer-id', help='String ID of the peer to connect to')
355 parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
356 parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
357 help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
358 parser.add_argument('--remote-offerer', default=False, action='store_true',
359 dest='remote_is_offerer',
360 help='Request that the peer generate the offer and we\'ll answer')
361 args = parser.parse_args()
362 if not args.peer_id and not args.our_id:
363 print('You must pass either --peer-id or --our-id')
365 loop = asyncio.new_event_loop()
366 c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
367 loop.run_until_complete(c.connect())
368 res = loop.run_until_complete(c.loop())