webrtc_sendrecv.py: Make it executable
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 #!/usr/bin/env python3
2
3 import random
4 import ssl
5 import websockets
6 import asyncio
7 import os
8 import sys
9 import json
10 import argparse
11
12 import gi
13 gi.require_version('Gst', '1.0')
14 from gi.repository import Gst
15 gi.require_version('GstWebRTC', '1.0')
16 from gi.repository import GstWebRTC
17 gi.require_version('GstSdp', '1.0')
18 from gi.repository import GstSdp
19
# The code below depends on the gst-python overrides; probe for them up front
# so a missing install produces a readable hint before the ImportError.
try:
    from gi.overrides import Gst as _
except ImportError:
    print("gstreamer-python binding overrides aren't available, please install them")
    raise
26
# These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
#
# Pipeline description handed to Gst.parse_launch() by
# WebRTCClient.start_pipeline(). The webrtcbin element is named "sendrecv" so
# it can be fetched with get_by_name() and so both branches below can link to
# it via "sendrecv.":
#   * a VP8-encoded test video stream, RTP payload type 97
#   * an Opus-encoded test audio stream, RTP payload type 96
#
# NOTE: the trailing backslash on the videotestsrc line is a Python line
# continuation inside this (non-raw) string literal, so that line and the next
# one are joined in the resulting string; the other lines keep their newlines.
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
  vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay !
  queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
  queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
36
37 from websockets.version import version as wsv
38
39
class WebRTCClient:
    """Signalling client that drives a ``webrtcbin`` send/receive pipeline.

    The client talks to a signalling server over a websocket: it registers
    with ``HELLO <id>``, asks for a session with ``SESSION <peer_id>`` and,
    once the server replies ``SESSION_OK``, starts the GStreamer pipeline and
    exchanges JSON-encoded SDP and ICE messages with the peer.
    """

    def __init__(self, loop, id_, peer_id, server):
        """Store the connection parameters; nothing is opened yet.

        Args:
            loop: asyncio event loop that ``send_soon`` schedules sends on.
            id_: Integer id announced to the server in the ``HELLO`` message.
            peer_id: Id of the peer requested in the ``SESSION`` message.
            server: URI passed to ``websockets.connect``.
        """
        self.event_loop = loop
        self.id_ = id_
        self.conn = None    # websocket connection, set by connect()
        self.pipe = None    # GStreamer pipeline, set by start_pipeline()
        self.webrtc = None  # the "sendrecv" webrtcbin inside self.pipe
        self.peer_id = peer_id
        self.server = server

    async def send(self, msg):
        """Log ``msg`` and send it over the websocket (requires connect())."""
        assert self.conn
        print(f'>>> Sending {msg}')
        await self.conn.send(msg)

    async def connect(self):
        """Open the websocket to the server and register with ``HELLO <id>``."""
        self.conn = await websockets.connect(self.server)
        await self.send('HELLO %d' % self.id_)

    async def setup_call(self):
        """Ask the server to set up a session with ``self.peer_id``."""
        await self.send('SESSION {}'.format(self.peer_id))

    def send_soon(self, msg):
        """Schedule ``send(msg)`` on the event loop without waiting for it.

        Uses ``asyncio.run_coroutine_threadsafe`` -- presumably because the
        GStreamer signal handlers calling this run outside the event loop's
        thread (TODO confirm). The returned future is not inspected.
        """
        asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)

    def send_sdp_offer(self, offer):
        """Serialize the local SDP offer to JSON and queue it for sending."""
        text = offer.sdp.as_text()
        print('Sending offer:\n%s' % text)
        msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
        self.send_soon(msg)

    def on_offer_created(self, promise, _, __):
        """Promise change callback for ``create-offer``.

        Takes the offer out of the promise reply, applies it as the local
        description and forwards it to the peer.
        """
        promise.wait()
        reply = promise.get_reply()
        offer = reply['offer']
        promise = Gst.Promise.new()
        print('Offer created, setting local description')
        self.webrtc.emit('set-local-description', offer, promise)
        # The promise is interrupted right away; no reply is waited for.
        promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """``on-negotiation-needed`` handler: ask ``element`` for an offer."""
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """``on-ice-candidate`` handler: forward a local candidate as JSON."""
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
        self.send_soon(icemsg)

    def _link_sink_chain(self, pad, factory_names):
        """Create the named elements, add them to the pipeline and chain them.

        The elements are created in order, added to ``self.pipe``, synced to
        the pipeline state, then ``pad`` is linked to the first element's
        ``sink`` pad and each element is linked to the next one.
        """
        elements = [Gst.ElementFactory.make(name) for name in factory_names]
        self.pipe.add(*elements)
        self.pipe.sync_children_states()
        pad.link(elements[0].get_static_pad('sink'))
        for upstream, downstream in zip(elements, elements[1:]):
            upstream.link(downstream)

    def on_incoming_decodebin_stream(self, _, pad):
        """decodebin ``pad-added`` handler: attach a sink chain to ``pad``.

        Pads without current caps are ignored. The name of the first caps
        structure selects a video or an audio sink chain; any other media
        type is left unlinked.
        """
        if not pad.has_current_caps():
            print(pad, 'has no caps, ignoring')
            return

        caps = pad.get_current_caps()
        assert len(caps)
        name = caps[0].get_name()
        if name.startswith('video'):
            self._link_sink_chain(pad, ('queue', 'videoconvert', 'autovideosink'))
        elif name.startswith('audio'):
            self._link_sink_chain(
                pad, ('queue', 'audioconvert', 'audioresample', 'autoaudiosink'))

    def on_incoming_stream(self, _, pad):
        """webrtcbin ``pad-added`` handler: decode each new source pad."""
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline from ``PIPELINE_DESC``, hook signals and play."""
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    def handle_sdp(self, message):
        """Apply a JSON signalling message from the peer to webrtcbin.

        ``{"sdp": {"type": "answer", "sdp": ...}}`` is parsed and set as the
        remote description; ``{"ice": {...}}`` adds a remote ICE candidate.
        Messages containing neither key are ignored.

        Raises:
            ValueError: if the SDP is not of type ``answer`` or cannot be
                parsed. (An explicit raise rather than ``assert``, so the
                check survives ``python -O``.)
        """
        assert self.webrtc
        msg = json.loads(message)
        if 'sdp' in msg:
            sdp = msg['sdp']
            if sdp['type'] != 'answer':
                raise ValueError('Expected an SDP answer, got %r' % (sdp['type'],))
            text = sdp['sdp']
            print('Received answer:\n%s' % text)
            res, sdpmsg = GstSdp.SDPMessage.new()
            if res != GstSdp.SDPResult.OK:
                raise ValueError('Failed to allocate an SDP message')
            # Do not hand a half-parsed description to webrtcbin.
            res = GstSdp.sdp_message_parse_buffer(text.encode(), sdpmsg)
            if res != GstSdp.SDPResult.OK:
                raise ValueError('Failed to parse the SDP answer')
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            self.webrtc.emit('set-remote-description', answer, promise)
            promise.interrupt()
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def close_pipeline(self):
        """Set the pipeline (if any) to NULL and drop the element references."""
        if self.pipe:
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
        self.webrtc = None

    async def loop(self):
        """Dispatch signalling messages until the connection ends.

        Returns:
            1 if the server sent a message starting with ``ERROR``, 0 when
            the message stream ends normally.

        The pipeline is closed on every exit path, including when an
        exception propagates out of the receive loop.
        """
        assert self.conn
        try:
            async for message in self.conn:
                if message == 'HELLO':
                    await self.setup_call()
                elif message == 'SESSION_OK':
                    self.start_pipeline()
                elif message.startswith('ERROR'):
                    print(message)
                    return 1
                else:
                    self.handle_sdp(message)
            return 0
        finally:
            self.close_pipeline()

    async def stop(self):
        """Close the websocket connection, if one is open."""
        if self.conn:
            await self.conn.close()
        self.conn = None
183
184
def check_plugins():
    """Report whether every GStreamer plugin this script relies on is present.

    Each required plugin name is looked up in the GStreamer registry. If any
    lookup yields ``None`` the missing names are printed and ``False`` is
    returned; otherwise ``True``.
    """
    required = ("opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
                "rtpmanager", "videotestsrc", "audiotestsrc")
    absent = [plugin for plugin in required
              if Gst.Registry.get().find_plugin(plugin) is None]
    if not absent:
        return True
    print('Missing gstreamer plugins:', absent)
    return False
193
194
# Script entry point: check the GStreamer setup, parse the command line, then
# run the signalling client until the server closes the session.
if __name__ == '__main__':
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('peerid', help='String ID of the peer to connect to')
    parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
                        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    args = parser.parse_args()
    # Random id announced to the signalling server in the HELLO message.
    our_id = random.randrange(10, 10000)
    loop = asyncio.new_event_loop()
    c = WebRTCClient(loop, our_id, args.peerid, args.server)
    try:
        loop.run_until_complete(c.connect())
        res = loop.run_until_complete(c.loop())
    finally:
        # Always close the websocket and the event loop, even when connecting
        # or the message loop raised; stop() is a no-op without a connection.
        loop.run_until_complete(c.stop())
        loop.close()
    sys.exit(res)