Merging gst-build
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc_sendrecv.py
1 import random
2 import ssl
3 import websockets
4 import asyncio
5 import os
6 import sys
7 import json
8 import argparse
9
10 import gi
11 gi.require_version('Gst', '1.0')
12 from gi.repository import Gst
13 gi.require_version('GstWebRTC', '1.0')
14 from gi.repository import GstWebRTC
15 gi.require_version('GstSdp', '1.0')
16 from gi.repository import GstSdp
17
# gst-launch style description of the pipeline, handed to Gst.parse_launch()
# by WebRTCClient.start_pipeline(). The webrtcbin element is named "sendrecv"
# so that it can be looked up by name afterwards; a VP8-encoded test video
# stream and an Opus-encoded test audio stream are both linked into it.
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
 queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
25
26 from websockets.version import version as wsv
27
class WebRTCClient:
    """Signalling client and GStreamer pipeline owner for one WebRTC call.

    The client registers with a websocket signalling server under ``id_``,
    requests a session with ``peer_id``, then builds the pipeline described
    by ``PIPELINE_DESC`` and exchanges the SDP offer/answer and ICE
    candidates with the peer over the websocket.
    """

    def __init__(self, id_, peer_id, server):
        """Store the call parameters; nothing is connected or started yet.

        Args:
            id_: Integer id announced to the signalling server in ``HELLO``.
            peer_id: Id of the peer to request a session with.
            server: Websocket URL of the signalling server. A falsy value
                selects the default public server.
        """
        self.id_ = id_
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.peer_id = peer_id
        self.server = server or 'wss://webrtc.nirbheek.in:8443'
        # asyncio loop that owns ``self.conn``; recorded by connect() so that
        # GStreamer callbacks can hand outgoing messages back to it.
        self.event_loop = None

    async def connect(self):
        """Open the websocket to the signalling server and register our id."""
        # The previous context was built with ``Purpose.CLIENT_AUTH``, which
        # is meant for server-side sockets: it performs no certificate or
        # hostname verification and, since Python 3.10, yields a
        # PROTOCOL_TLS_SERVER context that cannot open client connections.
        # Build a client-side context with the same "no verification"
        # behaviour instead.
        # NOTE(review): this accepts any server certificate, presumably to
        # allow self-signed signalling servers; enable verification before
        # using this outside of testing.
        sslctx = ssl.create_default_context()
        sslctx.check_hostname = False  # must be cleared before verify_mode
        sslctx.verify_mode = ssl.CERT_NONE
        # Called from inside a coroutine, so this is the running loop.
        self.event_loop = asyncio.get_event_loop()
        self.conn = await websockets.connect(self.server, ssl=sslctx)
        await self.conn.send('HELLO %d' % self.id_)

    async def setup_call(self):
        """Ask the signalling server for a session with ``peer_id``."""
        await self.conn.send('SESSION {}'.format(self.peer_id))

    @staticmethod
    def _report_send_result(future):
        """Print the error, if any, of a send scheduled by _send_soon()."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print('Failed to send signalling message:', error)

    def _send_soon(self, msg):
        """Queue ``msg`` for sending on the loop that owns the websocket.

        The callers are GStreamer callbacks, which do not run inside the
        asyncio loop. Driving ``self.conn`` from a private, freshly created
        loop would use the connection outside the loop it belongs to, so the
        send is scheduled thread-safely on ``self.event_loop`` instead. The
        call does not wait for the message to be written.
        """
        future = asyncio.run_coroutine_threadsafe(self.conn.send(msg),
                                                  self.event_loop)
        future.add_done_callback(self._report_send_result)

    def send_sdp_offer(self, offer):
        """Send the local SDP ``offer`` to the peer as a JSON message."""
        text = offer.sdp.as_text()
        print ('Sending offer:\n%s' % text)
        msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
        self._send_soon(msg)

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the created offer locally and send it."""
        promise.wait()
        reply = promise.get_reply()
        offer = reply['offer']
        # A fresh promise is needed for the 'set-local-description' call; it
        # is interrupted right after emitting because its result is not used.
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, promise)
        promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """'on-negotiation-needed' handler: ask ``element`` for an offer."""
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """'on-ice-candidate' handler: forward a local candidate to the peer."""
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
        self._send_soon(icemsg)

    def on_incoming_decodebin_stream(self, _, pad):
        """decodebin 'pad-added' handler: attach a sink chain to ``pad``.

        Video pads get queue ! videoconvert ! autovideosink, audio pads get
        queue ! audioconvert ! audioresample ! autoaudiosink. Pads without
        current caps, or with any other media type, are left unlinked.
        """
        if not pad.has_current_caps():
            print (pad, 'has no caps, ignoring')
            return

        caps = pad.get_current_caps()
        assert (len(caps))
        name = caps[0].get_name()
        if name.startswith('video'):
            factories = ('queue', 'videoconvert', 'autovideosink')
        elif name.startswith('audio'):
            factories = ('queue', 'audioconvert', 'audioresample', 'autoaudiosink')
        else:
            return

        chain = [Gst.ElementFactory.make(factory) for factory in factories]
        self.pipe.add(*chain)
        self.pipe.sync_children_states()
        pad.link(chain[0].get_static_pad('sink'))
        # Link each element to the one that follows it in the chain.
        for upstream, downstream in zip(chain, chain[1:]):
            upstream.link(downstream)

    def on_incoming_stream(self, _, pad):
        """webrtcbin 'pad-added' handler: decode every new source pad."""
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline, hook up the webrtcbin signals and start it."""
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    def handle_sdp(self, message):
        """Apply a JSON signalling ``message`` from the peer to webrtcbin.

        A message with an ``sdp`` key must carry an answer, which is set as
        the remote description. A message with an ``ice`` key carries a
        remote ICE candidate. Messages with neither key are ignored.
        """
        assert (self.webrtc)
        msg = json.loads(message)
        if 'sdp' in msg:
            sdp = msg['sdp']
            assert(sdp['type'] == 'answer')
            sdp = sdp['sdp']
            print ('Received answer:\n%s' % sdp)
            _, sdpmsg = GstSdp.SDPMessage.new()
            GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            self.webrtc.emit('set-remote-description', answer, promise)
            promise.interrupt()
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def close_pipeline(self):
        """Stop the pipeline, if one was started, and drop the references."""
        # loop() calls this on ERROR messages and when the connection ends,
        # both of which can happen before start_pipeline() has run.
        if self.pipe is not None:
            self.pipe.set_state(Gst.State.NULL)
        self.pipe = None
        self.webrtc = None

    async def loop(self):
        """Process signalling messages until an error or the connection ends.

        Returns:
            1 if the server sent an ``ERROR`` message, 0 once the message
            stream is exhausted.
        """
        assert self.conn
        async for message in self.conn:
            if message == 'HELLO':
                await self.setup_call()
            elif message == 'SESSION_OK':
                self.start_pipeline()
            elif message.startswith('ERROR'):
                print (message)
                self.close_pipeline()
                return 1
            else:
                self.handle_sdp(message)
        self.close_pipeline()
        return 0

    async def stop(self):
        """Close the websocket if it is open and forget it."""
        if self.conn:
            await self.conn.close()
        self.conn = None
166
167
def check_plugins():
    """Report whether every GStreamer plugin this example needs is available.

    Prints the names of the missing plugins, if there are any.

    Returns:
        True when all needed plugins are found in the registry, otherwise
        False.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    # Fetch the registry once rather than once per plugin name.
    registry = Gst.Registry.get()
    missing = [plugin for plugin in needed
               if registry.find_plugin(plugin) is None]
    if missing:
        print('Missing gstreamer plugins:', missing)
        return False
    return True
176
177
if __name__=='__main__':
    # Script entry point: check the GStreamer installation, parse the
    # command line, then run the signalling client until the call ends.
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('peerid', help='String ID of the peer to connect to')
    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    args = parser.parse_args()
    # Our own id is picked at random for each run.
    our_id = random.randrange(10, 10000)
    c = WebRTCClient(our_id, args.peerid, args.server)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(c.connect())
        res = loop.run_until_complete(c.loop())
    finally:
        # Close the websocket however the session ended; stop() does nothing
        # when no connection was ever opened.
        loop.run_until_complete(c.stop())
    sys.exit(res)