3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 # 2022 Nirbheek Chauhan <nirbheek@centricular.com>
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
26 # Ensure that gst-python is installed
28 from gi.overrides import Gst as _
30 print('gstreamer-python binding overrides aren\'t available, please install them')
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
35 webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
36 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37 vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38 queue ! application/x-rtp,media=video,encoding-name=VP8,payload={vp8_pt} ! sendrecv.
39 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload={opus_pt} ! sendrecv.
43 from websockets.version import version as wsv
46 def print_status(msg):
51 print(f'!!! {msg}', file=sys.stderr)
54 def get_payload_types(sdpmsg, video_encoding, audio_encoding):
56 Find the payload types for the specified video and audio encoding.
58 Very simplistically finds the first payload type matching the encoding
59 name. More complex applications will want to match caps on
60 profile-level-id, packetization-mode, etc.
64 for i in range(0, sdpmsg.medias_len()):
65 media = sdpmsg.get_media(i)
66 for j in range(0, media.formats_len()):
67 fmt = media.get_format(j)
68 if fmt == 'webrtc-datachannel':
71 caps = media.get_caps_from_media(pt)
72 s = caps.get_structure(0)
73 encoding_name = s['encoding-name']
74 if video_pt is None and encoding_name == video_encoding:
76 elif audio_pt is None and encoding_name == audio_encoding:
78 return {video_encoding: video_pt, audio_encoding: audio_pt}
82 def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
86 self.event_loop = loop
88 # An optional user-specified ID we can use to register
90 # The actual ID we used to register
92 # An optional peer ID we should connect to
93 self.peer_id = peer_id
94 # Whether we will send the offer or the remote peer will
95 self.remote_is_offerer = remote_is_offerer
97 async def send(self, msg):
100 await self.conn.send(msg)
102 async def connect(self):
103 self.conn = await websockets.connect(self.server)
104 if self.our_id is None:
105 self.id_ = str(random.randrange(10, 10000))
107 self.id_ = self.our_id
108 await self.send(f'HELLO {self.id_}')
110 async def setup_call(self):
112 await self.send(f'SESSION {self.peer_id}')
114 def send_soon(self, msg):
115 asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
117 def send_sdp(self, offer):
118 text = offer.sdp.as_text()
119 if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
120 print_status('Sending offer:\n%s' % text)
121 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
122 elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
123 print_status('Sending answer:\n%s' % text)
124 msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
126 raise AssertionError(offer.type)
129 def on_offer_created(self, promise, _, __):
130 assert(promise.wait() == Gst.PromiseResult.REPLIED)
131 reply = promise.get_reply()
132 offer = reply['offer']
133 promise = Gst.Promise.new()
134 print_status('Offer created, setting local description')
135 self.webrtc.emit('set-local-description', offer, promise)
136 promise.interrupt() # we don't care about the result, discard it
139 def on_negotiation_needed(self, _, create_offer):
141 print_status('Call was connected: creating offer')
142 promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
143 self.webrtc.emit('create-offer', None, promise)
145 def send_ice_candidate_message(self, _, mlineindex, candidate):
146 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
147 self.send_soon(icemsg)
149 def on_incoming_decodebin_stream(self, _, pad):
150 if not pad.has_current_caps():
151 print_error(pad, 'has no caps, ignoring')
154 caps = pad.get_current_caps()
158 if name.startswith('video'):
159 q = Gst.ElementFactory.make('queue')
160 conv = Gst.ElementFactory.make('videoconvert')
161 sink = Gst.ElementFactory.make('autovideosink')
162 self.pipe.add(q, conv, sink)
163 self.pipe.sync_children_states()
164 pad.link(q.get_static_pad('sink'))
167 elif name.startswith('audio'):
168 q = Gst.ElementFactory.make('queue')
169 conv = Gst.ElementFactory.make('audioconvert')
170 resample = Gst.ElementFactory.make('audioresample')
171 sink = Gst.ElementFactory.make('autoaudiosink')
172 self.pipe.add(q, conv, resample, sink)
173 self.pipe.sync_children_states()
174 pad.link(q.get_static_pad('sink'))
179 def on_ice_gathering_state_notify(self, pspec, _):
180 state = self.webrtc.get_property('ice-gathering-state')
181 print_status(f'ICE gathering state changed to {state}')
183 def on_incoming_stream(self, _, pad):
184 if pad.direction != Gst.PadDirection.SRC:
187 decodebin = Gst.ElementFactory.make('decodebin')
188 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
189 self.pipe.add(decodebin)
190 decodebin.sync_state_with_parent()
191 pad.link(decodebin.get_static_pad('sink'))
193 def start_pipeline(self, create_offer=True, opus_pt=96, vp8_pt=97):
194 print_status(f'Creating pipeline, create_offer: {create_offer}')
195 self.pipe = Gst.parse_launch(PIPELINE_DESC.format(vp8_pt=vp8_pt, opus_pt=opus_pt))
196 self.webrtc = self.pipe.get_by_name('sendrecv')
197 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
198 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
199 self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
200 self.webrtc.connect('pad-added', self.on_incoming_stream)
201 self.pipe.set_state(Gst.State.PLAYING)
203 def on_answer_created(self, promise, _, __):
204 assert(promise.wait() == Gst.PromiseResult.REPLIED)
205 reply = promise.get_reply()
206 answer = reply['answer']
207 promise = Gst.Promise.new()
208 self.webrtc.emit('set-local-description', answer, promise)
209 promise.interrupt() # we don't care about the result, discard it
210 self.send_sdp(answer)
212 def on_offer_set(self, promise, _, __):
213 assert(promise.wait() == Gst.PromiseResult.REPLIED)
214 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
215 self.webrtc.emit('create-answer', None, promise)
217 def handle_json(self, message):
219 msg = json.loads(message)
220 except json.decoder.JSONDecoderError:
221 print_error('Failed to parse JSON message, this might be a bug')
224 sdp = msg['sdp']['sdp']
225 if msg['sdp']['type'] == 'answer':
226 print_status('Received answer:\n%s' % sdp)
227 res, sdpmsg = GstSdp.SDPMessage.new()
228 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
229 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
230 promise = Gst.Promise.new()
231 self.webrtc.emit('set-remote-description', answer, promise)
232 promise.interrupt() # we don't care about the result, discard it
234 print_status('Received offer:\n%s' % sdp)
235 res, sdpmsg = GstSdp.SDPMessage.new()
236 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
239 print_status('Incoming call: received an offer, creating pipeline')
240 pts = get_payload_types(sdpmsg, video_encoding='VP8', audio_encoding='OPUS')
242 assert('OPUS' in pts)
243 self.start_pipeline(create_offer=False, vp8_pt=pts['VP8'], opus_pt=pts['OPUS'])
247 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
248 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
249 self.webrtc.emit('set-remote-description', offer, promise)
253 candidate = ice['candidate']
254 sdpmlineindex = ice['sdpMLineIndex']
255 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
257 print_error('Unknown JSON message')
259 def close_pipeline(self):
261 self.pipe.set_state(Gst.State.NULL)
265 async def loop(self):
267 async for message in self.conn:
268 print(f'<<< {message}')
269 if message == 'HELLO':
271 # If a peer ID is specified, we want to connect to it. If not,
272 # we wait for an incoming call.
274 print_status(f'Waiting for incoming call: ID is {self.id_}')
276 if self.remote_is_offerer:
277 print_status('Have peer ID: initiating call (will request remote peer to create offer)')
279 print_status('Have peer ID: initiating call (will create offer)')
280 await self.setup_call()
281 elif message == 'SESSION_OK':
282 if self.remote_is_offerer:
283 # We are initiating the call, but we want the remote peer to create the offer
284 print_status('Call was connected: requesting remote peer for offer')
285 await self.send('OFFER_REQUEST')
287 self.start_pipeline()
288 elif message == 'OFFER_REQUEST':
289 print_status('Incoming call: we have been asked to create the offer')
290 self.start_pipeline()
291 elif message.startswith('ERROR'):
293 self.close_pipeline()
296 self.handle_json(message)
297 self.close_pipeline()
300 async def stop(self):
302 await self.conn.close()
307 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
308 "rtpmanager", "videotestsrc", "audiotestsrc"]
309 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
311 print_error('Missing gstreamer plugins:', missing)
316 if __name__ == '__main__':
318 if not check_plugins():
320 parser = argparse.ArgumentParser()
321 parser.add_argument('--peer-id', help='String ID of the peer to connect to')
322 parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
323 parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
324 help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
325 parser.add_argument('--remote-offerer', default=False, action='store_true',
326 dest='remote_is_offerer',
327 help='Request that the peer generate the offer and we\'ll answer')
328 args = parser.parse_args()
329 if not args.peer_id and not args.our_id:
330 print('You must pass either --peer-id or --our-id')
332 loop = asyncio.new_event_loop()
333 c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
334 loop.run_until_complete(c.connect())
335 res = loop.run_until_complete(c.loop())