3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 # 2022 Nirbheek Chauhan <nirbheek@centricular.com>
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
26 # Ensure that gst-python is installed
28 from gi.overrides import Gst as _
30 print('gstreamer-python binding overrides aren\'t available, please install them')
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
34 PIPELINE_DESC_VP8 = '''
35 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
36 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37 vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38 queue ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv.
39 audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
42 PIPELINE_DESC_H264 = '''
43 webrtcbin name=sendrecv latency=0 stun-server=stun://stun.l.google.com:19302
44 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
45 x264enc tune=zerolatency speed-preset=ultrafast key-int-max=30 intra-refresh=true ! rtph264pay aggregate-mode=zero-latency config-interval=-1 !
46 queue ! application/x-rtp,media=video,encoding-name=H264,payload={video_pt} ! sendrecv.
47 audiotestsrc is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
48 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv.
52 'H264': PIPELINE_DESC_H264,
53 'VP8': PIPELINE_DESC_VP8,
56 from websockets.version import version as wsv
59 def print_status(msg):
64 print(f'!!! {msg}', file=sys.stderr)
67 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
69 Find the payload types for the specified video and audio encoding.
71 Very simplistically finds the first payload type matching the encoding
72 name. More complex applications will want to match caps on
73 profile-level-id, packetization-mode, etc.
77 for i in range(0, sdpmsg.medias_len()):
78 media = sdpmsg.get_media(i)
79 for j in range(0, media.formats_len()):
80 fmt = media.get_format(j)
81 if fmt == 'webrtc-datachannel':
84 caps = media.get_caps_from_media(pt)
85 s = caps.get_structure(0)
86 encoding_name = s['encoding-name']
87 if video_pt is None and encoding_name == video_encoding:
89 elif audio_pt is None and encoding_name == audio_encoding:
91 return {video_encoding: video_pt, audio_encoding: audio_pt}
95 def __init__(self, loop, our_id, peer_id, server, remote_is_offerer, video_encoding):
99 self.event_loop = loop
101 # An optional user-specified ID we can use to register
103 # The actual ID we used to register
105 # An optional peer ID we should connect to
106 self.peer_id = peer_id
107 # Whether we will send the offer or the remote peer will
108 self.remote_is_offerer = remote_is_offerer
109 # Video encoding: VP8, H264, etc
110 self.video_encoding = video_encoding.upper()
112 async def send(self, msg):
115 await self.conn.send(msg)
117 async def connect(self):
118 self.conn = await websockets.connect(self.server)
119 if self.our_id is None:
120 self.id_ = str(random.randrange(10, 10000))
122 self.id_ = self.our_id
123 await self.send(f'HELLO {self.id_}')
125 async def setup_call(self):
127 await self.send(f'SESSION {self.peer_id}')
129 def send_soon(self, msg):
130 asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
132 def send_sdp(self, offer):
133 text = offer.sdp.as_text()
134 if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
135 print_status('Sending offer:\n%s' % text)
136 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
137 elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
138 print_status('Sending answer:\n%s' % text)
139 msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
141 raise AssertionError(offer.type)
144 def on_offer_created(self, promise, _, __):
145 assert(promise.wait() == Gst.PromiseResult.REPLIED)
146 reply = promise.get_reply()
147 offer = reply['offer']
148 promise = Gst.Promise.new()
149 print_status('Offer created, setting local description')
150 self.webrtc.emit('set-local-description', offer, promise)
151 promise.interrupt() # we don't care about the result, discard it
154 def on_negotiation_needed(self, _, create_offer):
156 print_status('Call was connected: creating offer')
157 promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
158 self.webrtc.emit('create-offer', None, promise)
160 def send_ice_candidate_message(self, _, mlineindex, candidate):
161 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
162 self.send_soon(icemsg)
164 def on_incoming_decodebin_stream(self, _, pad):
165 if not pad.has_current_caps():
166 print_error(pad, 'has no caps, ignoring')
169 caps = pad.get_current_caps()
173 if name.startswith('video'):
174 q = Gst.ElementFactory.make('queue')
175 conv = Gst.ElementFactory.make('videoconvert')
176 sink = Gst.ElementFactory.make('autovideosink')
177 self.pipe.add(q, conv, sink)
178 self.pipe.sync_children_states()
179 pad.link(q.get_static_pad('sink'))
182 elif name.startswith('audio'):
183 q = Gst.ElementFactory.make('queue')
184 conv = Gst.ElementFactory.make('audioconvert')
185 resample = Gst.ElementFactory.make('audioresample')
186 sink = Gst.ElementFactory.make('autoaudiosink')
187 self.pipe.add(q, conv, resample, sink)
188 self.pipe.sync_children_states()
189 pad.link(q.get_static_pad('sink'))
194 def on_ice_gathering_state_notify(self, pspec, _):
195 state = self.webrtc.get_property('ice-gathering-state')
196 print_status(f'ICE gathering state changed to {state}')
198 def on_incoming_stream(self, _, pad):
199 if pad.direction != Gst.PadDirection.SRC:
202 decodebin = Gst.ElementFactory.make('decodebin')
203 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
204 self.pipe.add(decodebin)
205 decodebin.sync_state_with_parent()
206 pad.link(decodebin.get_static_pad('sink'))
208 def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
209 print_status(f'Creating pipeline, create_offer: {create_offer}')
210 self.pipe = Gst.parse_launch(PIPELINE_DESC[self.video_encoding].format(video_pt=video_pt, audio_pt=audio_pt))
211 self.webrtc = self.pipe.get_by_name('sendrecv')
212 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
213 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
214 self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
215 self.webrtc.connect('pad-added', self.on_incoming_stream)
216 self.pipe.set_state(Gst.State.PLAYING)
218 def on_answer_created(self, promise, _, __):
219 assert(promise.wait() == Gst.PromiseResult.REPLIED)
220 reply = promise.get_reply()
221 answer = reply['answer']
222 promise = Gst.Promise.new()
223 self.webrtc.emit('set-local-description', answer, promise)
224 promise.interrupt() # we don't care about the result, discard it
225 self.send_sdp(answer)
227 def on_offer_set(self, promise, _, __):
228 assert(promise.wait() == Gst.PromiseResult.REPLIED)
229 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
230 self.webrtc.emit('create-answer', None, promise)
232 def handle_json(self, message):
234 msg = json.loads(message)
235 except json.decoder.JSONDecoderError:
236 print_error('Failed to parse JSON message, this might be a bug')
239 sdp = msg['sdp']['sdp']
240 if msg['sdp']['type'] == 'answer':
241 print_status('Received answer:\n%s' % sdp)
242 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
243 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
244 promise = Gst.Promise.new()
245 self.webrtc.emit('set-remote-description', answer, promise)
246 promise.interrupt() # we don't care about the result, discard it
248 print_status('Received offer:\n%s' % sdp)
249 res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
252 print_status('Incoming call: received an offer, creating pipeline')
253 pts = get_payload_types(sdpmsg, video_encoding=self.video_encoding, audio_encoding='OPUS')
254 assert(self.video_encoding in pts)
255 assert('OPUS' in pts)
256 self.start_pipeline(create_offer=False, video_pt=pts[self.video_encoding], audio_pt=pts['OPUS'])
260 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
261 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
262 self.webrtc.emit('set-remote-description', offer, promise)
266 candidate = ice['candidate']
267 sdpmlineindex = ice['sdpMLineIndex']
268 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
270 print_error('Unknown JSON message')
272 def close_pipeline(self):
274 self.pipe.set_state(Gst.State.NULL)
278 async def loop(self):
280 async for message in self.conn:
281 print(f'<<< {message}')
282 if message == 'HELLO':
284 # If a peer ID is specified, we want to connect to it. If not,
285 # we wait for an incoming call.
287 print_status(f'Waiting for incoming call: ID is {self.id_}')
289 if self.remote_is_offerer:
290 print_status('Have peer ID: initiating call (will request remote peer to create offer)')
292 print_status('Have peer ID: initiating call (will create offer)')
293 await self.setup_call()
294 elif message == 'SESSION_OK':
295 if self.remote_is_offerer:
296 # We are initiating the call, but we want the remote peer to create the offer
297 print_status('Call was connected: requesting remote peer for offer')
298 await self.send('OFFER_REQUEST')
300 self.start_pipeline()
301 elif message == 'OFFER_REQUEST':
302 print_status('Incoming call: we have been asked to create the offer')
303 self.start_pipeline()
304 elif message.startswith('ERROR'):
306 self.close_pipeline()
309 self.handle_json(message)
310 self.close_pipeline()
313 async def stop(self):
315 await self.conn.close()
320 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
321 "rtpmanager", "videotestsrc", "audiotestsrc"]
322 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
324 print_error('Missing gstreamer plugins:', missing)
329 if __name__ == '__main__':
331 if not check_plugins():
333 parser = argparse.ArgumentParser()
334 parser.add_argument('--video-encoding', default='vp8', nargs='?', choices=['vp8', 'h264'],
335 help='Video encoding to negotiate')
336 parser.add_argument('--peer-id', help='String ID of the peer to connect to')
337 parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
338 parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
339 help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
340 parser.add_argument('--remote-offerer', default=False, action='store_true',
341 dest='remote_is_offerer',
342 help='Request that the peer generate the offer and we\'ll answer')
343 args = parser.parse_args()
344 if not args.peer_id and not args.our_id:
345 print('You must pass either --peer-id or --our-id')
347 loop = asyncio.new_event_loop()
348 c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer, args.video_encoding)
349 loop.run_until_complete(c.connect())
350 res = loop.run_until_complete(c.loop())