1 #![recursion_limit = "256"]
5 use std::collections::BTreeMap;
6 use std::sync::{Arc, Mutex, Weak};
12 use async_std::prelude::*;
14 use futures::channel::mpsc;
15 use futures::sink::{Sink, SinkExt};
16 use futures::stream::StreamExt;
18 use async_tungstenite::tungstenite;
19 use tungstenite::Error as WsError;
20 use tungstenite::Message as WsMessage;
25 use serde_derive::{Deserialize, Serialize};
27 use anyhow::{anyhow, bail, Context};
29 const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
30 const TURN_SERVER: &str = "turn://foo:bar@webrtc.nirbheek.in:3478";
31 const VIDEO_WIDTH: u32 = 1024;
32 const VIDEO_HEIGHT: u32 = 768;
34 // upgrade weak reference or return
36 macro_rules! upgrade_weak {
37 ($x:ident, $r:expr) => {{
48 #[derive(Debug, clap::Parser)]
50 #[clap(short, long, default_value = "wss://webrtc.nirbheek.in:8443")]
56 // JSON messages we communicate with
57 #[derive(Serialize, Deserialize)]
58 #[serde(rename_all = "lowercase")]
62 #[serde(rename = "sdpMLineIndex")]
66 #[serde(rename = "type")]
72 // Strong reference to our application state
73 #[derive(Debug, Clone)]
74 struct App(Arc<AppInner>);
76 // Weak reference to our application state
77 #[derive(Debug, Clone)]
78 struct AppWeak(Weak<AppInner>);
80 // Actual application state
83 pipeline: gst::Pipeline,
84 video_tee: gst::Element,
85 audio_tee: gst::Element,
86 video_mixer: gst::Element,
87 audio_mixer: gst::Element,
88 send_msg_tx: Arc<Mutex<mpsc::UnboundedSender<WsMessage>>>,
89 peers: Mutex<BTreeMap<u32, Peer>>,
92 // Strong reference to the state of one peer
93 #[derive(Debug, Clone)]
94 struct Peer(Arc<PeerInner>);
96 // Weak reference to the state of one peer
97 #[derive(Debug, Clone)]
98 struct PeerWeak(Weak<PeerInner>);
105 webrtcbin: gst::Element,
106 send_msg_tx: Arc<Mutex<mpsc::UnboundedSender<WsMessage>>>,
109 // To be able to access the App's fields directly
110 impl std::ops::Deref for App {
111 type Target = AppInner;
113 fn deref(&self) -> &AppInner {
118 // To be able to access the Peers's fields directly
119 impl std::ops::Deref for Peer {
120 type Target = PeerInner;
122 fn deref(&self) -> &PeerInner {
128 // Try upgrading a weak reference to a strong one
129 fn upgrade(&self) -> Option<App> {
130 self.0.upgrade().map(App)
135 // Try upgrading a weak reference to a strong one
136 fn upgrade(&self) -> Option<Peer> {
137 self.0.upgrade().map(Peer)
142 // Downgrade the strong reference to a weak reference
143 fn downgrade(&self) -> AppWeak {
144 AppWeak(Arc::downgrade(&self.0))
148 initial_peers: &[&str],
152 impl Stream<Item = gst::Message>,
153 impl Stream<Item = WsMessage>,
157 // Create the GStreamer pipeline
158 let pipeline = gst::parse_launch(
160 "videotestsrc is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee ! \
161 queue ! fakesink sync=true \
162 audiotestsrc wave=ticks is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee ! \
163 queue ! fakesink sync=true \
164 audiotestsrc wave=silence is-live=true ! audio-mixer. \
165 audiomixer name=audio-mixer sink_0::mute=true ! audioconvert ! audioresample ! autoaudiosink \
166 videotestsrc pattern=black ! capsfilter caps=video/x-raw,width=1,height=1 ! video-mixer. \
167 compositor name=video-mixer background=black sink_0::alpha=0.0 ! capsfilter caps=video/x-raw,width={width},height={height} ! videoconvert ! autovideosink",
172 // Downcast from gst::Element to gst::Pipeline
173 let pipeline = pipeline
174 .downcast::<gst::Pipeline>()
175 .expect("not a pipeline");
177 // Get access to the tees and mixers by name
178 let video_tee = pipeline.by_name("video-tee").expect("can't find video-tee");
179 let audio_tee = pipeline.by_name("audio-tee").expect("can't find audio-tee");
181 let video_mixer = pipeline
182 .by_name("video-mixer")
183 .expect("can't find video-mixer");
184 let audio_mixer = pipeline
185 .by_name("audio-mixer")
186 .expect("can't find audio-mixer");
188 // Create a stream for handling the GStreamer message asynchronously
189 let bus = pipeline.bus().unwrap();
190 let send_gst_msg_rx = bus.stream();
192 // Channel for outgoing WebSocket messages from other threads
193 let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
195 // Asynchronously set the pipeline to Playing
196 pipeline.call_async(|pipeline| {
198 .set_state(gst::State::Playing)
199 .expect("Couldn't set pipeline to Playing");
202 let app = App(Arc::new(AppInner {
208 peers: Mutex::new(BTreeMap::new()),
209 send_msg_tx: Arc::new(Mutex::new(send_ws_msg_tx)),
212 for peer in initial_peers {
213 app.add_peer(peer, true)?;
216 // Asynchronously set the pipeline to Playing
217 app.pipeline.call_async(|pipeline| {
218 // If this fails, post an error on the bus so we exit
219 if pipeline.set_state(gst::State::Playing).is_err() {
222 gst::LibraryError::Failed,
223 ("Failed to set pipeline to Playing")
228 Ok((app, send_gst_msg_rx, send_ws_msg_rx))
231 // Handle WebSocket messages, both our own as well as WebSocket protocol messages
232 fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> {
233 if msg.starts_with("ERROR") {
234 bail!("Got error message: {}", msg);
237 if let Some(msg) = msg.strip_prefix("ROOM_PEER_MSG ") {
238 let mut split = msg.splitn(2, ' ');
242 .and_then(|s| str::parse::<u32>(s).ok())
243 .ok_or_else(|| anyhow!("Can't parse peer id"))?;
245 let peers = self.peers.lock().unwrap();
248 .ok_or_else(|| anyhow!("Can't find peer {}", peer_id))?
254 .ok_or_else(|| anyhow!("Can't parse peer message"))?;
256 let json_msg: JsonMsg = serde_json::from_str(msg)?;
259 JsonMsg::Sdp { type_, sdp } => peer.handle_sdp(&type_, &sdp),
263 } => peer.handle_ice(sdp_mline_index, &candidate),
265 } else if let Some(msg) = msg.strip_prefix("ROOM_PEER_JOINED ") {
266 // Parse message and add the new peer
267 let mut split = msg.splitn(2, ' ');
268 let peer_id = split.next().ok_or_else(|| anyhow!("Can't parse peer id"))?;
270 self.add_peer(peer_id, false)
271 } else if let Some(msg) = msg.strip_prefix("ROOM_PEER_LEFT ") {
272 // Parse message and add the new peer
273 let mut split = msg.splitn(2, ' ');
274 let peer_id = split.next().ok_or_else(|| anyhow!("Can't parse peer id"))?;
276 self.remove_peer(peer_id)
282 // Handle GStreamer messages coming from the pipeline
283 fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
284 use gst::message::MessageView;
286 match message.view() {
287 MessageView::Error(err) => bail!(
288 "Error from element {}: {} ({})",
290 .map(|s| String::from(s.path_string()))
291 .unwrap_or_else(|| String::from("None")),
293 err.debug().unwrap_or_else(|| String::from("None")),
295 MessageView::Warning(warning) => {
296 println!("Warning: \"{}\"", warning.debug().unwrap());
304 // Add this new peer and if requested, send the offer to it
305 fn add_peer(&self, peer: &str, offer: bool) -> Result<(), anyhow::Error> {
306 println!("Adding peer {}", peer);
307 let peer_id = str::parse::<u32>(peer).context("Can't parse peer id")?;
308 let mut peers = self.peers.lock().unwrap();
309 if peers.contains_key(&peer_id) {
310 bail!("Peer {} already called", peer_id);
313 let peer_bin = gst::parse_bin_from_description(
314 "queue name=video-queue ! webrtcbin. \
315 queue name=audio-queue ! webrtcbin. \
316 webrtcbin name=webrtcbin",
320 // Get access to the webrtcbin by name
321 let webrtcbin = peer_bin.by_name("webrtcbin").expect("can't find webrtcbin");
323 // Set some properties on webrtcbin
324 webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
325 webrtcbin.set_property_from_str("turn-server", TURN_SERVER);
326 webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
328 // Add ghost pads for connecting to the input
329 let audio_queue = peer_bin
330 .by_name("audio-queue")
331 .expect("can't find audio-queue");
332 let audio_sink_pad = gst::GhostPad::with_target(
334 &audio_queue.static_pad("sink").unwrap(),
337 peer_bin.add_pad(&audio_sink_pad).unwrap();
339 let video_queue = peer_bin
340 .by_name("video-queue")
341 .expect("can't find video-queue");
342 let video_sink_pad = gst::GhostPad::with_target(
344 &video_queue.static_pad("sink").unwrap(),
347 peer_bin.add_pad(&video_sink_pad).unwrap();
349 let peer = Peer(Arc::new(PeerInner {
353 send_msg_tx: self.send_msg_tx.clone(),
356 // Insert the peer into our map_
357 peers.insert(peer_id, peer.clone());
360 // Add to the whole pipeline
361 self.pipeline.add(&peer.bin).unwrap();
363 // If we should send the offer to the peer, do so from on-negotiation-needed
365 // Connect to on-negotiation-needed to handle sending an Offer
366 let peer_clone = peer.downgrade();
367 peer.webrtcbin.connect_closure(
368 "on-negotiation-needed",
370 glib::closure!(move |_webrtcbin: &gst::Element| {
371 let peer = upgrade_weak!(peer_clone);
372 if let Err(err) = peer.on_negotiation_needed() {
375 gst::LibraryError::Failed,
376 ("Failed to negotiate: {:?}", err)
383 // Whenever there is a new ICE candidate, send it to the peer
384 let peer_clone = peer.downgrade();
385 peer.webrtcbin.connect_closure(
389 move |_webrtcbin: &gst::Element, mlineindex: u32, candidate: &str| {
390 let peer = upgrade_weak!(peer_clone);
392 if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) {
395 gst::LibraryError::Failed,
396 ("Failed to send ICE candidate: {:?}", err)
403 // Whenever there is a new stream incoming from the peer, handle it
404 let peer_clone = peer.downgrade();
405 peer.webrtcbin.connect_pad_added(move |_webrtc, pad| {
406 let peer = upgrade_weak!(peer_clone);
408 if let Err(err) = peer.on_incoming_stream(pad) {
411 gst::LibraryError::Failed,
412 ("Failed to handle incoming stream: {:?}", err)
417 // Whenever a decoded stream comes available, handle it and connect it to the mixers
418 let app_clone = self.downgrade();
419 peer.bin.connect_pad_added(move |_bin, pad| {
420 let app = upgrade_weak!(app_clone);
422 if pad.name() == "audio_src" {
423 let audiomixer_sink_pad = app.audio_mixer.request_pad_simple("sink_%u").unwrap();
424 pad.link(&audiomixer_sink_pad).unwrap();
426 // Once it is unlinked again later when the peer is being removed,
427 // also release the pad on the mixer
428 audiomixer_sink_pad.connect_unlinked(move |pad, _peer| {
429 if let Some(audiomixer) = pad.parent() {
430 let audiomixer = audiomixer.downcast_ref::<gst::Element>().unwrap();
431 audiomixer.release_request_pad(pad);
434 } else if pad.name() == "video_src" {
435 let videomixer_sink_pad = app.video_mixer.request_pad_simple("sink_%u").unwrap();
436 pad.link(&videomixer_sink_pad).unwrap();
438 app.relayout_videomixer();
440 // Once it is unlinked again later when the peer is being removed,
441 // also release the pad on the mixer
442 let app_clone = app.downgrade();
443 videomixer_sink_pad.connect_unlinked(move |pad, _peer| {
444 let app = upgrade_weak!(app_clone);
446 if let Some(videomixer) = pad.parent() {
447 let videomixer = videomixer.downcast_ref::<gst::Element>().unwrap();
448 videomixer.release_request_pad(pad);
451 app.relayout_videomixer();
456 // Add pad probes to both tees for blocking them and
457 // then unblock them once we reached the Playing state.
459 // Then link them and unblock, in case they got blocked
462 // Otherwise it might happen that data is received before
463 // the elements are ready and then an error happens.
464 let audio_src_pad = self.audio_tee.request_pad_simple("src_%u").unwrap();
465 let audio_block = audio_src_pad
466 .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
467 gst::PadProbeReturn::Ok
470 audio_src_pad.link(&audio_sink_pad)?;
472 let video_src_pad = self.video_tee.request_pad_simple("src_%u").unwrap();
473 let video_block = video_src_pad
474 .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
475 gst::PadProbeReturn::Ok
478 video_src_pad.link(&video_sink_pad)?;
480 // Asynchronously set the peer bin to Playing
481 peer.bin.call_async(move |bin| {
482 // If this fails, post an error on the bus so we exit
483 if bin.sync_state_with_parent().is_err() {
486 gst::LibraryError::Failed,
487 ("Failed to set peer bin to Playing")
492 audio_src_pad.remove_probe(audio_block);
493 video_src_pad.remove_probe(video_block);
500 fn remove_peer(&self, peer: &str) -> Result<(), anyhow::Error> {
501 println!("Removing peer {}", peer);
502 let peer_id = str::parse::<u32>(peer).context("Can't parse peer id")?;
503 let mut peers = self.peers.lock().unwrap();
504 if let Some(peer) = peers.remove(&peer_id) {
507 // Now asynchronously remove the peer from the pipeline
508 let app_clone = self.downgrade();
509 self.pipeline.call_async(move |_pipeline| {
510 let app = upgrade_weak!(app_clone);
512 // Block the tees shortly for removal
513 let audio_tee_sinkpad = app.audio_tee.static_pad("sink").unwrap();
514 let audio_block = audio_tee_sinkpad
515 .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
516 gst::PadProbeReturn::Ok
520 let video_tee_sinkpad = app.video_tee.static_pad("sink").unwrap();
521 let video_block = video_tee_sinkpad
522 .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
523 gst::PadProbeReturn::Ok
527 // Release the tee pads and unblock
528 let audio_sinkpad = peer.bin.static_pad("audio_sink").unwrap();
529 let video_sinkpad = peer.bin.static_pad("video_sink").unwrap();
531 if let Some(audio_tee_srcpad) = audio_sinkpad.peer() {
532 let _ = audio_tee_srcpad.unlink(&audio_sinkpad);
533 app.audio_tee.release_request_pad(&audio_tee_srcpad);
535 audio_tee_sinkpad.remove_probe(audio_block);
537 if let Some(video_tee_srcpad) = video_sinkpad.peer() {
538 let _ = video_tee_srcpad.unlink(&video_sinkpad);
539 app.video_tee.release_request_pad(&video_tee_srcpad);
541 video_tee_sinkpad.remove_probe(video_block);
543 // Then remove the peer bin gracefully from the pipeline
544 let _ = app.pipeline.remove(&peer.bin);
545 let _ = peer.bin.set_state(gst::State::Null);
547 println!("Removed peer {}", peer.peer_id);
554 fn relayout_videomixer(&self) {
555 let mut pads = self.video_mixer.sink_pads();
560 // We ignore the first pad
562 let npads = pads.len();
564 let (width, height) = if npads <= 1 {
566 } else if npads <= 4 {
568 } else if npads <= 16 {
571 // FIXME: we don't support more than 16 streams for now
577 let w = VIDEO_WIDTH as i32 / width;
578 let h = VIDEO_HEIGHT as i32 / height;
581 pad.set_property("xpos", x);
582 pad.set_property("ypos", y);
583 pad.set_property("width", w);
584 pad.set_property("height", h);
587 if x >= VIDEO_WIDTH as i32 {
595 // Make sure to shut down the pipeline when it goes out of scope
596 // to release any system resources
597 impl Drop for AppInner {
599 let _ = self.pipeline.set_state(gst::State::Null);
604 // Downgrade the strong reference to a weak reference
605 fn downgrade(&self) -> PeerWeak {
606 PeerWeak(Arc::downgrade(&self.0))
609 // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
610 // for a new offer SDP from webrtcbin without any customization and then
611 // asynchronously send it to the peer via the WebSocket connection
612 fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> {
613 println!("starting negotiation with peer {}", self.peer_id);
615 let peer_clone = self.downgrade();
616 let promise = gst::Promise::with_change_func(move |reply| {
617 let peer = upgrade_weak!(peer_clone);
619 if let Err(err) = peer.on_offer_created(reply) {
622 gst::LibraryError::Failed,
623 ("Failed to send SDP offer: {:?}", err)
629 .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
634 // Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the
635 // WebSocket connection
638 reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
639 ) -> Result<(), anyhow::Error> {
640 let reply = match reply {
641 Ok(Some(reply)) => reply,
643 bail!("Offer creation future got no reponse");
646 bail!("Offer creation future got error reponse: {:?}", err);
653 .get::<gst_webrtc::WebRTCSessionDescription>()
654 .expect("Invalid argument");
656 .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
659 "sending SDP offer to peer: {}",
660 offer.sdp().as_text().unwrap()
663 let message = serde_json::to_string(&JsonMsg::Sdp {
664 type_: "offer".to_string(),
665 sdp: offer.sdp().as_text().unwrap(),
672 .unbounded_send(WsMessage::Text(format!(
673 "ROOM_PEER_MSG {} {}",
674 self.peer_id, message
676 .context("Failed to send SDP offer")?;
681 // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the
682 // WebSocket connection
683 fn on_answer_created(
685 reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
686 ) -> Result<(), anyhow::Error> {
687 let reply = match reply {
688 Ok(Some(reply)) => reply,
690 bail!("Answer creation future got no reponse");
693 bail!("Answer creation future got error reponse: {:?}", err);
700 .get::<gst_webrtc::WebRTCSessionDescription>()
701 .expect("Invalid argument");
703 .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
706 "sending SDP answer to peer: {}",
707 answer.sdp().as_text().unwrap()
710 let message = serde_json::to_string(&JsonMsg::Sdp {
711 type_: "answer".to_string(),
712 sdp: answer.sdp().as_text().unwrap(),
719 .unbounded_send(WsMessage::Text(format!(
720 "ROOM_PEER_MSG {} {}",
721 self.peer_id, message
723 .context("Failed to send SDP answer")?;
728 // Handle incoming SDP answers from the peer
729 fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
730 if type_ == "answer" {
731 print!("Received answer:\n{}\n", sdp);
733 let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
734 .map_err(|_| anyhow!("Failed to parse SDP answer"))?;
736 gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
739 .emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
742 } else if type_ == "offer" {
743 print!("Received offer:\n{}\n", sdp);
745 let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
746 .map_err(|_| anyhow!("Failed to parse SDP offer"))?;
748 // And then asynchronously start our pipeline and do the next steps. The
749 // pipeline needs to be started before we can create an answer
750 let peer_clone = self.downgrade();
751 self.bin.call_async(move |_pipeline| {
752 let peer = upgrade_weak!(peer_clone);
754 let offer = gst_webrtc::WebRTCSessionDescription::new(
755 gst_webrtc::WebRTCSDPType::Offer,
761 .emit_by_name::<()>("set-remote-description", &[&offer, &None::<gst::Promise>]);
763 let peer_clone = peer.downgrade();
764 let promise = gst::Promise::with_change_func(move |reply| {
765 let peer = upgrade_weak!(peer_clone);
767 if let Err(err) = peer.on_answer_created(reply) {
770 gst::LibraryError::Failed,
771 ("Failed to send SDP answer: {:?}", err)
778 .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
783 bail!("Sdp type is not \"answer\" but \"{}\"", type_)
787 // Handle incoming ICE candidates from the peer by passing them to webrtcbin
788 fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> {
790 .emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
795 // Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON
797 fn on_ice_candidate(&self, mlineindex: u32, candidate: &str) -> Result<(), anyhow::Error> {
798 let message = serde_json::to_string(&JsonMsg::Ice {
799 candidate: candidate.to_string(),
800 sdp_mline_index: mlineindex,
807 .unbounded_send(WsMessage::Text(format!(
808 "ROOM_PEER_MSG {} {}",
809 self.peer_id, message
811 .context("Failed to send ICE candidate")?;
816 // Whenever there's a new incoming, encoded stream from the peer create a new decodebin
817 // and audio/video sink depending on the stream type
818 fn on_incoming_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
819 // Early return for the source pads we're adding ourselves
820 if pad.direction() != gst::PadDirection::Src {
824 let caps = pad.current_caps().unwrap();
825 let s = caps.structure(0).unwrap();
827 .get_optional::<&str>("media")
828 .expect("Invalid type")
829 .ok_or_else(|| anyhow!("no media type in caps {:?}", caps))?;
831 let conv = if media_type == "video" {
832 gst::parse_bin_from_description(
834 "decodebin name=dbin ! queue ! videoconvert ! videoscale ! capsfilter name=src caps=video/x-raw,width={width},height={height},pixel-aspect-ratio=1/1",
840 } else if media_type == "audio" {
841 gst::parse_bin_from_description(
842 "decodebin name=dbin ! queue ! audioconvert ! audioresample name=src",
846 println!("Unknown pad {:?}, ignoring", pad);
850 // Add a ghost pad on our conv bin that proxies the sink pad of the decodebin
851 let dbin = conv.by_name("dbin").unwrap();
853 gst::GhostPad::with_target(Some("sink"), &dbin.static_pad("sink").unwrap()).unwrap();
854 conv.add_pad(&sinkpad).unwrap();
856 // And another one that proxies the source pad of the last element
857 let src = conv.by_name("src").unwrap();
859 gst::GhostPad::with_target(Some("src"), &src.static_pad("src").unwrap()).unwrap();
860 conv.add_pad(&srcpad).unwrap();
862 self.bin.add(&conv).unwrap();
863 conv.sync_state_with_parent()
864 .with_context(|| format!("can't start sink for stream {:?}", caps))?;
867 .with_context(|| format!("can't link sink for stream {:?}", caps))?;
869 // And then add a new ghost pad to the peer bin that proxies the source pad we added above
870 if media_type == "video" {
871 let srcpad = gst::GhostPad::with_target(Some("video_src"), &srcpad).unwrap();
872 srcpad.set_active(true).unwrap();
873 self.bin.add_pad(&srcpad).unwrap();
874 } else if media_type == "audio" {
875 let srcpad = gst::GhostPad::with_target(Some("audio_src"), &srcpad).unwrap();
876 srcpad.set_active(true).unwrap();
877 self.bin.add_pad(&srcpad).unwrap();
884 // At least shut down the bin here if it didn't happen so far
885 impl Drop for PeerInner {
887 let _ = self.bin.set_state(gst::State::Null);
892 initial_peers: &[&str],
893 ws: impl Sink<WsMessage, Error = WsError> + Stream<Item = Result<WsMessage, WsError>>,
894 ) -> Result<(), anyhow::Error> {
895 // Split the websocket into the Sink and Stream
896 let (mut ws_sink, ws_stream) = ws.split();
897 // Fuse the Stream, required for the select macro
898 let mut ws_stream = ws_stream.fuse();
900 // Create our application state
901 let (app, send_gst_msg_rx, send_ws_msg_rx) = App::new(initial_peers)?;
903 let mut send_gst_msg_rx = send_gst_msg_rx.fuse();
904 let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
906 // And now let's start our message loop
908 let ws_msg = futures::select! {
909 // Handle the WebSocket messages here
910 ws_msg = ws_stream.select_next_some() => {
912 WsMessage::Close(_) => {
913 println!("peer disconnected");
916 WsMessage::Ping(data) => Some(WsMessage::Pong(data)),
917 WsMessage::Pong(_) => None,
918 WsMessage::Binary(_) => None,
919 WsMessage::Text(text) => {
920 if let Err(err) = app.handle_websocket_message(&text) {
921 println!("Failed to parse message: {}", err);
925 WsMessage::Frame(_) => unreachable!(),
928 // Pass the GStreamer messages to the application control logic
929 gst_msg = send_gst_msg_rx.select_next_some() => {
930 app.handle_pipeline_message(&gst_msg)?;
933 // Handle WebSocket messages we created asynchronously
934 // to send them out now
935 ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg),
936 // Once we're done, break the loop and return
940 // If there's a message to send out, do so now
941 if let Some(ws_msg) = ws_msg {
942 ws_sink.send(ws_msg).await?;
949 // Check if all GStreamer plugins we require are available
950 fn check_plugins() -> Result<(), anyhow::Error> {
972 let registry = gst::Registry::get();
975 .filter(|n| registry.find_plugin(n).is_none())
977 .collect::<Vec<_>>();
979 if !missing.is_empty() {
980 bail!("Missing plugins: {:?}", missing);
986 async fn async_main() -> Result<(), anyhow::Error> {
987 // Initialize GStreamer first
992 let args = Args::parse();
994 // Connect to the given server
995 let (mut ws, _) = async_tungstenite::async_std::connect_async(&args.server).await?;
997 println!("connected");
999 // Say HELLO to the server and see if it replies with HELLO
1000 let our_id = rand::thread_rng().gen_range(10..10_000);
1001 println!("Registering id {} with server", our_id);
1002 ws.send(WsMessage::Text(format!("HELLO {}", our_id)))
1008 .ok_or_else(|| anyhow!("didn't receive anything"))??;
1010 if msg != WsMessage::Text("HELLO".into()) {
1011 bail!("server didn't say HELLO");
1014 // Join the given room
1015 ws.send(WsMessage::Text(format!("ROOM {}", args.room_id)))
1021 .ok_or_else(|| anyhow!("didn't receive anything"))??;
1023 let peers_str = if let WsMessage::Text(text) = &msg {
1024 if !text.starts_with("ROOM_OK") {
1025 bail!("server error: {:?}", text);
1028 println!("Joined room {}", args.room_id);
1030 &text["ROOM_OK ".len()..]
1032 bail!("server error: {:?}", msg);
1035 // Collect the ids of already existing peers
1036 let initial_peers = peers_str
1039 // Filter out empty lines
1047 .collect::<Vec<_>>();
1049 // All good, let's run our message loop
1050 run(&initial_peers, ws).await
1053 fn main() -> Result<(), anyhow::Error> {
1054 macos_workaround::run(|| task::block_on(async_main()))