examples: webrtc: fix plugins check
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst-rust / src / main.rs
1 mod macos_workaround;
2
3 use std::sync::{Arc, Mutex, Weak};
4
5 use rand::prelude::*;
6
7 use clap::Parser;
8
9 use async_std::prelude::*;
10 use async_std::task;
11 use futures::channel::mpsc;
12 use futures::sink::{Sink, SinkExt};
13 use futures::stream::StreamExt;
14
15 use async_tungstenite::tungstenite;
16 use tungstenite::Error as WsError;
17 use tungstenite::Message as WsMessage;
18
19 use gst::glib;
20 use gst::prelude::*;
21
22 use serde_derive::{Deserialize, Serialize};
23
24 use anyhow::{anyhow, bail, Context};
25
// STUN server URI that `App::new` sets as webrtcbin's "stun-server" property
const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
// TURN server URI (with inline `user:password@` credentials) that `App::new`
// sets as webrtcbin's "turn-server" property
const TURN_SERVER: &str = "turn://foo:bar@webrtc.nirbheek.in:3478";
28
// Upgrade the weak reference `$x` and evaluate to the strong reference.
// If the referent is already gone, return `$r` from the enclosing function or
// closure instead; the one-argument form returns `()`.
#[macro_export]
macro_rules! upgrade_weak {
    ($x:ident, $r:expr) => {{
        if let Some(strong) = $x.upgrade() {
            strong
        } else {
            return $r;
        }
    }};
    ($x:ident) => {
        upgrade_weak!($x, ())
    };
}
42
// Command-line arguments, parsed through clap's derive API.
// Plain `//` comments are used here on purpose: `///` doc comments would be
// picked up by clap and shown as help text.
#[derive(Debug, clap::Parser)]
struct Args {
    // Address of the server we connect to in `async_main`
    #[clap(short, long, default_value = "wss://webrtc.nirbheek.in:8443")]
    server: String,
    // Optional peer id: when given we send `SESSION <id>` to the server and
    // hook up "on-negotiation-needed" so that we create the offer
    #[clap(short, long)]
    peer_id: Option<u32>,
}
50
// JSON messages we communicate with.
// `rename_all = "lowercase"` makes the variants appear as "ice" and "sdp".
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum JsonMsg {
    // An ICE candidate plus its media line index (JSON key "sdpMLineIndex")
    Ice {
        candidate: String,
        #[serde(rename = "sdpMLineIndex")]
        sdp_mline_index: u32,
    },
    // A session description; `type_` (JSON key "type") is handled as
    // "offer" or "answer" by `handle_sdp`
    Sdp {
        #[serde(rename = "type")]
        type_: String,
        sdp: String,
    },
}
66
// Strong reference to our application state.
// Cloning only clones the `Arc`, so every clone shares the same `AppInner`.
#[derive(Debug, Clone)]
struct App(Arc<AppInner>);
70
// Weak reference to our application state.
// Callbacks capture this instead of `App` and upgrade it when they run.
#[derive(Debug, Clone)]
struct AppWeak(Weak<AppInner>);
74
// Actual application state, shared behind the `Arc` held by `App`
#[derive(Debug)]
struct AppInner {
    // Parsed command-line arguments
    args: Args,
    // The pipeline created in `App::new`
    pipeline: gst::Pipeline,
    // The element named "webrtcbin" inside `pipeline`
    webrtcbin: gst::Element,
    // Sending half of the channel for outgoing WebSocket messages
    send_msg_tx: Mutex<mpsc::UnboundedSender<WsMessage>>,
}
83
84 // To be able to access the App's fields directly
85 impl std::ops::Deref for App {
86     type Target = AppInner;
87
88     fn deref(&self) -> &AppInner {
89         &self.0
90     }
91 }
92
93 impl AppWeak {
94     // Try upgrading a weak reference to a strong one
95     fn upgrade(&self) -> Option<App> {
96         self.0.upgrade().map(App)
97     }
98 }
99
100 impl App {
101     // Downgrade the strong reference to a weak reference
102     fn downgrade(&self) -> AppWeak {
103         AppWeak(Arc::downgrade(&self.0))
104     }
105
106     fn new(
107         args: Args,
108     ) -> Result<
109         (
110             Self,
111             impl Stream<Item = gst::Message>,
112             impl Stream<Item = WsMessage>,
113         ),
114         anyhow::Error,
115     > {
116         // Create the GStreamer pipeline
117         let pipeline = gst::parse_launch(
118         "videotestsrc pattern=ball is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! webrtcbin. \
119          audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 ! webrtcbin. \
120          webrtcbin name=webrtcbin"
121     )?;
122
123         // Downcast from gst::Element to gst::Pipeline
124         let pipeline = pipeline
125             .downcast::<gst::Pipeline>()
126             .expect("not a pipeline");
127
128         // Get access to the webrtcbin by name
129         let webrtcbin = pipeline.by_name("webrtcbin").expect("can't find webrtcbin");
130
131         // Set some properties on webrtcbin
132         webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
133         webrtcbin.set_property_from_str("turn-server", TURN_SERVER);
134         webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
135
136         // Create a stream for handling the GStreamer message asynchronously
137         let bus = pipeline.bus().unwrap();
138         let send_gst_msg_rx = bus.stream();
139
140         // Channel for outgoing WebSocket messages from other threads
141         let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
142
143         let app = App(Arc::new(AppInner {
144             args,
145             pipeline,
146             webrtcbin,
147             send_msg_tx: Mutex::new(send_ws_msg_tx),
148         }));
149
150         // Connect to on-negotiation-needed to handle sending an Offer
151         if app.args.peer_id.is_some() {
152             let app_clone = app.downgrade();
153             app.webrtcbin.connect_closure(
154                 "on-negotiation-needed",
155                 false,
156                 glib::closure!(move |_webrtcbin: &gst::Element| {
157                     let app = upgrade_weak!(app_clone);
158                     if let Err(err) = app.on_negotiation_needed() {
159                         gst::element_error!(
160                             app.pipeline,
161                             gst::LibraryError::Failed,
162                             ("Failed to negotiate: {:?}", err)
163                         );
164                     }
165                 }),
166             );
167         }
168
169         // Whenever there is a new ICE candidate, send it to the peer
170         let app_clone = app.downgrade();
171         app.webrtcbin.connect_closure(
172             "on-ice-candidate",
173             false,
174             glib::closure!(
175                 move |_webrtcbin: &gst::Element, mlineindex: u32, candidate: &str| {
176                     let app = upgrade_weak!(app_clone);
177
178                     if let Err(err) = app.on_ice_candidate(mlineindex, candidate) {
179                         gst::element_error!(
180                             app.pipeline,
181                             gst::LibraryError::Failed,
182                             ("Failed to send ICE candidate: {:?}", err)
183                         );
184                     }
185                 }
186             ),
187         );
188
189         // Whenever there is a new stream incoming from the peer, handle it
190         let app_clone = app.downgrade();
191         app.webrtcbin.connect_pad_added(move |_webrtc, pad| {
192             let app = upgrade_weak!(app_clone);
193
194             if let Err(err) = app.on_incoming_stream(pad) {
195                 gst::element_error!(
196                     app.pipeline,
197                     gst::LibraryError::Failed,
198                     ("Failed to handle incoming stream: {:?}", err)
199                 );
200             }
201         });
202
203         // Asynchronously set the pipeline to Playing
204         app.pipeline.call_async(|pipeline| {
205             // If this fails, post an error on the bus so we exit
206             if pipeline.set_state(gst::State::Playing).is_err() {
207                 gst::element_error!(
208                     pipeline,
209                     gst::LibraryError::Failed,
210                     ("Failed to set pipeline to Playing")
211                 );
212             }
213         });
214
215         // Asynchronously set the pipeline to Playing
216         app.pipeline.call_async(|pipeline| {
217             pipeline
218                 .set_state(gst::State::Playing)
219                 .expect("Couldn't set pipeline to Playing");
220         });
221
222         Ok((app, send_gst_msg_rx, send_ws_msg_rx))
223     }
224
225     // Handle WebSocket messages, both our own as well as WebSocket protocol messages
226     fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> {
227         if msg.starts_with("ERROR") {
228             bail!("Got error message: {}", msg);
229         }
230
231         let json_msg: JsonMsg = serde_json::from_str(msg)?;
232
233         match json_msg {
234             JsonMsg::Sdp { type_, sdp } => self.handle_sdp(&type_, &sdp),
235             JsonMsg::Ice {
236                 sdp_mline_index,
237                 candidate,
238             } => self.handle_ice(sdp_mline_index, &candidate),
239         }
240     }
241
242     // Handle GStreamer messages coming from the pipeline
243     fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
244         use gst::message::MessageView;
245
246         match message.view() {
247             MessageView::Error(err) => bail!(
248                 "Error from element {}: {} ({})",
249                 err.src()
250                     .map(|s| String::from(s.path_string()))
251                     .unwrap_or_else(|| String::from("None")),
252                 err.error(),
253                 err.debug().unwrap_or_else(|| String::from("None")),
254             ),
255             MessageView::Warning(warning) => {
256                 println!("Warning: \"{}\"", warning.debug().unwrap());
257             }
258             _ => (),
259         }
260
261         Ok(())
262     }
263
264     // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
265     // for a new offer SDP from webrtcbin without any customization and then
266     // asynchronously send it to the peer via the WebSocket connection
267     fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> {
268         println!("starting negotiation");
269
270         let app_clone = self.downgrade();
271         let promise = gst::Promise::with_change_func(move |reply| {
272             let app = upgrade_weak!(app_clone);
273
274             if let Err(err) = app.on_offer_created(reply) {
275                 gst::element_error!(
276                     app.pipeline,
277                     gst::LibraryError::Failed,
278                     ("Failed to send SDP offer: {:?}", err)
279                 );
280             }
281         });
282
283         self.webrtcbin
284             .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
285
286         Ok(())
287     }
288
289     // Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the
290     // WebSocket connection
291     fn on_offer_created(
292         &self,
293         reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
294     ) -> Result<(), anyhow::Error> {
295         let reply = match reply {
296             Ok(Some(reply)) => reply,
297             Ok(None) => {
298                 bail!("Offer creation future got no response");
299             }
300             Err(err) => {
301                 bail!("Offer creation future got error response: {:?}", err);
302             }
303         };
304
305         let offer = reply
306             .value("offer")
307             .unwrap()
308             .get::<gst_webrtc::WebRTCSessionDescription>()
309             .expect("Invalid argument");
310         self.webrtcbin
311             .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
312
313         println!(
314             "sending SDP offer to peer: {}",
315             offer.sdp().as_text().unwrap()
316         );
317
318         let message = serde_json::to_string(&JsonMsg::Sdp {
319             type_: "offer".to_string(),
320             sdp: offer.sdp().as_text().unwrap(),
321         })
322         .unwrap();
323
324         self.send_msg_tx
325             .lock()
326             .unwrap()
327             .unbounded_send(WsMessage::Text(message))
328             .context("Failed to send SDP offer")?;
329
330         Ok(())
331     }
332
333     // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the
334     // WebSocket connection
335     fn on_answer_created(
336         &self,
337         reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
338     ) -> Result<(), anyhow::Error> {
339         let reply = match reply {
340             Ok(Some(reply)) => reply,
341             Ok(None) => {
342                 bail!("Answer creation future got no response");
343             }
344             Err(err) => {
345                 bail!("Answer creation future got error response: {:?}", err);
346             }
347         };
348
349         let answer = reply
350             .value("answer")
351             .unwrap()
352             .get::<gst_webrtc::WebRTCSessionDescription>()
353             .expect("Invalid argument");
354         self.webrtcbin
355             .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
356
357         println!(
358             "sending SDP answer to peer: {}",
359             answer.sdp().as_text().unwrap()
360         );
361
362         let message = serde_json::to_string(&JsonMsg::Sdp {
363             type_: "answer".to_string(),
364             sdp: answer.sdp().as_text().unwrap(),
365         })
366         .unwrap();
367
368         self.send_msg_tx
369             .lock()
370             .unwrap()
371             .unbounded_send(WsMessage::Text(message))
372             .context("Failed to send SDP answer")?;
373
374         Ok(())
375     }
376
377     // Handle incoming SDP answers from the peer
378     fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
379         if type_ == "answer" {
380             print!("Received answer:\n{}\n", sdp);
381
382             let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
383                 .map_err(|_| anyhow!("Failed to parse SDP answer"))?;
384             let answer =
385                 gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
386
387             self.webrtcbin
388                 .emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
389
390             Ok(())
391         } else if type_ == "offer" {
392             print!("Received offer:\n{}\n", sdp);
393
394             let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
395                 .map_err(|_| anyhow!("Failed to parse SDP offer"))?;
396
397             // And then asynchronously start our pipeline and do the next steps. The
398             // pipeline needs to be started before we can create an answer
399             let app_clone = self.downgrade();
400             self.pipeline.call_async(move |_pipeline| {
401                 let app = upgrade_weak!(app_clone);
402
403                 let offer = gst_webrtc::WebRTCSessionDescription::new(
404                     gst_webrtc::WebRTCSDPType::Offer,
405                     ret,
406                 );
407
408                 app.0
409                     .webrtcbin
410                     .emit_by_name::<()>("set-remote-description", &[&offer, &None::<gst::Promise>]);
411
412                 let app_clone = app.downgrade();
413                 let promise = gst::Promise::with_change_func(move |reply| {
414                     let app = upgrade_weak!(app_clone);
415
416                     if let Err(err) = app.on_answer_created(reply) {
417                         gst::element_error!(
418                             app.pipeline,
419                             gst::LibraryError::Failed,
420                             ("Failed to send SDP answer: {:?}", err)
421                         );
422                     }
423                 });
424
425                 app.0
426                     .webrtcbin
427                     .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
428             });
429
430             Ok(())
431         } else {
432             bail!("Sdp type is not \"answer\" but \"{}\"", type_)
433         }
434     }
435
436     // Handle incoming ICE candidates from the peer by passing them to webrtcbin
437     fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> {
438         self.webrtcbin
439             .emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
440
441         Ok(())
442     }
443
444     // Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON
445     // message
446     fn on_ice_candidate(&self, mlineindex: u32, candidate: &str) -> Result<(), anyhow::Error> {
447         let message = serde_json::to_string(&JsonMsg::Ice {
448             candidate: candidate.to_string(),
449             sdp_mline_index: mlineindex,
450         })
451         .unwrap();
452
453         self.send_msg_tx
454             .lock()
455             .unwrap()
456             .unbounded_send(WsMessage::Text(message))
457             .context("Failed to send ICE candidate")?;
458
459         Ok(())
460     }
461
462     // Whenever there's a new incoming, encoded stream from the peer create a new decodebin
463     fn on_incoming_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
464         // Early return for the source pads we're adding ourselves
465         if pad.direction() != gst::PadDirection::Src {
466             return Ok(());
467         }
468
469         let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
470         let app_clone = self.downgrade();
471         decodebin.connect_pad_added(move |_decodebin, pad| {
472             let app = upgrade_weak!(app_clone);
473
474             if let Err(err) = app.on_incoming_decodebin_stream(pad) {
475                 gst::element_error!(
476                     app.pipeline,
477                     gst::LibraryError::Failed,
478                     ("Failed to handle decoded stream: {:?}", err)
479                 );
480             }
481         });
482
483         self.pipeline.add(&decodebin).unwrap();
484         decodebin.sync_state_with_parent().unwrap();
485
486         let sinkpad = decodebin.static_pad("sink").unwrap();
487         pad.link(&sinkpad).unwrap();
488
489         Ok(())
490     }
491
492     // Handle a newly decoded decodebin stream and depending on its type, create the relevant
493     // elements or simply ignore it
494     fn on_incoming_decodebin_stream(&self, pad: &gst::Pad) -> Result<(), anyhow::Error> {
495         let caps = pad.current_caps().unwrap();
496         let name = caps.structure(0).unwrap().name();
497
498         let sink = if name.starts_with("video/") {
499             gst::parse_bin_from_description(
500                 "queue ! videoconvert ! videoscale ! autovideosink",
501                 true,
502             )?
503         } else if name.starts_with("audio/") {
504             gst::parse_bin_from_description(
505                 "queue ! audioconvert ! audioresample ! autoaudiosink",
506                 true,
507             )?
508         } else {
509             println!("Unknown pad {:?}, ignoring", pad);
510             return Ok(());
511         };
512
513         self.pipeline.add(&sink).unwrap();
514         sink.sync_state_with_parent()
515             .with_context(|| format!("can't start sink for stream {:?}", caps))?;
516
517         let sinkpad = sink.static_pad("sink").unwrap();
518         pad.link(&sinkpad)
519             .with_context(|| format!("can't link sink for stream {:?}", caps))?;
520
521         Ok(())
522     }
523 }
524
525 // Make sure to shut down the pipeline when it goes out of scope
526 // to release any system resources
527 impl Drop for AppInner {
528     fn drop(&mut self) {
529         let _ = self.pipeline.set_state(gst::State::Null);
530     }
531 }
532
533 async fn run(
534     args: Args,
535     ws: impl Sink<WsMessage, Error = WsError> + Stream<Item = Result<WsMessage, WsError>>,
536 ) -> Result<(), anyhow::Error> {
537     // Split the websocket into the Sink and Stream
538     let (mut ws_sink, ws_stream) = ws.split();
539     // Fuse the Stream, required for the select macro
540     let mut ws_stream = ws_stream.fuse();
541
542     // Create our application state
543     let (app, send_gst_msg_rx, send_ws_msg_rx) = App::new(args)?;
544
545     let mut send_gst_msg_rx = send_gst_msg_rx.fuse();
546     let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
547
548     // And now let's start our message loop
549     loop {
550         let ws_msg = futures::select! {
551             // Handle the WebSocket messages here
552             ws_msg = ws_stream.select_next_some() => {
553                 match ws_msg? {
554                     WsMessage::Close(_) => {
555                         println!("peer disconnected");
556                         break
557                     },
558                     WsMessage::Ping(data) => Some(WsMessage::Pong(data)),
559                     WsMessage::Pong(_) => None,
560                     WsMessage::Binary(_) => None,
561                     WsMessage::Text(text) => {
562                         app.handle_websocket_message(&text)?;
563                         None
564                     },
565                     WsMessage::Frame(_) => unreachable!(),
566                 }
567             },
568             // Pass the GStreamer messages to the application control logic
569             gst_msg = send_gst_msg_rx.select_next_some() => {
570                 app.handle_pipeline_message(&gst_msg)?;
571                 None
572             },
573             // Handle WebSocket messages we created asynchronously
574             // to send them out now
575             ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg),
576             // Once we're done, break the loop and return
577             complete => break,
578         };
579
580         // If there's a message to send out, do so now
581         if let Some(ws_msg) = ws_msg {
582             ws_sink.send(ws_msg).await?;
583         }
584     }
585
586     Ok(())
587 }
588
589 // Check if all GStreamer plugins we require are available
590 fn check_plugins() -> Result<(), anyhow::Error> {
591     let needed = [
592         "videotestsrc",
593         "audiotestsrc",
594         "videoconvertscale",
595         "audioconvert",
596         "autodetect",
597         "opus",
598         "vpx",
599         "webrtc",
600         "nice",
601         "dtls",
602         "srtp",
603         "rtpmanager",
604         "rtp",
605         "playback",
606         "audioresample",
607     ];
608
609     let registry = gst::Registry::get();
610     let missing = needed
611         .iter()
612         .filter(|n| registry.find_plugin(n).is_none())
613         .cloned()
614         .collect::<Vec<_>>();
615
616     if !missing.is_empty() {
617         bail!("Missing plugins: {:?}", missing);
618     } else {
619         Ok(())
620     }
621 }
622
623 async fn async_main() -> Result<(), anyhow::Error> {
624     // Initialize GStreamer first
625     gst::init()?;
626
627     check_plugins()?;
628
629     let args = Args::parse();
630
631     // Connect to the given server
632     let (mut ws, _) = async_tungstenite::async_std::connect_async(&args.server).await?;
633
634     println!("connected");
635
636     // Say HELLO to the server and see if it replies with HELLO
637     let our_id = rand::thread_rng().gen_range(10..10_000);
638     println!("Registering id {} with server", our_id);
639     ws.send(WsMessage::Text(format!("HELLO {}", our_id)))
640         .await?;
641
642     let msg = ws
643         .next()
644         .await
645         .ok_or_else(|| anyhow!("didn't receive anything"))??;
646
647     if msg != WsMessage::Text("HELLO".into()) {
648         bail!("server didn't say HELLO");
649     }
650
651     if let Some(peer_id) = args.peer_id {
652         // Join the given session
653         ws.send(WsMessage::Text(format!("SESSION {}", peer_id)))
654             .await?;
655
656         let msg = ws
657             .next()
658             .await
659             .ok_or_else(|| anyhow!("didn't receive anything"))??;
660
661         if msg != WsMessage::Text("SESSION_OK".into()) {
662             bail!("server error: {:?}", msg);
663         }
664     }
665
666     // All good, let's run our message loop
667     run(args, ws).await
668 }
669
670 fn main() -> Result<(), anyhow::Error> {
671     macos_workaround::run(|| task::block_on(async_main()))
672 }