3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 # 2022 Nirbheek Chauhan <nirbheek@centricular.com>
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
26 # Ensure that gst-python is installed
28 from gi.overrides import Gst as _
30 print('gstreamer-python binding overrides aren\'t available, please install them')
33 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
35 webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
36 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
37 vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
38 queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
39 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
40 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
43 from websockets.version import version as wsv
46 def print_status(msg):
51 print(f'!!! {msg}', file=sys.stderr)
55 def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
59 self.event_loop = loop
61 # An optional user-specified ID we can use to register
63 # The actual ID we used to register
65 # An optional peer ID we should connect to
66 self.peer_id = peer_id
67 # Whether we will send the offer or the remote peer will
68 self.remote_is_offerer = remote_is_offerer
70 async def send(self, msg):
73 await self.conn.send(msg)
75 async def connect(self):
76 self.conn = await websockets.connect(self.server)
77 if self.our_id is None:
78 self.id_ = str(random.randrange(10, 10000))
80 self.id_ = self.our_id
81 await self.send(f'HELLO {self.id_}')
83 async def setup_call(self):
85 await self.send(f'SESSION {self.peer_id}')
87 def send_soon(self, msg):
88 asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
90 def send_sdp(self, offer):
91 text = offer.sdp.as_text()
92 if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
93 print_status('Sending offer:\n%s' % text)
94 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
95 elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
96 print_status('Sending answer:\n%s' % text)
97 msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
99 raise AssertionError(offer.type)
102 def on_offer_created(self, promise, _, __):
103 assert(promise.wait() == Gst.PromiseResult.REPLIED)
104 reply = promise.get_reply()
105 offer = reply['offer']
106 promise = Gst.Promise.new()
107 print_status('Offer created, setting local description')
108 self.webrtc.emit('set-local-description', offer, promise)
109 promise.interrupt() # we don't care about the result, discard it
112 def on_negotiation_needed(self, _, create_offer):
114 print_status('Call was connected: creating offer')
115 promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
116 self.webrtc.emit('create-offer', None, promise)
117 elif self.remote_is_offerer:
118 # We are initiating the call, but we want the remote peer to create the offer
119 print_status('Call was connected: requesting remote peer for offer')
120 self.send_soon('OFFER_REQUEST')
122 def send_ice_candidate_message(self, _, mlineindex, candidate):
123 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
124 self.send_soon(icemsg)
126 def on_incoming_decodebin_stream(self, _, pad):
127 if not pad.has_current_caps():
128 print_error(pad, 'has no caps, ignoring')
131 caps = pad.get_current_caps()
135 if name.startswith('video'):
136 q = Gst.ElementFactory.make('queue')
137 conv = Gst.ElementFactory.make('videoconvert')
138 sink = Gst.ElementFactory.make('autovideosink')
139 self.pipe.add(q, conv, sink)
140 self.pipe.sync_children_states()
141 pad.link(q.get_static_pad('sink'))
144 elif name.startswith('audio'):
145 q = Gst.ElementFactory.make('queue')
146 conv = Gst.ElementFactory.make('audioconvert')
147 resample = Gst.ElementFactory.make('audioresample')
148 sink = Gst.ElementFactory.make('autoaudiosink')
149 self.pipe.add(q, conv, resample, sink)
150 self.pipe.sync_children_states()
151 pad.link(q.get_static_pad('sink'))
156 def on_ice_gathering_state_notify(self, pspec, _):
157 state = self.webrtc.get_property('ice-gathering-state')
158 print_status(f'ICE gathering state changed to {state}')
160 def on_incoming_stream(self, _, pad):
161 if pad.direction != Gst.PadDirection.SRC:
164 decodebin = Gst.ElementFactory.make('decodebin')
165 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
166 self.pipe.add(decodebin)
167 decodebin.sync_state_with_parent()
168 self.webrtc.link(decodebin)
170 def start_pipeline(self, create_offer=True):
171 print_status(f'Creating pipeline, create_offer: {create_offer}')
172 self.pipe = Gst.parse_launch(PIPELINE_DESC)
173 self.webrtc = self.pipe.get_by_name('sendrecv')
174 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
175 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
176 self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
177 self.webrtc.connect('pad-added', self.on_incoming_stream)
178 self.pipe.set_state(Gst.State.PLAYING)
180 def on_answer_created(self, promise, _, __):
181 assert(promise.wait() == Gst.PromiseResult.REPLIED)
182 reply = promise.get_reply()
183 answer = reply['answer']
184 promise = Gst.Promise.new()
185 self.webrtc.emit('set-local-description', answer, promise)
186 promise.interrupt() # we don't care about the result, discard it
187 self.send_sdp(answer)
189 def on_offer_set(self, promise, _, __):
190 assert(promise.wait() == Gst.PromiseResult.REPLIED)
191 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
192 self.webrtc.emit('create-answer', None, promise)
194 def handle_json(self, message):
197 msg = json.loads(message)
198 except json.decoder.JSONDecoderError:
199 print_error('Failed to parse JSON message, this might be a bug')
202 sdp = msg['sdp']['sdp']
203 if msg['sdp']['type'] == 'answer':
204 print_status('Received answer:\n%s' % sdp)
205 res, sdpmsg = GstSdp.SDPMessage.new()
206 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
207 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
208 promise = Gst.Promise.new()
209 self.webrtc.emit('set-remote-description', answer, promise)
210 promise.interrupt() # we don't care about the result, discard it
212 print_status('Received offer:\n%s' % sdp)
213 res, sdpmsg = GstSdp.SDPMessage.new()
214 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
215 offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
216 promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
217 self.webrtc.emit('set-remote-description', offer, promise)
220 candidate = ice['candidate']
221 sdpmlineindex = ice['sdpMLineIndex']
222 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
224 print_error('Unknown JSON message')
226 def close_pipeline(self):
228 self.pipe.set_state(Gst.State.NULL)
232 def is_incoming_offer(self, msg):
235 if self.remote_is_offerer:
239 async def loop(self):
241 async for message in self.conn:
242 print(f'<<< {message}')
243 if message == 'HELLO':
245 # If a peer ID is specified, we want to connect to it. If not,
246 # we wait for an incoming call.
248 print_status(f'Waiting for incoming call: ID is {self.id_}')
250 if self.remote_is_offerer:
251 print_status('Have peer ID: initiating call (will request remote peer to create offer)')
253 print_status('Have peer ID: initiating call (will create offer)')
254 await self.setup_call()
255 elif message == 'SESSION_OK':
256 if self.remote_is_offerer:
257 self.start_pipeline(create_offer=False)
259 self.start_pipeline()
260 elif message == 'OFFER_REQUEST':
261 print_status('Incoming call: we have been asked to create the offer')
262 self.start_pipeline()
263 elif message.startswith('ERROR'):
265 self.close_pipeline()
268 if self.is_incoming_offer(message):
269 print_status('Incoming call: received an offer, creating pipeline')
270 self.start_pipeline(create_offer=False)
271 self.handle_json(message)
272 self.close_pipeline()
275 async def stop(self):
277 await self.conn.close()
282 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
283 "rtpmanager", "videotestsrc", "audiotestsrc"]
284 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
286 print_error('Missing gstreamer plugins:', missing)
291 if __name__ == '__main__':
293 if not check_plugins():
295 parser = argparse.ArgumentParser()
296 parser.add_argument('--peer-id', help='String ID of the peer to connect to')
297 parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
298 parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
299 help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
300 parser.add_argument('--remote-offerer', default=False, action='store_true',
301 dest='remote_is_offerer',
302 help='Request that the peer generate the offer and we\'ll answer')
303 args = parser.parse_args()
304 if not args.peer_id and not args.our_id:
305 print('You must pass either --peer-id or --our-id')
307 loop = asyncio.new_event_loop()
308 c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
309 loop.run_until_complete(c.connect())
310 res = loop.run_until_complete(c.loop())