3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 # 2022 Nirbheek Chauhan <nirbheek@centricular.com>
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
# Ensure that gst-python is installed: without the gi overrides,
# Gst.Pipeline.add(*elements) and other pythonic conveniences are missing.
try:
    from gi.overrides import Gst as _
except ImportError:
    print('gstreamer-python binding overrides aren\'t available, please install them')
    raise
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
# These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations.
# Each description is a .format() template: {video_pt}/{audio_pt} are replaced with the
# RTP payload types negotiated in the SDP (see start_pipeline / get_payload_types).
PIPELINE_DESC_VP8 = '''
webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
'''

PIPELINE_DESC_H264 = '''
webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
'''

# Pipeline template per video encoding, keyed by the (upper-cased) --video-encoding value.
PIPELINE_DESC = {
    'H264': PIPELINE_DESC_H264,
    'VP8': PIPELINE_DESC_VP8,
}
56 from websockets.version import version as wsv
def print_status(msg):
    """Print a progress/status message (prefixed '--- ') to stdout."""
    print(f'--- {msg}')


def print_error(msg):
    """Print an error message (prefixed '!!! ') to stderr."""
    print(f'!!! {msg}', file=sys.stderr)
def get_payload_types(sdpmsg, video_encoding, audio_encoding):
    """
    Find the payload types for the specified video and audio encoding.

    Very simplistically finds the first payload type matching the encoding
    name. More complex applications will want to match caps on
    profile-level-id, packetization-mode, etc.

    Returns a dict {video_encoding: pt_or_None, audio_encoding: pt_or_None}.
    """
    video_pt = None
    audio_pt = None
    for i in range(0, sdpmsg.medias_len()):
        media = sdpmsg.get_media(i)
        for j in range(0, media.formats_len()):
            fmt = media.get_format(j)
            # Data channels have a symbolic format, not a numeric payload type
            if fmt == 'webrtc-datachannel':
                continue
            pt = int(fmt)
            caps = media.get_caps_from_media(pt)
            s = caps.get_structure(0)
            encoding_name = s['encoding-name']
            # Keep only the first match for each encoding
            if video_pt is None and encoding_name == video_encoding:
                video_pt = pt
            elif audio_pt is None and encoding_name == audio_encoding:
                audio_pt = pt
    return {video_encoding: video_pt, audio_encoding: audio_pt}
95 def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
99 self.event_loop = loop
101 # An optional user-specified ID we can use to register
103 # The actual ID we used to register
105 # An optional peer ID we should connect to
106 self.peer_id = peer_id
107 # Whether we will send the offer or the remote peer will
108 self.remote_is_offerer = remote_is_offerer
109 # Video encoding: VP8, H264, etc
110 self.video_encoding = video_encoding.upper()
    async def send(self, msg):
        # Send a raw text message to the signalling server.
        # NOTE(review): assumes self.conn is an open websocket (set in connect()) — verify callers.
        await self.conn.send(msg)
117 async def connect(self):
118 self.conn = await websockets.connect(self.server)
119 if self.our_id is None:
120 self.id_ = str(random.randrange(10, 10000))
122 self.id_ = self.our_id
123 await self.send(f'HELLO {self.id_}')
    async def setup_call(self):
        # Ask the signalling server to connect us to the peer ID supplied on the command line.
        await self.send(f'SESSION {self.peer_id}')
    def send_soon(self, msg):
        # Thread-safe fire-and-forget send: schedule the async send() on the asyncio
        # loop. Needed because GStreamer signal handlers may run on other threads.
        asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
132 def on_bus_poll_cb(self, bus):
133 def remove_bus_poll():
134 self.event_loop.remove_reader(bus.get_pollfd().fd)
135 self.event_loop.stop()
138 if msg.type == Gst.MessageType.ERROR:
139 err = msg.parse_error()
140 print("ERROR:", err.gerror, err.debug)
143 elif msg.type == Gst.MessageType.EOS:
146 elif msg.type == Gst.MessageType.LATENCY:
147 self.pipe.recalculate_latency()
149 def send_sdp(self, offer):
150 text = offer.sdp.as_text()
151 if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
152 print_status('Sending offer:\n%s' % text)
153 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
154 elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
155 print_status('Sending answer:\n%s' % text)
156 msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
158 raise AssertionError(offer.type)
161 def on_offer_created(self, promise, _, __):
162 assert(promise.wait() == Gst.PromiseResult.REPLIED)
163 reply = promise.get_reply()
164 offer = reply['offer']
165 promise = Gst.Promise.new()
166 print_status('Offer created, setting local description')
167 self.webrtc.emit('set-local-description', offer, promise)
168 promise.interrupt() # we don't care about the result, discard it
171 def on_negotiation_needed(self, _, create_offer):
173 print_status('Call was connected: creating offer')
174 promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
175 self.webrtc.emit('create-offer', None, promise)
177 def send_ice_candidate_message(self, _, mlineindex, candidate):
178 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
179 self.send_soon(icemsg)
181 def on_incoming_decodebin_stream(self, _, pad):
182 if not pad.has_current_caps():
183 print_error(pad, 'has no caps, ignoring')
186 caps = pad.get_current_caps()
190 if name.startswith('video'):
191 q = Gst.ElementFactory.make('queue')
192 conv = Gst.ElementFactory.make('videoconvert')
193 sink = Gst.ElementFactory.make('autovideosink')
194 self.pipe.add(q, conv, sink)
195 self.pipe.sync_children_states()
196 pad.link(q.get_static_pad('sink'))
199 elif name.startswith('audio'):
200 q = Gst.ElementFactory.make('queue')
201 conv = Gst.ElementFactory.make('audioconvert')
202 resample = Gst.ElementFactory.make('audioresample')
203 sink = Gst.ElementFactory.make('autoaudiosink')
204 self.pipe.add(q, conv, resample, sink)
205 self.pipe.sync_children_states()
206 pad.link(q.get_static_pad('sink'))
211 def on_ice_gathering_state_notify(self, pspec, _):
212 state = self.webrtc.get_property('ice-gathering-state')
213 print_status(f'ICE gathering state changed to {state}')
215 def on_incoming_stream(self, _, pad):
216 if pad.direction != Gst.PadDirection.SRC:
219 decodebin = Gst.ElementFactory.make('decodebin')
220 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
221 self.pipe.add(decodebin)
222 decodebin.sync_state_with_parent()
223 pad.link(decodebin.get_static_pad('sink'))
    def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
        # Build and start the send pipeline for our configured video encoding,
        # substituting the negotiated RTP payload types into the template.
        # create_offer=False is used when answering a remote offer (see handle_json).
        print_status(f'Creating pipeline, create_offer: {create_offer}')
        self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
        bus = self.pipe.get_bus()
        # Poll bus messages from the asyncio loop instead of a GLib main loop
        self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        # create_offer is forwarded to the handler as signal user data
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)
    def on_answer_created(self, promise, _, __):
        # Promise callback: webrtcbin created our answer; set it as the local
        # description, then relay it to the peer via the signalling server.
        assert(promise.wait() == Gst.PromiseResult.REPLIED)
        reply = promise.get_reply()
        answer = reply['answer']
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', answer, promise)
        promise.interrupt()  # we don't care about the result, discard it
        self.send_sdp(answer)
    def on_offer_set(self, promise, _, __):
        # The remote offer has been applied; ask webrtcbin to create our answer.
        assert(promise.wait() == Gst.PromiseResult.REPLIED)
        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
        self.webrtc.emit('create-answer', None, promise)
251 def handle_json(self, message):
253 msg = json.loads(message)
254 except json.decoder.JSONDecoderError:
255 print_error('Failed to parse JSON message, this might be a bug')
258 sdp = msg['sdp']['sdp']
259 if msg['sdp']['type'] == 'answer':
260 print_status('Received answer:\n%s' % sdp)
261 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
262 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
263 promise = Gst.Promise.new()
264 self.webrtc.emit('set-remote-description', answer, promise)
265 promise.interrupt() # we don't care about the result, discard it
267 print_status('Received offer:\n%s' % sdp)
268 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
271 print_status('Incoming call: received an offer, creating pipeline')
272 pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
273 assert(self.video_encoding in pts)
274 assert('OPUS' in pts)
275 self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
279 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
280 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
281 self.webrtc.emit('set-remote-description', offer, promise)
285 candidate = ice['candidate']
286 sdpmlineindex = ice['sdpMLineIndex']
287 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
289 print_error('Unknown JSON message')
291 def close_pipeline(self):
293 self.pipe.set_state(Gst.State.NULL)
297 async def loop(self):
299 async for message in self.conn:
300 print(f'<<< {message}')
301 if message == 'HELLO':
303 # If a peer ID is specified, we want to connect to it. If not,
304 # we wait for an incoming call.
306 print_status(f'Waiting for incoming call: ID is {self.id_}')
308 if self.remote_is_offerer:
309 print_status('Have peer ID: initiating call (will request remote peer to create offer)')
311 print_status('Have peer ID: initiating call (will create offer)')
312 await self.setup_call()
313 elif message == 'SESSION_OK':
314 if self.remote_is_offerer:
315 # We are initiating the call, but we want the remote peer to create the offer
316 print_status('Call was connected: requesting remote peer for offer')
317 await self.send('OFFER_REQUEST')
319 self.start_pipeline()
320 elif message == 'OFFER_REQUEST':
321 print_status('Incoming call: we have been asked to create the offer')
322 self.start_pipeline()
323 elif message.startswith('ERROR'):
325 self.close_pipeline()
328 self.handle_json(message)
329 self.close_pipeline()
332 async def stop(self):
334 await self.conn.close()
def check_plugins():
    """Check that all GStreamer plugins required by the pipelines are installed.

    Returns True when everything is available, otherwise prints the missing
    plugin names and returns False.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
    if missing:
        # print_error takes a single message argument
        print_error(f'Missing gstreamer plugins: {missing}')
        return False
    return True
if __name__ == '__main__':
    # GStreamer must be initialized before any other Gst API is used
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
                        help='Video encoding to negotiate')
    parser.add_argument('--peer-id', help='String ID of the peer to connect to')
    parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
    parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
                        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    parser.add_argument('--remote-offerer', default=False, action='store_true',
                        dest='remote_is_offerer',
                        help='Request that the peer generate the offer and we\'ll answer')
    args = parser.parse_args()
    if not args.peer_id and not args.our_id:
        print('You must pass either --peer-id or --our-id')
        sys.exit(1)
    loop = asyncio.new_event_loop()
    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
    loop.run_until_complete(c.connect())
    res = loop.run_until_complete(c.loop())
    # Propagate the loop's result (0 = clean shutdown, 1 = signalling error)
    sys.exit(res)