3 use std::sync::{Arc, Mutex, Weak};
9 use async_std::prelude::*;
11 use futures::channel::mpsc;
12 use futures::sink::{Sink, SinkExt};
13 use futures::stream::StreamExt;
15 use async_tungstenite::tungstenite;
16 use tungstenite::Error as WsError;
17 use tungstenite::Message as WsMessage;
22 use serde_derive::{Deserialize, Serialize};
24 use anyhow::{anyhow, bail, Context};
26 const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
27 const TURN_SERVER: &str = "turn://foo:bar@webrtc.nirbheek.in:3478";
29 // upgrade weak reference or return
31 macro_rules! upgrade_weak {
32 ($x:ident, $r:expr) => {{
43 #[derive(Debug, clap::Parser)]
45 #[clap(short, long, default_value = "wss://webrtc.nirbheek.in:8443")]
51 // JSON messages we communicate with
52 #[derive(Serialize, Deserialize)]
53 #[serde(rename_all = "lowercase")]
57 #[serde(rename = "sdpMLineIndex")]
61 #[serde(rename = "type")]
67 // Strong reference to our application state
68 #[derive(Debug, Clone)]
69 struct App(Arc<AppInner>);
71 // Weak reference to our application state
72 #[derive(Debug, Clone)]
73 struct AppWeak(Weak<AppInner>);
75 // Actual application state
79 pipeline: gst::Pipeline,
80 webrtcbin: gst::Element,
81 send_msg_tx: Mutex<mpsc::UnboundedSender<WsMessage>>,
84 // To be able to access the App's fields directly
85 impl std::ops::Deref for App {
86 type Target = AppInner;
88 fn deref(&self) -> &AppInner {
94 // Try upgrading a weak reference to a strong one
95 fn upgrade(&self) -> Option<App> {
96 self.0.upgrade().map(App)
101 // Downgrade the strong reference to a weak reference
102 fn downgrade(&self) -> AppWeak {
103 AppWeak(Arc::downgrade(&self.0))
111 impl Stream<Item = gst::Message>,
112 impl Stream<Item = WsMessage>,
116 // Create the GStreamer pipeline
117 let pipeline = gst::parse_launch(
118 "videotestsrc pattern=ball is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! webrtcbin. \
119 audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 ! webrtcbin. \
120 webrtcbin name=webrtcbin"
123 // Downcast from gst::Element to gst::Pipeline
124 let pipeline = pipeline
125 .downcast::<gst::Pipeline>()
126 .expect("not a pipeline");
128 // Get access to the webrtcbin by name
129 let webrtcbin = pipeline.by_name("webrtcbin").expect("can't find webrtcbin");
131 // Set some properties on webrtcbin
132 webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
133 webrtcbin.set_property_from_str("turn-server", TURN_SERVER);
134 webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
136 // Create a stream for handling the GStreamer message asynchronously
137 let bus = pipeline.bus().unwrap();
138 let send_gst_msg_rx = bus.stream();
140 // Channel for outgoing WebSocket messages from other threads
141 let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
143 let app = App(Arc::new(AppInner {
147 send_msg_tx: Mutex::new(send_ws_msg_tx),
150 // Connect to on-negotiation-needed to handle sending an Offer
151 if app.args.peer_id.is_some() {
152 let app_clone = app.downgrade();
153 app.webrtcbin.connect_closure(
154 "on-negotiation-needed",
156 glib::closure!(move |_webrtcbin: &gst::Element| {
157 let app = upgrade_weak!(app_clone);
158 if let Err(err) = app.on_negotiation_needed() {
161 gst::LibraryError::Failed,
162 ("Failed to negotiate: {:?}", err)
169 // Whenever there is a new ICE candidate, send it to the peer
170 let app_clone = app.downgrade();
171 app.webrtcbin.connect_closure(
175 move |_webrtcbin: &gst::Element, mlineindex: u32, candidate: &str| {
176 let app = upgrade_weak!(app_clone);
178 if let Err(err) = app.on_ice_candidate(mlineindex, candidate) {
181 gst::LibraryError::Failed,
182 ("Failed to send ICE candidate: {:?}", err)
189 // Whenever there is a new stream incoming from the peer, handle it
190 let app_clone = app.downgrade();
191 app.webrtcbin.connect_pad_added(move |_webrtc, pad| {
192 let app = upgrade_weak!(app_clone);
194 if let Err(err) = app.on_incoming_stream(pad) {
197 gst::LibraryError::Failed,
198 ("Failed to handle incoming stream: {:?}", err)
203 // Asynchronously set the pipeline to Playing
204 app.pipeline.call_async(|pipeline| {
205 // If this fails, post an error on the bus so we exit
206 if pipeline.set_state(gst::State::Playing).is_err() {
209 gst::LibraryError::Failed,
210 ("Failed to set pipeline to Playing")
215 // Asynchronously set the pipeline to Playing
216 app.pipeline.call_async(|pipeline| {
218 .set_state(gst::State::Playing)
219 .expect("Couldn't set pipeline to Playing");
222 Ok((app, send_gst_msg_rx, send_ws_msg_rx))
225 // Handle WebSocket messages, both our own as well as WebSocket protocol messages
226 fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> {
227 if msg.starts_with("ERROR") {
228 bail!("Got error message: {}", msg);
231 let json_msg: JsonMsg = serde_json::from_str(msg)?;
234 JsonMsg::Sdp { type_, sdp } => self.handle_sdp(&type_, &sdp),
238 } => self.handle_ice(sdp_mline_index, &candidate),
242 // Handle GStreamer messages coming from the pipeline
243 fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
244 use gst::message::MessageView;
246 match message.view() {
247 MessageView::Error(err) => bail!(
248 "Error from element {}: {} ({})",
250 .map(|s| String::from(s.path_string()))
251 .unwrap_or_else(|| String::from("None")),
253 err.debug().unwrap_or_else(|| String::from("None")),
255 MessageView::Warning(warning) => {
256 println!("Warning: \"{}\"", warning.debug().unwrap());
264 // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
265 // for a new offer SDP from webrtcbin without any customization and then
266 // asynchronously send it to the peer via the WebSocket connection
267 fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> {
268 println!("starting negotiation");
270 let app_clone = self.downgrade();
271 let promise = gst::Promise::with_change_func(move |reply| {
272 let app = upgrade_weak!(app_clone);
274 if let Err(err) = app.on_offer_created(reply) {
277 gst::LibraryError::Failed,
278 ("Failed to send SDP offer: {:?}", err)
284 .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
289 // Once webrtcbin has created the offer SDP for us, handle it by sending it to the peer via the
290 // WebSocket connection
293 reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
294 ) -> Result<(), anyhow::Error> {
295 let reply = match reply {
296 Ok(Some(reply)) => reply,
298 bail!("Offer creation future got no response");
301 bail!("Offer creation future got error response: {:?}", err);
308 .get::<gst_webrtc::WebRTCSessionDescription>()
309 .expect("Invalid argument");
311 .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
314 "sending SDP offer to peer: {}",
315 offer.sdp().as_text().unwrap()
318 let message = serde_json::to_string(&JsonMsg::Sdp {
319 type_: "offer".to_string(),
320 sdp: offer.sdp().as_text().unwrap(),
327 .unbounded_send(WsMessage::Text(message))
328 .context("Failed to send SDP offer")?;
333 // Once webrtcbin has created the answer SDP for us, handle it by sending it to the peer via the
334 // WebSocket connection
335 fn on_answer_created(
337 reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
338 ) -> Result<(), anyhow::Error> {
339 let reply = match reply {
340 Ok(Some(reply)) => reply,
342 bail!("Answer creation future got no response");
345 bail!("Answer creation future got error response: {:?}", err);
352 .get::<gst_webrtc::WebRTCSessionDescription>()
353 .expect("Invalid argument");
355 .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
358 "sending SDP answer to peer: {}",
359 answer.sdp().as_text().unwrap()
362 let message = serde_json::to_string(&JsonMsg::Sdp {
363 type_: "answer".to_string(),
364 sdp: answer.sdp().as_text().unwrap(),
371 .unbounded_send(WsMessage::Text(message))
372 .context("Failed to send SDP answer")?;
377 // Handle incoming SDP offers and answers from the peer
378 fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
379 if type_ == "answer" {
380 print!("Received answer:\n{}\n", sdp);
382 let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
383 .map_err(|_| anyhow!("Failed to parse SDP answer"))?;
385 gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
388 .emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
391 } else if type_ == "offer" {
392 print!("Received offer:\n{}\n", sdp);
394 let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
395 .map_err(|_| anyhow!("Failed to parse SDP offer"))?;
397 // And then asynchronously start our pipeline and do the next steps. The
398 // pipeline needs to be started before we can create an answer
399 let app_clone = self.downgrade();
400 self.pipeline.call_async(move |_pipeline| {
401 let app = upgrade_weak!(app_clone);
403 let offer = gst_webrtc::WebRTCSessionDescription::new(
404 gst_webrtc::WebRTCSDPType::Offer,
410 .emit_by_name::<()>("set-remote-description", &[&offer, &None::<gst::Promise>]);
412 let app_clone = app.downgrade();
413 let promise = gst::Promise::with_change_func(move |reply| {
414 let app = upgrade_weak!(app_clone);
416 if let Err(err) = app.on_answer_created(reply) {
419 gst::LibraryError::Failed,
420 ("Failed to send SDP answer: {:?}", err)
427 .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
432 bail!("Sdp type is not \"answer\" but \"{}\"", type_)
436 // Handle incoming ICE candidates from the peer by passing them to webrtcbin
437 fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> {
439 .emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
444 // Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON
446 fn on_ice_candidate(&self, mlineindex: u32, candidate: &str) -> Result<(), anyhow::Error> {
447 let message = serde_json::to_string(&JsonMsg::Ice {
448 candidate: candidate.to_string(),
449 sdp_mline_index: mlineindex,
456 .unbounded_send(WsMessage::Text(message))
457 .context("Failed to send ICE candidate")?;
462 // Whenever there's a new incoming, encoded stream from the peer create a new decodebin
463 fn on_incoming_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
464 // Early return for the source pads we're adding ourselves
465 if pad.direction() != gst::PadDirection::Src {
469 let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
470 let app_clone = self.downgrade();
471 decodebin.connect_pad_added(move |_decodebin, pad| {
472 let app = upgrade_weak!(app_clone);
474 if let Err(err) = app.on_incoming_decodebin_stream(pad) {
477 gst::LibraryError::Failed,
478 ("Failed to handle decoded stream: {:?}", err)
483 self.pipeline.add(&decodebin).unwrap();
484 decodebin.sync_state_with_parent().unwrap();
486 let sinkpad = decodebin.static_pad("sink").unwrap();
487 pad.link(&sinkpad).unwrap();
492 // Handle a newly decoded decodebin stream and depending on its type, create the relevant
493 // elements or simply ignore it
494 fn on_incoming_decodebin_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
495 let caps = pad.current_caps().unwrap();
496 let name = caps.structure(0).unwrap().name();
498 let sink = if name.starts_with("video/") {
499 gst::parse_bin_from_description(
500 "queue ! videoconvert ! videoscale ! autovideosink",
503 } else if name.starts_with("audio/") {
504 gst::parse_bin_from_description(
505 "queue ! audioconvert ! audioresample ! autoaudiosink",
509 println!("Unknown pad {:?}, ignoring", pad);
513 self.pipeline.add(&sink).unwrap();
514 sink.sync_state_with_parent()
515 .with_context(|| format!("can't start sink for stream {:?}", caps))?;
517 let sinkpad = sink.static_pad("sink").unwrap();
519 .with_context(|| format!("can't link sink for stream {:?}", caps))?;
525 // Make sure to shut down the pipeline when it goes out of scope
526 // to release any system resources
527 impl Drop for AppInner {
529 let _ = self.pipeline.set_state(gst::State::Null);
535 ws: impl Sink<WsMessage, Error = WsError> + Stream<Item = Result<WsMessage, WsError>>,
536 ) -> Result<(), anyhow::Error> {
537 // Split the websocket into the Sink and Stream
538 let (mut ws_sink, ws_stream) = ws.split();
539 // Fuse the Stream, required for the select macro
540 let mut ws_stream = ws_stream.fuse();
542 // Create our application state
543 let (app, send_gst_msg_rx, send_ws_msg_rx) = App::new(args)?;
545 let mut send_gst_msg_rx = send_gst_msg_rx.fuse();
546 let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
548 // And now let's start our message loop
550 let ws_msg = futures::select! {
551 // Handle the WebSocket messages here
552 ws_msg = ws_stream.select_next_some() => {
554 WsMessage::Close(_) => {
555 println!("peer disconnected");
558 WsMessage::Ping(data) => Some(WsMessage::Pong(data)),
559 WsMessage::Pong(_) => None,
560 WsMessage::Binary(_) => None,
561 WsMessage::Text(text) => {
562 app.handle_websocket_message(&text)?;
565 WsMessage::Frame(_) => unreachable!(),
568 // Pass the GStreamer messages to the application control logic
569 gst_msg = send_gst_msg_rx.select_next_some() => {
570 app.handle_pipeline_message(&gst_msg)?;
573 // Handle WebSocket messages we created asynchronously
574 // to send them out now
575 ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg),
576 // Once we're done, break the loop and return
580 // If there's a message to send out, do so now
581 if let Some(ws_msg) = ws_msg {
582 ws_sink.send(ws_msg).await?;
589 // Check if all GStreamer plugins we require are available
590 fn check_plugins() -> Result<(), anyhow::Error> {
609 let registry = gst::Registry::get();
612 .filter(|n| registry.find_plugin(n).is_none())
614 .collect::<Vec<_>>();
616 if !missing.is_empty() {
617 bail!("Missing plugins: {:?}", missing);
623 async fn async_main() -> Result<(), anyhow::Error> {
624 // Initialize GStreamer first
629 let args = Args::parse();
631 // Connect to the given server
632 let (mut ws, _) = async_tungstenite::async_std::connect_async(&args.server).await?;
634 println!("connected");
636 // Say HELLO to the server and see if it replies with HELLO
637 let our_id = rand::thread_rng().gen_range(10..10_000);
638 println!("Registering id {} with server", our_id);
639 ws.send(WsMessage::Text(format!("HELLO {}", our_id)))
645 .ok_or_else(|| anyhow!("didn't receive anything"))??;
647 if msg != WsMessage::Text("HELLO".into()) {
648 bail!("server didn't say HELLO");
651 if let Some(peer_id) = args.peer_id {
652 // Join the given session
653 ws.send(WsMessage::Text(format!("SESSION {}", peer_id)))
659 .ok_or_else(|| anyhow!("didn't receive anything"))??;
661 if msg != WsMessage::Text("SESSION_OK".into()) {
662 bail!("server error: {:?}", msg);
666 // All good, let's run our message loop
670 fn main() -> Result<(), anyhow::Error> {
671 macos_workaround::run(|| task::block_on(async_main()))