b7c182f18c7b24b507052f3b4c523bc3c6870561
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 #               2022 Nirbheek Chauhan <nirbheek@centricular.com>
5 #
6 # Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
7 # with a browser JS app, implemented in Python.
8
9 import random
10 import ssl
11 import websockets
12 import asyncio
13 import os
14 import sys
15 import json
16 import argparse
17
18 import gi
19 gi.require_version('Gst', '1.0')
20 from gi.repository import Gst
21 gi.require_version('GstWebRTC', '1.0')
22 from gi.repository import GstWebRTC
23 gi.require_version('GstSdp', '1.0')
24 from gi.repository import GstSdp
25
26 # Ensure that gst-python is installed
27 try:
28     from gi.overrides import Gst as _
29 except ImportError:
30     print('gstreamer-python binding overrides aren\'t available, please install them')
31     raise
32
# Launch description of the pipeline that WebRTCClient.start_pipeline() hands
# to Gst.parse_launch().
#
# These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
#
# It declares a webrtcbin element named "sendrecv" and two test-source
# branches linked to it:
#   * videotestsrc -> VP8 encode -> RTP payload (payload type 97)
#   * audiotestsrc -> Opus encode -> RTP payload (payload type 96)
# The trailing backslash after the first "queue !" is a Python line
# continuation inside the (non-raw) string literal, so that line and the
# following one are joined in the resulting text.
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
  vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay picture-id-mode=15-bit !
  queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
  queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
42
43 from websockets.version import version as wsv
44
45
def print_status(msg):
    """Print *msg* to stdout as a status line, prefixed with ``--- ``."""
    line = '--- {}'.format(msg)
    print(line)
48
49
def print_error(msg):
    """Print *msg* to stderr as an error line, prefixed with ``!!! ``."""
    line = '!!! {}'.format(msg)
    print(line, file=sys.stderr)
52
53
class WebRTCClient:
    """Signalling client plus GStreamer pipeline for one sendrecv WebRTC call.

    The client registers with a websocket signalling server (``HELLO``),
    optionally requests a session with a peer (``SESSION``), and relays SDP
    and ICE messages between that peer and a ``webrtcbin`` element.
    """

    def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
        """Store the configuration; nothing is connected or created yet.

        Args:
            loop: asyncio event loop on which ``send_soon`` schedules sends.
            our_id: optional user-specified ID to register with; when None a
                random ID is picked in ``connect``.
            peer_id: optional ID of the peer to call; when falsy we wait for
                an incoming call instead.
            server: signalling server URL handed to ``websockets.connect``.
            remote_is_offerer: when true, the remote peer is asked to create
                the offer and we answer.
        """
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.event_loop = loop
        self.server = server
        # An optional user-specified ID we can use to register
        self.our_id = our_id
        # The actual ID we used to register
        self.id_ = None
        # An optional peer ID we should connect to
        self.peer_id = peer_id
        # Whether we will send the offer or the remote peer will
        self.remote_is_offerer = remote_is_offerer

    async def send(self, msg):
        """Log *msg* and send it over the signalling connection."""
        assert self.conn
        print(f'>>> {msg}')
        await self.conn.send(msg)

    async def connect(self):
        """Open the websocket and register with ``HELLO <id>``.

        Uses ``our_id`` when given, otherwise a random numeric ID.
        """
        self.conn = await websockets.connect(self.server)
        if self.our_id is None:
            self.id_ = str(random.randrange(10, 10000))
        else:
            self.id_ = self.our_id
        await self.send(f'HELLO {self.id_}')

    async def setup_call(self):
        """Ask the signalling server for a session with ``peer_id``."""
        assert self.peer_id
        await self.send(f'SESSION {self.peer_id}')

    def send_soon(self, msg):
        """Schedule ``send(msg)`` on the event loop without awaiting it.

        Uses the thread-safe scheduling call so it can be invoked from the
        (synchronous) GStreamer signal handlers below.
        """
        asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)

    def send_sdp(self, offer):
        """Serialise a session description to JSON and queue it for sending.

        Args:
            offer: a session description whose ``type`` is OFFER or ANSWER.

        Raises:
            AssertionError: if the description has any other type.
        """
        text = offer.sdp.as_text()
        if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
            kind = 'offer'
        elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
            kind = 'answer'
        else:
            raise AssertionError(offer.type)
        print_status(f'Sending {kind}:\n{text}')
        self.send_soon(json.dumps({'sdp': {'type': kind, 'sdp': text}}))

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the created offer locally and send it."""
        # Keep the wait() call outside the assert so that it still runs when
        # assertions are stripped with `python -O`.
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        offer = reply['offer']
        promise = Gst.Promise.new()
        print_status('Offer created, setting local description')
        self.webrtc.emit('set-local-description', offer, promise)
        promise.interrupt()  # we don't care about the result, discard it
        self.send_sdp(offer)

    def on_negotiation_needed(self, _, create_offer):
        """Signal handler for ``on-negotiation-needed``.

        Args:
            create_offer: user data given at connect time; when true we
                create the offer ourselves.
        """
        if create_offer:
            print_status('Call was connected: creating offer')
            promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
            self.webrtc.emit('create-offer', None, promise)
        elif self.remote_is_offerer:
            # We are initiating the call, but we want the remote peer to create the offer
            print_status('Call was connected: requesting remote peer for offer')
            self.send_soon('OFFER_REQUEST')

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Signal handler for ``on-ice-candidate``: forward it to the peer."""
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
        self.send_soon(icemsg)

    def _link_pad_to_new_chain(self, pad, factory_names):
        """Create the named elements, add them to the pipeline and link *pad* through them in order."""
        elements = [Gst.ElementFactory.make(name) for name in factory_names]
        self.pipe.add(*elements)
        self.pipe.sync_children_states()
        pad.link(elements[0].get_static_pad('sink'))
        for upstream, downstream in zip(elements, elements[1:]):
            upstream.link(downstream)

    def on_incoming_decodebin_stream(self, _, pad):
        """Signal handler for decodebin ``pad-added``: render the decoded stream.

        Video pads are routed to ``autovideosink`` and audio pads to
        ``autoaudiosink``; pads with any other caps name are left unlinked.
        """
        if not pad.has_current_caps():
            # print_error() takes a single message, so format the pad into it.
            print_error(f'{pad} has no caps, ignoring')
            return

        caps = pad.get_current_caps()
        assert (len(caps))
        s = caps[0]
        name = s.get_name()
        if name.startswith('video'):
            self._link_pad_to_new_chain(pad, ('queue', 'videoconvert', 'autovideosink'))
        elif name.startswith('audio'):
            self._link_pad_to_new_chain(pad, ('queue', 'audioconvert', 'audioresample', 'autoaudiosink'))

    def on_ice_gathering_state_notify(self, pspec, _):
        """Signal handler for ``notify::ice-gathering-state``: log the new state."""
        state = self.webrtc.get_property('ice-gathering-state')
        print_status(f'ICE gathering state changed to {state}')

    def on_incoming_stream(self, _, pad):
        """Signal handler for webrtcbin ``pad-added``: decode incoming media.

        Only source pads are handled; each one gets its own ``decodebin``.
        """
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self, create_offer=True):
        """Build the pipeline from ``PIPELINE_DESC``, hook up webrtcbin and start playing.

        Args:
            create_offer: passed through to ``on_negotiation_needed`` to
                decide whether we create the offer.
        """
        print_status(f'Creating pipeline, create_offer: {create_offer}')
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    def on_answer_created(self, promise, _, __):
        """Promise callback: apply the created answer locally and send it."""
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        answer = reply['answer']
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', answer, promise)
        promise.interrupt()  # we don't care about the result, discard it
        self.send_sdp(answer)

    def on_offer_set(self, promise, _, __):
        """Promise callback: the remote offer was applied, now create our answer."""
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
        self.webrtc.emit('create-answer', None, promise)

    @staticmethod
    def _parse_sdp(sdp):
        """Parse SDP text into a new ``GstSdp.SDPMessage`` and return it.

        A non-OK parse result is reported through ``print_error`` but the
        message is still returned, so callers proceed as they did before.
        """
        _, sdpmsg = GstSdp.SDPMessage.new()
        res = GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
        if res != GstSdp.SDPResult.OK:
            print_error(f'Failed to parse SDP ({res}), this might be a bug')
        return sdpmsg

    def handle_json(self, message):
        """Dispatch one JSON signalling message to webrtcbin.

        ``{"sdp": ...}`` messages set the remote description (and, for an
        offer, trigger answer creation); ``{"ice": ...}`` messages add a
        remote ICE candidate; anything else is reported as unknown.

        Raises:
            json.JSONDecodeError: if *message* is not valid JSON (re-raised
                after logging).
        """
        assert (self.webrtc)
        try:
            msg = json.loads(message)
        except json.JSONDecodeError:
            print_error('Failed to parse JSON message, this might be a bug')
            raise
        if 'sdp' in msg:
            sdp = msg['sdp']['sdp']
            if msg['sdp']['type'] == 'answer':
                print_status('Received answer:\n%s' % sdp)
                sdpmsg = self._parse_sdp(sdp)
                answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
                promise = Gst.Promise.new()
                self.webrtc.emit('set-remote-description', answer, promise)
                promise.interrupt()  # we don't care about the result, discard it
            else:
                print_status('Received offer:\n%s' % sdp)
                sdpmsg = self._parse_sdp(sdp)
                offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
                # Answer creation continues in on_offer_set once the offer is applied.
                promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
                self.webrtc.emit('set-remote-description', offer, promise)
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
        else:
            print_error('Unknown JSON message')

    def close_pipeline(self):
        """Stop and drop the pipeline, if any; safe to call repeatedly."""
        if self.pipe:
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
        self.webrtc = None

    def is_incoming_offer(self, msg):
        """Return True when no pipeline exists yet, i.e. *msg* starts an incoming call.

        The content of *msg* is not inspected, and ``remote_is_offerer`` does
        not affect the result.
        """
        return not self.webrtc

    async def loop(self):
        """Process signalling messages until the connection ends.

        Returns:
            1 if the server sent an ``ERROR`` message, 0 once iteration over
            the connection finishes. The pipeline is closed in both cases.
        """
        assert self.conn
        async for message in self.conn:
            print(f'<<< {message}')
            if message == 'HELLO':
                assert self.id_
                # If a peer ID is specified, we want to connect to it. If not,
                # we wait for an incoming call.
                if not self.peer_id:
                    print_status(f'Waiting for incoming call: ID is {self.id_}')
                else:
                    if self.remote_is_offerer:
                        print_status('Have peer ID: initiating call (will request remote peer to create offer)')
                    else:
                        print_status('Have peer ID: initiating call (will create offer)')
                    await self.setup_call()
            elif message == 'SESSION_OK':
                if self.remote_is_offerer:
                    self.start_pipeline(create_offer=False)
                else:
                    self.start_pipeline()
            elif message == 'OFFER_REQUEST':
                print_status('Incoming call: we have been asked to create the offer')
                self.start_pipeline()
            elif message.startswith('ERROR'):
                print_error(message)
                self.close_pipeline()
                return 1
            else:
                # Anything else is treated as JSON; the first such message of
                # an incoming call also creates the pipeline.
                if self.is_incoming_offer(message):
                    print_status('Incoming call: received an offer, creating pipeline')
                    self.start_pipeline(create_offer=False)
                self.handle_json(message)
        self.close_pipeline()
        return 0

    async def stop(self):
        """Close the signalling connection, if open."""
        if self.conn:
            await self.conn.close()
        self.conn = None
279
280
def check_plugins():
    """Check that every GStreamer plugin this demo needs is in the registry.

    Returns:
        True when all needed plugins are found; otherwise reports the
        missing ones via ``print_error`` and returns False.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    registry = Gst.Registry.get()
    missing = [p for p in needed if registry.find_plugin(p) is None]
    if missing:
        # print_error() accepts exactly one message argument.
        print_error(f'Missing gstreamer plugins: {missing}')
        return False
    return True
289
290
# Script entry point: parse the command line, connect to the signalling
# server and run the client until the connection ends or the server reports
# an error. The process exit status is the value returned by WebRTCClient.loop().
if __name__ == '__main__':
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('--peer-id', help='String ID of the peer to connect to')
    parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
    parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
                        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    parser.add_argument('--remote-offerer', default=False, action='store_true',
                        dest='remote_is_offerer',
                        help='Request that the peer generate the offer and we\'ll answer')
    args = parser.parse_args()
    if not args.peer_id and not args.our_id:
        print('You must pass either --peer-id or --our-id')
        sys.exit(1)
    loop = asyncio.new_event_loop()
    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
    try:
        loop.run_until_complete(c.connect())
        res = loop.run_until_complete(c.loop())
    finally:
        # Always release resources, including when connect()/loop() raise.
        # Stop the pipeline first so its callbacks stop queueing sends, then
        # close the websocket. Both calls are no-ops when nothing is open.
        c.close_pipeline()
        loop.run_until_complete(c.stop())
    sys.exit(res)