> {
// Create the GStreamer pipeline
let pipeline = gst::parse_launch(
- "videotestsrc pattern=ball is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! webrtcbin. \
- audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 ! application/x-rtp,encoding-name=OPUS ! webrtcbin. \
+ "videotestsrc pattern=ball is-live=true ! vp8enc deadline=1 ! rtpvp8pay name=vpay pt=96 ! webrtcbin. \
+ audiotestsrc is-live=true ! opusenc perfect-timestamp=true ! rtpopuspay name=apay pt=97 ! application/x-rtp,encoding-name=OPUS ! webrtcbin. \
webrtcbin name=webrtcbin"
)?;
}
});
- // Asynchronously set the pipeline to Playing
- app.pipeline.call_async(|pipeline| {
- // If this fails, post an error on the bus so we exit
- if pipeline.set_state(gst::State::Playing).is_err() {
- gst::element_error!(
- pipeline,
- gst::LibraryError::Failed,
- ("Failed to set pipeline to Playing")
- );
- }
- });
-
- // Asynchronously set the pipeline to Playing
- app.pipeline.call_async(|pipeline| {
- pipeline
- .set_state(gst::State::Playing)
- .expect("Couldn't set pipeline to Playing");
- });
+ // Asynchronously set the pipeline to Playing if we're creating the offer,
+ // otherwise do that after the offer was received.
+ if app.args.peer_id.is_some() {
+ app.pipeline.call_async(|pipeline| {
+ // If this fails, post an error on the bus so we exit
+ if pipeline.set_state(gst::State::Playing).is_err() {
+ gst::element_error!(
+ pipeline,
+ gst::LibraryError::Failed,
+ ("Failed to set pipeline to Playing")
+ );
+ }
+ });
+ }
Ok((app, send_gst_msg_rx, send_ws_msg_rx))
}
Ok(())
}
+ fn configure_pipeline_on_offer(
+ &self,
+ offer: &gst_sdp::SDPMessage,
+ ) -> Result<(), anyhow::Error> {
+ // Extract audio/video payload types from the SDP and configure accordingly on the
+ // pipeline as these have to match with the offer
+ let mut opus_id = None;
+ let mut vp8_id = None;
+ for media in offer.medias() {
+ for fmt in media.formats() {
+ if fmt == "webrtc-datachannel" {
+ continue;
+ }
+
+ let pt = match fmt.parse::<u8>() {
+ Ok(pt) => pt,
+ Err(_) => continue,
+ };
+
+ let caps = match media.caps_from_media(pt as i32) {
+ Some(caps) if caps.size() > 0 => caps,
+ _ => continue,
+ };
+
+ let s = caps.structure(0).unwrap();
+ let encoding_name = match s.get::<&str>("encoding-name") {
+ Ok(encoding_name) => encoding_name,
+ Err(_) => continue,
+ };
+
+ if encoding_name == "VP8" && vp8_id.is_none() {
+ vp8_id = Some(pt);
+ } else if encoding_name == "OPUS" && opus_id.is_none() {
+ opus_id = Some(pt);
+ }
+ }
+ }
+
+ if let (Some(opus_id), Some(vp8_id)) = (opus_id, vp8_id) {
+ let apay = self.pipeline.by_name("apay").unwrap();
+ let vpay = self.pipeline.by_name("vpay").unwrap();
+
+ for (pay, pt) in [(apay, opus_id), (vpay, vp8_id)] {
+ pay.set_property("pt", pt as u32);
+ }
+ } else {
+ gst::element_error!(
+ self.pipeline,
+ gst::LibraryError::Failed,
+ ("Not all streams found in the offer")
+ );
+ bail!("Not all streams found in the offer");
+ }
+
+ Ok(())
+ }
+
// Handle incoming SDP answers from the peer
fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
if type_ == "answer" {
self.pipeline.call_async(move |_pipeline| {
let app = upgrade_weak!(app_clone);
+ if app.configure_pipeline_on_offer(&ret).is_err() {
+ return;
+ }
+
+ // If this fails, post an error on the bus so we exit
+ if app.pipeline.set_state(gst::State::Playing).is_err() {
+ gst::element_error!(
+ app.pipeline,
+ gst::LibraryError::Failed,
+ ("Failed to set pipeline to Playing")
+ );
+ return;
+ }
+
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
ret,