11 gi.require_version('Gst', '1.0')
12 from gi.repository import Gst
13 gi.require_version('GstWebRTC', '1.0')
14 from gi.repository import GstWebRTC
15 gi.require_version('GstSdp', '1.0')
16 from gi.repository import GstSdp
19 webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
20 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
21 queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
22 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
23 queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
26 from websockets.version import version as wsv
29 def __init__(self, id_, peer_id, server):
34 self.peer_id = peer_id
35 self.server = server or 'wss://webrtc.nirbheek.in:8443'
38 async def connect(self):
39 sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
40 self.conn = await websockets.connect(self.server, ssl=sslctx)
41 await self.conn.send('HELLO %d' % self.id_)
43 async def setup_call(self):
44 await self.conn.send('SESSION {}'.format(self.peer_id))
46 def send_sdp_offer(self, offer):
47 text = offer.sdp.as_text()
48 print ('Sending offer:\n%s' % text)
49 msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
50 loop = asyncio.new_event_loop()
51 loop.run_until_complete(self.conn.send(msg))
54 def on_offer_created(self, promise, _, __):
56 reply = promise.get_reply()
57 offer = reply['offer']
58 promise = Gst.Promise.new()
59 self.webrtc.emit('set-local-description', offer, promise)
61 self.send_sdp_offer(offer)
63 def on_negotiation_needed(self, element):
64 promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
65 element.emit('create-offer', None, promise)
67 def send_ice_candidate_message(self, _, mlineindex, candidate):
68 icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
69 loop = asyncio.new_event_loop()
70 loop.run_until_complete(self.conn.send(icemsg))
73 def on_incoming_decodebin_stream(self, _, pad):
74 if not pad.has_current_caps():
75 print (pad, 'has no caps, ignoring')
78 caps = pad.get_current_caps()
82 if name.startswith('video'):
83 q = Gst.ElementFactory.make('queue')
84 conv = Gst.ElementFactory.make('videoconvert')
85 sink = Gst.ElementFactory.make('autovideosink')
86 self.pipe.add(q, conv, sink)
87 self.pipe.sync_children_states()
88 pad.link(q.get_static_pad('sink'))
91 elif name.startswith('audio'):
92 q = Gst.ElementFactory.make('queue')
93 conv = Gst.ElementFactory.make('audioconvert')
94 resample = Gst.ElementFactory.make('audioresample')
95 sink = Gst.ElementFactory.make('autoaudiosink')
96 self.pipe.add(q, conv, resample, sink)
97 self.pipe.sync_children_states()
98 pad.link(q.get_static_pad('sink'))
103 def on_incoming_stream(self, _, pad):
104 if pad.direction != Gst.PadDirection.SRC:
107 decodebin = Gst.ElementFactory.make('decodebin')
108 decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
109 self.pipe.add(decodebin)
110 decodebin.sync_state_with_parent()
111 self.webrtc.link(decodebin)
113 def start_pipeline(self):
114 self.pipe = Gst.parse_launch(PIPELINE_DESC)
115 self.webrtc = self.pipe.get_by_name('sendrecv')
116 self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
117 self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
118 self.webrtc.connect('pad-added', self.on_incoming_stream)
119 self.pipe.set_state(Gst.State.PLAYING)
121 def handle_sdp(self, message):
123 msg = json.loads(message)
126 assert(sdp['type'] == 'answer')
128 print ('Received answer:\n%s' % sdp)
129 res, sdpmsg = GstSdp.SDPMessage.new()
130 GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
131 answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
132 promise = Gst.Promise.new()
133 self.webrtc.emit('set-remote-description', answer, promise)
137 candidate = ice['candidate']
138 sdpmlineindex = ice['sdpMLineIndex']
139 self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
141 def close_pipeline(self):
142 self.pipe.set_state(Gst.State.NULL)
146 async def loop(self):
148 async for message in self.conn:
149 if message == 'HELLO':
150 await self.setup_call()
151 elif message == 'SESSION_OK':
152 self.start_pipeline()
153 elif message.startswith('ERROR'):
155 self.close_pipeline()
158 self.handle_sdp(message)
159 self.close_pipeline()
162 async def stop(self):
164 await self.conn.close()
169 needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
170 "rtpmanager", "videotestsrc", "audiotestsrc"]
171 missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
173 print('Missing gstreamer plugins:', missing)
178 if __name__=='__main__':
180 if not check_plugins():
182 parser = argparse.ArgumentParser()
183 parser.add_argument('peerid', help='String ID of the peer to connect to')
184 parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
185 args = parser.parse_args()
186 our_id = random.randrange(10, 10000)
187 c = WebRTCClient(our_id, args.peerid, args.server)
188 loop = asyncio.get_event_loop()
189 loop.run_until_complete(c.connect())
190 res = loop.run_until_complete(c.loop())