13 gi.require_version('Gst', '1.0')
14 from gi.repository import Gst
15 gi.require_version('GstWebRTC', '1.0')
16 from gi.repository import GstWebRTC
17 gi.require_version('GstSdp', '1.0')
18 from gi.repository import GstSdp
20 # Ensure that gst-python is installed
22 from gi.overrides import Gst as _
24 print('gstreamer-python binding overrides aren\'t available, please install them')
27 # These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
29 webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
30 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
31 vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay !
32 queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
33 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
34 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
37 from websockets.version import version as wsv
41 def __init__(self, loop, id_, peer_id, server):
42 self.event_loop = loop
47 self.peer_id = peer_id
50 async def send(self, msg):
52 print(f'>>> Sending {msg}')
53 await self.conn.send(msg)
55 async def connect(self):
56 self.conn = await websockets.connect(self.server)
57 await self.send('HELLO %d' % self.id_)
59 async def setup_call(self):
60 await self.send('SESSION {}'.format(self.peer_id))
62 def send_soon(self, msg):
63 asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
65 def send_sdp_offer(self, offer):
66 text = offer.sdp.as_text()
67 print('Sending offer:\n%s' % text)
68 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
71 def on_offer_created(self, promise, _, __):
73 reply = promise.get_reply()
74 offer = reply['offer']
75 promise = Gst.Promise.new()
76 print('Offer created, setting local description')
77 self.webrtc.emit('set-local-description', offer, promise)
79 self.send_sdp_offer(offer)
81 def on_negotiation_needed(self, element):
82 promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
83 element.emit('create-offer', None, promise)
85 def send_ice_candidate_message(self, _, mlineindex, candidate):
86 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
87 self.send_soon(icemsg)
89 def on_incoming_decodebin_stream(self, _, pad):
90 if not pad.has_current_caps():
91 print(pad, 'has no caps, ignoring')
94 caps = pad.get_current_caps()
98 if name.startswith('video'):
99 q = Gst.ElementFactory.make('queue')
100 conv = Gst.ElementFactory.make('videoconvert')
101 sink = Gst.ElementFactory.make('autovideosink')
102 self.pipe.add(q, conv, sink)
103 self.pipe.sync_children_states()
104 pad.link(q.get_static_pad('sink'))
107 elif name.startswith('audio'):
108 q = Gst.ElementFactory.make('queue')
109 conv = Gst.ElementFactory.make('audioconvert')
110 resample = Gst.ElementFactory.make('audioresample')
111 sink = Gst.ElementFactory.make('autoaudiosink')
112 self.pipe.add(q, conv, resample, sink)
113 self.pipe.sync_children_states()
114 pad.link(q.get_static_pad('sink'))
119 def on_incoming_stream(self, _, pad):
120 if pad.direction != Gst.PadDirection.SRC:
123 decodebin = Gst.ElementFactory.make('decodebin')
124 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
125 self.pipe.add(decodebin)
126 decodebin.sync_state_with_parent()
127 self.webrtc.link(decodebin)
129 def start_pipeline(self):
130 self.pipe = Gst.parse_launch(PIPELINE_DESC)
131 self.webrtc = self.pipe.get_by_name('sendrecv')
132 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
133 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
134 self.webrtc.connect('pad-added', self.on_incoming_stream)
135 self.pipe.set_state(Gst.State.PLAYING)
137 def handle_sdp(self, message):
139 msg = json.loads(message)
142 assert(sdp['type'] == 'answer')
144 print('Received answer:\n%s' % sdp)
145 res, sdpmsg = GstSdp.SDPMessage.new()
146 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
147 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
148 promise = Gst.Promise.new()
149 self.webrtc.emit('set-remote-description', answer, promise)
153 candidate = ice['candidate']
154 sdpmlineindex = ice['sdpMLineIndex']
155 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
157 def close_pipeline(self):
159 self.pipe.set_state(Gst.State.NULL)
163 async def loop(self):
165 async for message in self.conn:
166 if message == 'HELLO':
167 await self.setup_call()
168 elif message == 'SESSION_OK':
169 self.start_pipeline()
170 elif message.startswith('ERROR'):
172 self.close_pipeline()
175 self.handle_sdp(message)
176 self.close_pipeline()
179 async def stop(self):
181 await self.conn.close()
186 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
187 "rtpmanager", "videotestsrc", "audiotestsrc"]
188 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
190 print('Missing gstreamer plugins:', missing)
195 if __name__ == '__main__':
197 if not check_plugins():
199 parser = argparse.ArgumentParser()
200 parser.add_argument('peerid', help='String ID of the peer to connect to')
201 parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
202 help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
203 args = parser.parse_args()
204 our_id = random.randrange(10, 10000)
205 loop = asyncio.new_event_loop()
206 c = WebRTCClient(loop, our_id, args.peerid, args.server)
207 loop.run_until_complete(c.connect())
208 res = loop.run_until_complete(c.loop())