examples: webrtc: Update to gstreamer-rs 0.19 release
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / multiparty-sendrecv / gst-rust / src / main.rs
1 #![recursion_limit = "256"]
2
3 mod macos_workaround;
4
5 use std::collections::BTreeMap;
6 use std::sync::{Arc, Mutex, Weak};
7
8 use rand::prelude::*;
9
10 use clap::Parser;
11
12 use async_std::prelude::*;
13 use async_std::task;
14 use futures::channel::mpsc;
15 use futures::sink::{Sink, SinkExt};
16 use futures::stream::StreamExt;
17
18 use async_tungstenite::tungstenite;
19 use tungstenite::Error as WsError;
20 use tungstenite::Message as WsMessage;
21
22 use gst::glib;
23 use gst::prelude::*;
24
25 use serde_derive::{Deserialize, Serialize};
26
27 use anyhow::{anyhow, bail, Context};
28
29 const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
30 const TURN_SERVER: &str = "turn://foo:bar@webrtc.nirbheek.in:3478";
31 const VIDEO_WIDTH: u32 = 1024;
32 const VIDEO_HEIGHT: u32 = 768;
33
34 // upgrade weak reference or return
35 #[macro_export]
36 macro_rules! upgrade_weak {
37     ($x:ident, $r:expr) => {{
38         match $x.upgrade() {
39             Some(o) => o,
40             None => return $r,
41         }
42     }};
43     ($x:ident) => {
44         upgrade_weak!($x, ())
45     };
46 }
47
48 #[derive(Debug, clap::Parser)]
49 struct Args {
50     #[clap(short, long, default_value = "wss://webrtc.nirbheek.in:8443")]
51     server: String,
52     #[clap(short, long)]
53     room_id: u32,
54 }
55
56 // JSON messages we communicate with
57 #[derive(Serialize, Deserialize)]
58 #[serde(rename_all = "lowercase")]
59 enum JsonMsg {
60     Ice {
61         candidate: String,
62         #[serde(rename = "sdpMLineIndex")]
63         sdp_mline_index: u32,
64     },
65     Sdp {
66         #[serde(rename = "type")]
67         type_: String,
68         sdp: String,
69     },
70 }
71
72 // Strong reference to our application state
73 #[derive(Debug, Clone)]
74 struct App(Arc<AppInner>);
75
76 // Weak reference to our application state
77 #[derive(Debug, Clone)]
78 struct AppWeak(Weak<AppInner>);
79
80 // Actual application state
81 #[derive(Debug)]
82 struct AppInner {
83     pipeline: gst::Pipeline,
84     video_tee: gst::Element,
85     audio_tee: gst::Element,
86     video_mixer: gst::Element,
87     audio_mixer: gst::Element,
88     send_msg_tx: Arc<Mutex<mpsc::UnboundedSender<WsMessage>>>,
89     peers: Mutex<BTreeMap<u32, Peer>>,
90 }
91
92 // Strong reference to the state of one peer
93 #[derive(Debug, Clone)]
94 struct Peer(Arc<PeerInner>);
95
96 // Weak reference to the state of one peer
97 #[derive(Debug, Clone)]
98 struct PeerWeak(Weak<PeerInner>);
99
100 // Actual peer state
101 #[derive(Debug)]
102 struct PeerInner {
103     peer_id: u32,
104     bin: gst::Bin,
105     webrtcbin: gst::Element,
106     send_msg_tx: Arc<Mutex<mpsc::UnboundedSender<WsMessage>>>,
107 }
108
109 // To be able to access the App's fields directly
110 impl std::ops::Deref for App {
111     type Target = AppInner;
112
113     fn deref(&self) -> &AppInner {
114         &self.0
115     }
116 }
117
118 // To be able to access the Peers's fields directly
119 impl std::ops::Deref for Peer {
120     type Target = PeerInner;
121
122     fn deref(&self) -> &PeerInner {
123         &self.0
124     }
125 }
126
127 impl AppWeak {
128     // Try upgrading a weak reference to a strong one
129     fn upgrade(&self) -> Option<App> {
130         self.0.upgrade().map(App)
131     }
132 }
133
134 impl PeerWeak {
135     // Try upgrading a weak reference to a strong one
136     fn upgrade(&self) -> Option<Peer> {
137         self.0.upgrade().map(Peer)
138     }
139 }
140
141 impl App {
142     // Downgrade the strong reference to a weak reference
143     fn downgrade(&self) -> AppWeak {
144         AppWeak(Arc::downgrade(&self.0))
145     }
146
147     fn new(
148         initial_peers: &[&str],
149     ) -> Result<
150         (
151             Self,
152             impl Stream<Item = gst::Message>,
153             impl Stream<Item = WsMessage>,
154         ),
155         anyhow::Error,
156     > {
157         // Create the GStreamer pipeline
158         let pipeline = gst::parse_launch(
159             &format!(
160                 "videotestsrc is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee ! \
161                  queue ! fakesink sync=true \
162                  audiotestsrc wave=ticks is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee ! \
163                  queue ! fakesink sync=true \
164                  audiotestsrc wave=silence is-live=true ! audio-mixer. \
165                  audiomixer name=audio-mixer sink_0::mute=true ! audioconvert ! audioresample ! autoaudiosink \
166                  videotestsrc pattern=black ! capsfilter caps=video/x-raw,width=1,height=1 ! video-mixer. \
167                  compositor name=video-mixer background=black sink_0::alpha=0.0 ! capsfilter caps=video/x-raw,width={width},height={height} ! videoconvert ! autovideosink",
168                 width=VIDEO_WIDTH,
169                 height=VIDEO_HEIGHT,
170         ))?;
171
172         // Downcast from gst::Element to gst::Pipeline
173         let pipeline = pipeline
174             .downcast::<gst::Pipeline>()
175             .expect("not a pipeline");
176
177         // Get access to the tees and mixers by name
178         let video_tee = pipeline.by_name("video-tee").expect("can't find video-tee");
179         let audio_tee = pipeline.by_name("audio-tee").expect("can't find audio-tee");
180
181         let video_mixer = pipeline
182             .by_name("video-mixer")
183             .expect("can't find video-mixer");
184         let audio_mixer = pipeline
185             .by_name("audio-mixer")
186             .expect("can't find audio-mixer");
187
188         // Create a stream for handling the GStreamer message asynchronously
189         let bus = pipeline.bus().unwrap();
190         let send_gst_msg_rx = bus.stream();
191
192         // Channel for outgoing WebSocket messages from other threads
193         let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
194
195         // Asynchronously set the pipeline to Playing
196         pipeline.call_async(|pipeline| {
197             pipeline
198                 .set_state(gst::State::Playing)
199                 .expect("Couldn't set pipeline to Playing");
200         });
201
202         let app = App(Arc::new(AppInner {
203             pipeline,
204             video_tee,
205             audio_tee,
206             video_mixer,
207             audio_mixer,
208             peers: Mutex::new(BTreeMap::new()),
209             send_msg_tx: Arc::new(Mutex::new(send_ws_msg_tx)),
210         }));
211
212         for peer in initial_peers {
213             app.add_peer(peer, true)?;
214         }
215
216         // Asynchronously set the pipeline to Playing
217         app.pipeline.call_async(|pipeline| {
218             // If this fails, post an error on the bus so we exit
219             if pipeline.set_state(gst::State::Playing).is_err() {
220                 gst::element_error!(
221                     pipeline,
222                     gst::LibraryError::Failed,
223                     ("Failed to set pipeline to Playing")
224                 );
225             }
226         });
227
228         Ok((app, send_gst_msg_rx, send_ws_msg_rx))
229     }
230
231     // Handle WebSocket messages, both our own as well as WebSocket protocol messages
232     fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> {
233         if msg.starts_with("ERROR") {
234             bail!("Got error message: {}", msg);
235         }
236
237         if let Some(msg) = msg.strip_prefix("ROOM_PEER_MSG ") {
238             let mut split = msg.splitn(2, ' ');
239
240             let peer_id = split
241                 .next()
242                 .and_then(|s| str::parse::<u32>(s).ok())
243                 .ok_or_else(|| anyhow!("Can't parse peer id"))?;
244
245             let peers = self.peers.lock().unwrap();
246             let peer = peers
247                 .get(&peer_id)
248                 .ok_or_else(|| anyhow!("Can't find peer {}", peer_id))?
249                 .clone();
250             drop(peers);
251
252             let msg = split
253                 .next()
254                 .ok_or_else(|| anyhow!("Can't parse peer message"))?;
255
256             let json_msg: JsonMsg = serde_json::from_str(msg)?;
257
258             match json_msg {
259                 JsonMsg::Sdp { type_, sdp } => peer.handle_sdp(&type_, &sdp),
260                 JsonMsg::Ice {
261                     sdp_mline_index,
262                     candidate,
263                 } => peer.handle_ice(sdp_mline_index, &candidate),
264             }
265         } else if let Some(msg) = msg.strip_prefix("ROOM_PEER_JOINED ") {
266             // Parse message and add the new peer
267             let mut split = msg.splitn(2, ' ');
268             let peer_id = split.next().ok_or_else(|| anyhow!("Can't parse peer id"))?;
269
270             self.add_peer(peer_id, false)
271         } else if let Some(msg) = msg.strip_prefix("ROOM_PEER_LEFT ") {
272             // Parse message and add the new peer
273             let mut split = msg.splitn(2, ' ');
274             let peer_id = split.next().ok_or_else(|| anyhow!("Can't parse peer id"))?;
275
276             self.remove_peer(peer_id)
277         } else {
278             Ok(())
279         }
280     }
281
282     // Handle GStreamer messages coming from the pipeline
283     fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
284         use gst::message::MessageView;
285
286         match message.view() {
287             MessageView::Error(err) => bail!(
288                 "Error from element {}: {} ({})",
289                 err.src()
290                     .map(|s| String::from(s.path_string()))
291                     .unwrap_or_else(|| String::from("None")),
292                 err.error(),
293                 err.debug().unwrap_or_else(|| String::from("None")),
294             ),
295             MessageView::Warning(warning) => {
296                 println!("Warning: \"{}\"", warning.debug().unwrap());
297             }
298             _ => (),
299         }
300
301         Ok(())
302     }
303
304     // Add this new peer and if requested, send the offer to it
305     fn add_peer(&self, peer: &str, offer: bool) -> Result<(), anyhow::Error> {
306         println!("Adding peer {}", peer);
307         let peer_id = str::parse::<u32>(peer).context("Can't parse peer id")?;
308         let mut peers = self.peers.lock().unwrap();
309         if peers.contains_key(&peer_id) {
310             bail!("Peer {} already called", peer_id);
311         }
312
313         let peer_bin = gst::parse_bin_from_description(
314             "queue name=video-queue ! webrtcbin. \
315              queue name=audio-queue ! webrtcbin. \
316              webrtcbin name=webrtcbin",
317             false,
318         )?;
319
320         // Get access to the webrtcbin by name
321         let webrtcbin = peer_bin.by_name("webrtcbin").expect("can't find webrtcbin");
322
323         // Set some properties on webrtcbin
324         webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
325         webrtcbin.set_property_from_str("turn-server", TURN_SERVER);
326         webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
327
328         // Add ghost pads for connecting to the input
329         let audio_queue = peer_bin
330             .by_name("audio-queue")
331             .expect("can't find audio-queue");
332         let audio_sink_pad = gst::GhostPad::with_target(
333             Some("audio_sink"),
334             &audio_queue.static_pad("sink").unwrap(),
335         )
336         .unwrap();
337         peer_bin.add_pad(&audio_sink_pad).unwrap();
338
339         let video_queue = peer_bin
340             .by_name("video-queue")
341             .expect("can't find video-queue");
342         let video_sink_pad = gst::GhostPad::with_target(
343             Some("video_sink"),
344             &video_queue.static_pad("sink").unwrap(),
345         )
346         .unwrap();
347         peer_bin.add_pad(&video_sink_pad).unwrap();
348
349         let peer = Peer(Arc::new(PeerInner {
350             peer_id,
351             bin: peer_bin,
352             webrtcbin,
353             send_msg_tx: self.send_msg_tx.clone(),
354         }));
355
356         // Insert the peer into our map_
357         peers.insert(peer_id, peer.clone());
358         drop(peers);
359
360         // Add to the whole pipeline
361         self.pipeline.add(&peer.bin).unwrap();
362
363         // If we should send the offer to the peer, do so from on-negotiation-needed
364         if offer {
365             // Connect to on-negotiation-needed to handle sending an Offer
366             let peer_clone = peer.downgrade();
367             peer.webrtcbin.connect_closure(
368                 "on-negotiation-needed",
369                 false,
370                 glib::closure!(move |_webrtcbin: &gst::Element| {
371                     let peer = upgrade_weak!(peer_clone);
372                     if let Err(err) = peer.on_negotiation_needed() {
373                         gst::element_error!(
374                             peer.bin,
375                             gst::LibraryError::Failed,
376                             ("Failed to negotiate: {:?}", err)
377                         );
378                     }
379                 }),
380             );
381         }
382
383         // Whenever there is a new ICE candidate, send it to the peer
384         let peer_clone = peer.downgrade();
385         peer.webrtcbin.connect_closure(
386             "on-ice-candidate",
387             false,
388             glib::closure!(
389                 move |_webrtcbin: &gst::Element, mlineindex: u32, candidate: &str| {
390                     let peer = upgrade_weak!(peer_clone);
391
392                     if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) {
393                         gst::element_error!(
394                             peer.bin,
395                             gst::LibraryError::Failed,
396                             ("Failed to send ICE candidate: {:?}", err)
397                         );
398                     }
399                 }
400             ),
401         );
402
403         // Whenever there is a new stream incoming from the peer, handle it
404         let peer_clone = peer.downgrade();
405         peer.webrtcbin.connect_pad_added(move |_webrtc, pad| {
406             let peer = upgrade_weak!(peer_clone);
407
408             if let Err(err) = peer.on_incoming_stream(pad) {
409                 gst::element_error!(
410                     peer.bin,
411                     gst::LibraryError::Failed,
412                     ("Failed to handle incoming stream: {:?}", err)
413                 );
414             }
415         });
416
417         // Whenever a decoded stream comes available, handle it and connect it to the mixers
418         let app_clone = self.downgrade();
419         peer.bin.connect_pad_added(move |_bin, pad| {
420             let app = upgrade_weak!(app_clone);
421
422             if pad.name() == "audio_src" {
423                 let audiomixer_sink_pad = app.audio_mixer.request_pad_simple("sink_%u").unwrap();
424                 pad.link(&audiomixer_sink_pad).unwrap();
425
426                 // Once it is unlinked again later when the peer is being removed,
427                 // also release the pad on the mixer
428                 audiomixer_sink_pad.connect_unlinked(move |pad, _peer| {
429                     if let Some(audiomixer) = pad.parent() {
430                         let audiomixer = audiomixer.downcast_ref::<gst::Element>().unwrap();
431                         audiomixer.release_request_pad(pad);
432                     }
433                 });
434             } else if pad.name() == "video_src" {
435                 let videomixer_sink_pad = app.video_mixer.request_pad_simple("sink_%u").unwrap();
436                 pad.link(&videomixer_sink_pad).unwrap();
437
438                 app.relayout_videomixer();
439
440                 // Once it is unlinked again later when the peer is being removed,
441                 // also release the pad on the mixer
442                 let app_clone = app.downgrade();
443                 videomixer_sink_pad.connect_unlinked(move |pad, _peer| {
444                     let app = upgrade_weak!(app_clone);
445
446                     if let Some(videomixer) = pad.parent() {
447                         let videomixer = videomixer.downcast_ref::<gst::Element>().unwrap();
448                         videomixer.release_request_pad(pad);
449                     }
450
451                     app.relayout_videomixer();
452                 });
453             }
454         });
455
456         // Add pad probes to both tees for blocking them and
457         // then unblock them once we reached the Playing state.
458         //
459         // Then link them and unblock, in case they got blocked
460         // in the meantime.
461         //
462         // Otherwise it might happen that data is received before
463         // the elements are ready and then an error happens.
464         let audio_src_pad = self.audio_tee.request_pad_simple("src_%u").unwrap();
465         let audio_block = audio_src_pad
466             .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
467                 gst::PadProbeReturn::Ok
468             })
469             .unwrap();
470         audio_src_pad.link(&audio_sink_pad)?;
471
472         let video_src_pad = self.video_tee.request_pad_simple("src_%u").unwrap();
473         let video_block = video_src_pad
474             .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
475                 gst::PadProbeReturn::Ok
476             })
477             .unwrap();
478         video_src_pad.link(&video_sink_pad)?;
479
480         // Asynchronously set the peer bin to Playing
481         peer.bin.call_async(move |bin| {
482             // If this fails, post an error on the bus so we exit
483             if bin.sync_state_with_parent().is_err() {
484                 gst::element_error!(
485                     bin,
486                     gst::LibraryError::Failed,
487                     ("Failed to set peer bin to Playing")
488                 );
489             }
490
491             // And now unblock
492             audio_src_pad.remove_probe(audio_block);
493             video_src_pad.remove_probe(video_block);
494         });
495
496         Ok(())
497     }
498
499     // Remove this peer
500     fn remove_peer(&self, peer: &str) -> Result<(), anyhow::Error> {
501         println!("Removing peer {}", peer);
502         let peer_id = str::parse::<u32>(peer).context("Can't parse peer id")?;
503         let mut peers = self.peers.lock().unwrap();
504         if let Some(peer) = peers.remove(&peer_id) {
505             drop(peers);
506
507             // Now asynchronously remove the peer from the pipeline
508             let app_clone = self.downgrade();
509             self.pipeline.call_async(move |_pipeline| {
510                 let app = upgrade_weak!(app_clone);
511
512                 // Block the tees shortly for removal
513                 let audio_tee_sinkpad = app.audio_tee.static_pad("sink").unwrap();
514                 let audio_block = audio_tee_sinkpad
515                     .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
516                         gst::PadProbeReturn::Ok
517                     })
518                     .unwrap();
519
520                 let video_tee_sinkpad = app.video_tee.static_pad("sink").unwrap();
521                 let video_block = video_tee_sinkpad
522                     .add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, |_pad, _info| {
523                         gst::PadProbeReturn::Ok
524                     })
525                     .unwrap();
526
527                 // Release the tee pads and unblock
528                 let audio_sinkpad = peer.bin.static_pad("audio_sink").unwrap();
529                 let video_sinkpad = peer.bin.static_pad("video_sink").unwrap();
530
531                 if let Some(audio_tee_srcpad) = audio_sinkpad.peer() {
532                     let _ = audio_tee_srcpad.unlink(&audio_sinkpad);
533                     app.audio_tee.release_request_pad(&audio_tee_srcpad);
534                 }
535                 audio_tee_sinkpad.remove_probe(audio_block);
536
537                 if let Some(video_tee_srcpad) = video_sinkpad.peer() {
538                     let _ = video_tee_srcpad.unlink(&video_sinkpad);
539                     app.video_tee.release_request_pad(&video_tee_srcpad);
540                 }
541                 video_tee_sinkpad.remove_probe(video_block);
542
543                 // Then remove the peer bin gracefully from the pipeline
544                 let _ = app.pipeline.remove(&peer.bin);
545                 let _ = peer.bin.set_state(gst::State::Null);
546
547                 println!("Removed peer {}", peer.peer_id);
548             });
549         }
550
551         Ok(())
552     }
553
554     fn relayout_videomixer(&self) {
555         let mut pads = self.video_mixer.sink_pads();
556         if pads.is_empty() {
557             return;
558         }
559
560         // We ignore the first pad
561         pads.remove(0);
562         let npads = pads.len();
563
564         let (width, height) = if npads <= 1 {
565             (1, 1)
566         } else if npads <= 4 {
567             (2, 2)
568         } else if npads <= 16 {
569             (4, 4)
570         } else {
571             // FIXME: we don't support more than 16 streams for now
572             (4, 4)
573         };
574
575         let mut x: i32 = 0;
576         let mut y: i32 = 0;
577         let w = VIDEO_WIDTH as i32 / width;
578         let h = VIDEO_HEIGHT as i32 / height;
579
580         for pad in pads {
581             pad.set_property("xpos", x);
582             pad.set_property("ypos", y);
583             pad.set_property("width", w);
584             pad.set_property("height", h);
585
586             x += w;
587             if x >= VIDEO_WIDTH as i32 {
588                 x = 0;
589                 y += h;
590             }
591         }
592     }
593 }
594
595 // Make sure to shut down the pipeline when it goes out of scope
596 // to release any system resources
597 impl Drop for AppInner {
598     fn drop(&mut self) {
599         let _ = self.pipeline.set_state(gst::State::Null);
600     }
601 }
602
603 impl Peer {
604     // Downgrade the strong reference to a weak reference
605     fn downgrade(&self) -> PeerWeak {
606         PeerWeak(Arc::downgrade(&self.0))
607     }
608
609     // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
610     // for a new offer SDP from webrtcbin without any customization and then
611     // asynchronously send it to the peer via the WebSocket connection
612     fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> {
613         println!("starting negotiation with peer {}", self.peer_id);
614
615         let peer_clone = self.downgrade();
616         let promise = gst::Promise::with_change_func(move |reply| {
617             let peer = upgrade_weak!(peer_clone);
618
619             if let Err(err) = peer.on_offer_created(reply) {
620                 gst::element_error!(
621                     peer.bin,
622                     gst::LibraryError::Failed,
623                     ("Failed to send SDP offer: {:?}", err)
624                 );
625             }
626         });
627
628         self.webrtcbin
629             .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
630
631         Ok(())
632     }
633
634     // Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the
635     // WebSocket connection
636     fn on_offer_created(
637         &self,
638         reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
639     ) -> Result<(), anyhow::Error> {
640         let reply = match reply {
641             Ok(Some(reply)) => reply,
642             Ok(None) => {
643                 bail!("Offer creation future got no reponse");
644             }
645             Err(err) => {
646                 bail!("Offer creation future got error reponse: {:?}", err);
647             }
648         };
649
650         let offer = reply
651             .value("offer")
652             .unwrap()
653             .get::<gst_webrtc::WebRTCSessionDescription>()
654             .expect("Invalid argument");
655         self.webrtcbin
656             .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
657
658         println!(
659             "sending SDP offer to peer: {}",
660             offer.sdp().as_text().unwrap()
661         );
662
663         let message = serde_json::to_string(&JsonMsg::Sdp {
664             type_: "offer".to_string(),
665             sdp: offer.sdp().as_text().unwrap(),
666         })
667         .unwrap();
668
669         self.send_msg_tx
670             .lock()
671             .unwrap()
672             .unbounded_send(WsMessage::Text(format!(
673                 "ROOM_PEER_MSG {} {}",
674                 self.peer_id, message
675             )))
676             .context("Failed to send SDP offer")?;
677
678         Ok(())
679     }
680
681     // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the
682     // WebSocket connection
683     fn on_answer_created(
684         &self,
685         reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
686     ) -> Result<(), anyhow::Error> {
687         let reply = match reply {
688             Ok(Some(reply)) => reply,
689             Ok(None) => {
690                 bail!("Answer creation future got no reponse");
691             }
692             Err(err) => {
693                 bail!("Answer creation future got error reponse: {:?}", err);
694             }
695         };
696
697         let answer = reply
698             .value("answer")
699             .unwrap()
700             .get::<gst_webrtc::WebRTCSessionDescription>()
701             .expect("Invalid argument");
702         self.webrtcbin
703             .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
704
705         println!(
706             "sending SDP answer to peer: {}",
707             answer.sdp().as_text().unwrap()
708         );
709
710         let message = serde_json::to_string(&JsonMsg::Sdp {
711             type_: "answer".to_string(),
712             sdp: answer.sdp().as_text().unwrap(),
713         })
714         .unwrap();
715
716         self.send_msg_tx
717             .lock()
718             .unwrap()
719             .unbounded_send(WsMessage::Text(format!(
720                 "ROOM_PEER_MSG {} {}",
721                 self.peer_id, message
722             )))
723             .context("Failed to send SDP answer")?;
724
725         Ok(())
726     }
727
728     // Handle incoming SDP answers from the peer
729     fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
730         if type_ == "answer" {
731             print!("Received answer:\n{}\n", sdp);
732
733             let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
734                 .map_err(|_| anyhow!("Failed to parse SDP answer"))?;
735             let answer =
736                 gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
737
738             self.webrtcbin
739                 .emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
740
741             Ok(())
742         } else if type_ == "offer" {
743             print!("Received offer:\n{}\n", sdp);
744
745             let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
746                 .map_err(|_| anyhow!("Failed to parse SDP offer"))?;
747
748             // And then asynchronously start our pipeline and do the next steps. The
749             // pipeline needs to be started before we can create an answer
750             let peer_clone = self.downgrade();
751             self.bin.call_async(move |_pipeline| {
752                 let peer = upgrade_weak!(peer_clone);
753
754                 let offer = gst_webrtc::WebRTCSessionDescription::new(
755                     gst_webrtc::WebRTCSDPType::Offer,
756                     ret,
757                 );
758
759                 peer.0
760                     .webrtcbin
761                     .emit_by_name::<()>("set-remote-description", &[&offer, &None::<gst::Promise>]);
762
763                 let peer_clone = peer.downgrade();
764                 let promise = gst::Promise::with_change_func(move |reply| {
765                     let peer = upgrade_weak!(peer_clone);
766
767                     if let Err(err) = peer.on_answer_created(reply) {
768                         gst::element_error!(
769                             peer.bin,
770                             gst::LibraryError::Failed,
771                             ("Failed to send SDP answer: {:?}", err)
772                         );
773                     }
774                 });
775
776                 peer.0
777                     .webrtcbin
778                     .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
779             });
780
781             Ok(())
782         } else {
783             bail!("Sdp type is not \"answer\" but \"{}\"", type_)
784         }
785     }
786
787     // Handle incoming ICE candidates from the peer by passing them to webrtcbin
788     fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> {
789         self.webrtcbin
790             .emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
791
792         Ok(())
793     }
794
795     // Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON
796     // message
797     fn on_ice_candidate(&self, mlineindex: u32, candidate: &str) -> Result<(), anyhow::Error> {
798         let message = serde_json::to_string(&JsonMsg::Ice {
799             candidate: candidate.to_string(),
800             sdp_mline_index: mlineindex,
801         })
802         .unwrap();
803
804         self.send_msg_tx
805             .lock()
806             .unwrap()
807             .unbounded_send(WsMessage::Text(format!(
808                 "ROOM_PEER_MSG {} {}",
809                 self.peer_id, message
810             )))
811             .context("Failed to send ICE candidate")?;
812
813         Ok(())
814     }
815
816     // Whenever there's a new incoming, encoded stream from the peer create a new decodebin
817     // and audio/video sink depending on the stream type
818     fn on_incoming_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
819         // Early return for the source pads we're adding ourselves
820         if pad.direction() != gst::PadDirection::Src {
821             return Ok(());
822         }
823
824         let caps = pad.current_caps().unwrap();
825         let s = caps.structure(0).unwrap();
826         let media_type = s
827             .get_optional::<&str>("media")
828             .expect("Invalid type")
829             .ok_or_else(|| anyhow!("no media type in caps {:?}", caps))?;
830
831         let conv = if media_type == "video" {
832             gst::parse_bin_from_description(
833                 &format!(
834                     "decodebin name=dbin ! queue ! videoconvert ! videoscale ! capsfilter name=src caps=video/x-raw,width={width},height={height},pixel-aspect-ratio=1/1",
835                     width=VIDEO_WIDTH,
836                     height=VIDEO_HEIGHT
837                 ),
838                 false,
839             )?
840         } else if media_type == "audio" {
841             gst::parse_bin_from_description(
842                 "decodebin name=dbin ! queue ! audioconvert ! audioresample name=src",
843                 false,
844             )?
845         } else {
846             println!("Unknown pad {:?}, ignoring", pad);
847             return Ok(());
848         };
849
850         // Add a ghost pad on our conv bin that proxies the sink pad of the decodebin
851         let dbin = conv.by_name("dbin").unwrap();
852         let sinkpad =
853             gst::GhostPad::with_target(Some("sink"), &dbin.static_pad("sink").unwrap()).unwrap();
854         conv.add_pad(&sinkpad).unwrap();
855
856         // And another one that proxies the source pad of the last element
857         let src = conv.by_name("src").unwrap();
858         let srcpad =
859             gst::GhostPad::with_target(Some("src"), &src.static_pad("src").unwrap()).unwrap();
860         conv.add_pad(&srcpad).unwrap();
861
862         self.bin.add(&conv).unwrap();
863         conv.sync_state_with_parent()
864             .with_context(|| format!("can't start sink for stream {:?}", caps))?;
865
866         pad.link(&sinkpad)
867             .with_context(|| format!("can't link sink for stream {:?}", caps))?;
868
869         // And then add a new ghost pad to the peer bin that proxies the source pad we added above
870         if media_type == "video" {
871             let srcpad = gst::GhostPad::with_target(Some("video_src"), &srcpad).unwrap();
872             srcpad.set_active(true).unwrap();
873             self.bin.add_pad(&srcpad).unwrap();
874         } else if media_type == "audio" {
875             let srcpad = gst::GhostPad::with_target(Some("audio_src"), &srcpad).unwrap();
876             srcpad.set_active(true).unwrap();
877             self.bin.add_pad(&srcpad).unwrap();
878         }
879
880         Ok(())
881     }
882 }
883
884 // At least shut down the bin here if it didn't happen so far
885 impl Drop for PeerInner {
886     fn drop(&mut self) {
887         let _ = self.bin.set_state(gst::State::Null);
888     }
889 }
890
891 async fn run(
892     initial_peers: &[&str],
893     ws: impl Sink<WsMessage, Error = WsError> + Stream<Item = Result<WsMessage, WsError>>,
894 ) -> Result<(), anyhow::Error> {
895     // Split the websocket into the Sink and Stream
896     let (mut ws_sink, ws_stream) = ws.split();
897     // Fuse the Stream, required for the select macro
898     let mut ws_stream = ws_stream.fuse();
899
900     // Create our application state
901     let (app, send_gst_msg_rx, send_ws_msg_rx) = App::new(initial_peers)?;
902
903     let mut send_gst_msg_rx = send_gst_msg_rx.fuse();
904     let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
905
906     // And now let's start our message loop
907     loop {
908         let ws_msg = futures::select! {
909             // Handle the WebSocket messages here
910             ws_msg = ws_stream.select_next_some() => {
911                 match ws_msg? {
912                     WsMessage::Close(_) => {
913                         println!("peer disconnected");
914                         break
915                     },
916                     WsMessage::Ping(data) => Some(WsMessage::Pong(data)),
917                     WsMessage::Pong(_) => None,
918                     WsMessage::Binary(_) => None,
919                     WsMessage::Text(text) => {
920                         if let Err(err) = app.handle_websocket_message(&text) {
921                             println!("Failed to parse message: {}", err);
922                         }
923                         None
924                     },
925                     WsMessage::Frame(_) => unreachable!(),
926                 }
927             },
928             // Pass the GStreamer messages to the application control logic
929             gst_msg = send_gst_msg_rx.select_next_some() => {
930                 app.handle_pipeline_message(&gst_msg)?;
931                 None
932             },
933             // Handle WebSocket messages we created asynchronously
934             // to send them out now
935             ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg),
936             // Once we're done, break the loop and return
937             complete => break,
938         };
939
940         // If there's a message to send out, do so now
941         if let Some(ws_msg) = ws_msg {
942             ws_sink.send(ws_msg).await?;
943         }
944     }
945
946     Ok(())
947 }
948
949 // Check if all GStreamer plugins we require are available
950 fn check_plugins() -> Result<(), anyhow::Error> {
951     let needed = [
952         "videotestsrc",
953         "audiotestsrc",
954         "videoconvert",
955         "audioconvert",
956         "autodetect",
957         "opus",
958         "vpx",
959         "webrtc",
960         "nice",
961         "dtls",
962         "srtp",
963         "rtpmanager",
964         "rtp",
965         "playback",
966         "videoscale",
967         "audioresample",
968         "compositor",
969         "audiomixer",
970     ];
971
972     let registry = gst::Registry::get();
973     let missing = needed
974         .iter()
975         .filter(|n| registry.find_plugin(n).is_none())
976         .cloned()
977         .collect::<Vec<_>>();
978
979     if !missing.is_empty() {
980         bail!("Missing plugins: {:?}", missing);
981     } else {
982         Ok(())
983     }
984 }
985
986 async fn async_main() -> Result<(), anyhow::Error> {
987     // Initialize GStreamer first
988     gst::init()?;
989
990     check_plugins()?;
991
992     let args = Args::parse();
993
994     // Connect to the given server
995     let (mut ws, _) = async_tungstenite::async_std::connect_async(&args.server).await?;
996
997     println!("connected");
998
999     // Say HELLO to the server and see if it replies with HELLO
1000     let our_id = rand::thread_rng().gen_range(10..10_000);
1001     println!("Registering id {} with server", our_id);
1002     ws.send(WsMessage::Text(format!("HELLO {}", our_id)))
1003         .await?;
1004
1005     let msg = ws
1006         .next()
1007         .await
1008         .ok_or_else(|| anyhow!("didn't receive anything"))??;
1009
1010     if msg != WsMessage::Text("HELLO".into()) {
1011         bail!("server didn't say HELLO");
1012     }
1013
1014     // Join the given room
1015     ws.send(WsMessage::Text(format!("ROOM {}", args.room_id)))
1016         .await?;
1017
1018     let msg = ws
1019         .next()
1020         .await
1021         .ok_or_else(|| anyhow!("didn't receive anything"))??;
1022
1023     let peers_str = if let WsMessage::Text(text) = &msg {
1024         if !text.starts_with("ROOM_OK") {
1025             bail!("server error: {:?}", text);
1026         }
1027
1028         println!("Joined room {}", args.room_id);
1029
1030         &text["ROOM_OK ".len()..]
1031     } else {
1032         bail!("server error: {:?}", msg);
1033     };
1034
1035     // Collect the ids of already existing peers
1036     let initial_peers = peers_str
1037         .split(' ')
1038         .filter_map(|p| {
1039             // Filter out empty lines
1040             let p = p.trim();
1041             if p.is_empty() {
1042                 None
1043             } else {
1044                 Some(p)
1045             }
1046         })
1047         .collect::<Vec<_>>();
1048
1049     // All good, let's run our message loop
1050     run(&initial_peers, ws).await
1051 }
1052
1053 fn main() -> Result<(), anyhow::Error> {
1054     macos_workaround::run(|| task::block_on(async_main()))
1055 }