From: Philippe Normand Date: Wed, 16 Nov 2016 07:56:34 +0000 (+0100) Subject: rtpbin: receive bundle support X-Git-Tag: 1.12.2~344 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=dcd3ce9751cdef0b5ab1fa118355f92bdfe82cb3;p=platform%2Fupstream%2Fgst-plugins-good.git rtpbin: receive bundle support A new signal named on-bundled-ssrc is provided and can be used by the application to redirect a stream to a different GstRtpSession or to keep the RTX stream grouped within the GstRtpSession of the same media type. https://bugzilla.gnome.org/show_bug.cgi?id=772740 --- diff --git a/docs/plugins/gst-plugins-good-plugins.signals b/docs/plugins/gst-plugins-good-plugins.signals index 3db17e9..44bbdda 100644 --- a/docs/plugins/gst-plugins-good-plugins.signals +++ b/docs/plugins/gst-plugins-good-plugins.signals @@ -375,6 +375,14 @@ guint arg1 +GstRtpBin::on-bundled-ssrc +guint +l +GstRtpBin *gstrtpbin +guint arg1 + + + GstRtpJitterBuffer::clear-pt-map void la diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 648adb9..f58de01 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -53,6 +53,13 @@ * SSRC in the RTP packets to its own SSRC and wil forward the packets on the * send_rtp_src_\%u pad after updating its internal state. * + * #GstRtpBin can also demultiplex incoming bundled streams. The first + * #GstRtpSession will have a #GstRtpSsrcDemux element splitting the streams + * based on their SSRC and potentially dispatched to a different #GstRtpSession. + * Because retransmission SSRCs need to be merged with the corresponding media + * stream the #GstRtpBin::on-bundled-ssrc signal is emitted so that the + * application can find out to which session the SSRC belongs. + * * The session manager needs the clock-rate of the payload types it is handling * and will signal the #GstRtpSession::request-pt-map signal when it needs such a * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map @@ -276,6 +283,8 @@ enum SIGNAL_ON_NEW_SENDER_SSRC, SIGNAL_ON_SENDER_SSRC_ACTIVE, + SIGNAL_ON_BUNDLED_SSRC, + LAST_SIGNAL }; @@ -362,6 +371,14 @@ static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void free_client (GstRtpBinClient * client, GstRtpBin * bin); static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin); +static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id); +static GstPad *complete_session_sink (GstRtpBin * rtpbin, + GstRtpBinSession * session, gboolean bundle_demuxer_needed); +static void +complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid); +static GstPad *complete_session_rtcp (GstRtpBin * rtpbin, + GstRtpBinSession * session, guint sessid, gboolean bundle_demuxer_needed); /* Manages the RTP stream for one SSRC. * @@ -428,6 +445,12 @@ struct _GstRtpBinSession gulong demux_newpad_sig; gulong demux_padremoved_sig; + /* Bundling support */ + GstElement *rtp_funnel; + GstElement *rtcp_funnel; + GstElement *bundle_demux; + gulong bundle_demux_newpad_sig; + GMutex lock; /* list of GstRtpBinStream */ @@ -629,6 +652,96 @@ ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad, GST_RTP_BIN_UNLOCK (rtpbin); } +static void +new_bundled_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, + GstRtpBinSession * session) +{ + GValue result = G_VALUE_INIT; + GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT }; + guint session_id = 0; + GstRtpBinSession *target_session = NULL; + GstRtpBin *rtpbin = session->bin; + gchar *name; + GstPad *src_pad; + GstPad *recv_rtp_sink = NULL; + GstPad *recv_rtcp_sink = NULL; + GstPadLinkReturn ret; + + GST_RTP_BIN_DYN_LOCK (rtpbin); + GST_DEBUG_OBJECT (rtpbin, "new bundled SSRC pad %08x, %s:%s", ssrc, + GST_DEBUG_PAD_NAME (pad)); + + g_value_init (&result, G_TYPE_UINT); + g_value_init (¶ms[0], GST_TYPE_ELEMENT); + g_value_set_object (¶ms[0], rtpbin); + g_value_init (¶ms[1], G_TYPE_UINT); + g_value_set_uint (¶ms[1], ssrc); + + g_signal_emitv (params, + gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, &result); + g_value_unset (¶ms[0]); + + session_id = g_value_get_uint (&result); + if (session_id == 0) { + target_session = session; + } else { + target_session = find_session_by_id (rtpbin, (gint) session_id); + if (!target_session) { + target_session = create_session (rtpbin, session_id); + } + if (!target_session->recv_rtp_sink) { + recv_rtp_sink = complete_session_sink (rtpbin, target_session, FALSE); + } + + if (!target_session->recv_rtp_src) + complete_session_receiver (rtpbin, target_session, session_id); + + if (!target_session->recv_rtcp_sink) { + recv_rtcp_sink = + complete_session_rtcp (rtpbin, target_session, session_id, FALSE); + } + } + + GST_DEBUG_OBJECT (rtpbin, "Assigning bundled ssrc %u to session %u", ssrc, + session_id); + + if (!recv_rtp_sink) { + recv_rtp_sink = + gst_element_get_request_pad (target_session->rtp_funnel, "sink_%u"); + } + + if (!recv_rtcp_sink) { + recv_rtcp_sink = + gst_element_get_request_pad (target_session->rtcp_funnel, "sink_%u"); + } + + name = g_strdup_printf ("src_%u", ssrc); + src_pad = gst_element_get_static_pad (element, name); + ret = gst_pad_link (src_pad, recv_rtp_sink); + g_free (name); + gst_object_unref (src_pad); + gst_object_unref (recv_rtp_sink); + if (ret != GST_PAD_LINK_OK) { + g_warning + ("rtpbin: failed to link bundle demuxer to receive rtp funnel for session %u", + session_id); + } + + name = g_strdup_printf ("rtcp_src_%u", ssrc); + src_pad = gst_element_get_static_pad (element, name); + gst_pad_link (src_pad, recv_rtcp_sink); + g_free (name); + gst_object_unref (src_pad); + gst_object_unref (recv_rtcp_sink); + if (ret != GST_PAD_LINK_OK) { + g_warning + ("rtpbin: failed to link bundle demuxer to receive rtcp sink pad for session %u", + session_id); + } + + GST_RTP_BIN_DYN_UNLOCK (rtpbin); +} + /* create a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * create_session (GstRtpBin * rtpbin, gint id) @@ -649,6 +762,10 @@ create_session (GstRtpBin * rtpbin, gint id) sess->bin = rtpbin; sess->session = session; sess->demux = demux; + + sess->rtp_funnel = gst_element_factory_make ("funnel", NULL); + sess->rtcp_funnel = gst_element_factory_make ("funnel", NULL); + sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); @@ -696,6 +813,8 @@ create_session (GstRtpBin * rtpbin, gint id) gst_bin_add (GST_BIN_CAST (rtpbin), session); gst_bin_add (GST_BIN_CAST (rtpbin), demux); + gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtp_funnel); + gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtcp_funnel); GST_OBJECT_LOCK (rtpbin); target = GST_STATE_TARGET (rtpbin); @@ -704,6 +823,8 @@ create_session (GstRtpBin * rtpbin, gint id) /* change state only to what's needed */ gst_element_set_state (demux, target); gst_element_set_state (session, target); + gst_element_set_state (sess->rtp_funnel, target); + gst_element_set_state (sess->rtcp_funnel, target); return sess; @@ -807,7 +928,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) GValue ret = { 0 }; GValue args[3] = { {0}, {0}, {0} }; - GST_DEBUG ("searching pt %d in cache", pt); + GST_DEBUG ("searching pt %u in cache", pt); GST_RTP_SESSION_LOCK (session); @@ -820,7 +941,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) bin = session->bin; - GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id); + GST_DEBUG ("emiting signal for pt %u in session %u", pt, session->id); /* not in cache, send signal to request caps */ g_value_init (&args[0], GST_TYPE_ELEMENT); @@ -856,7 +977,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) if (!caps) goto no_caps; - GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps); + GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps); /* store in cache, take additional ref */ g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt), @@ -947,7 +1068,7 @@ gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id) GstElement *ret = NULL; GST_RTP_BIN_LOCK (bin); - GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %d", session_id); + GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id); session = find_session_by_id (bin, (gint) session_id); if (session) { ret = gst_object_ref (session->session); @@ -964,7 +1085,7 @@ gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id) GstRtpBinSession *session; GST_RTP_BIN_LOCK (bin); - GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d", + GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u", session_id); session = find_session_by_id (bin, (gint) session_id); if (session) { @@ -2194,6 +2315,29 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + + /** + * GstRtpBin::on-bundled-ssrc: + * @rtpbin: the object which received the signal + * @ssrc: the bundled SSRC + * + * Notify of a new incoming bundled SSRC. If no handler is connected to the + * signal then the #GstRtpSession created for the recv_rtp_sink_\%u + * request pad will be managing this new SSRC. However if there is a handler + * connected then the application can decided to dispatch this new stream to + * another session by providing its ID as return value of the handler. This + * can be particularly useful to keep retransmission SSRCs grouped with the + * session for which they handle retransmission. + * + * Since: 1.12 + */ + gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC] = + g_signal_new ("on-bundled-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + on_bundled_ssrc), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_UINT, 1, G_TYPE_UINT); + + g_object_class_install_property (gobject_class, PROP_SDES, g_param_spec_boxed ("sdes", "SDES", "The SDES items of this session", @@ -3021,7 +3165,7 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad, rtpbin = stream->bin; - GST_DEBUG ("new payload pad %d", pt); + GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt); GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown); @@ -3078,7 +3222,7 @@ pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session) rtpbin = session->bin; - GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt, + GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt, session->id); caps = get_pt_map (session, pt); @@ -3099,7 +3243,7 @@ static void payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session) { GST_DEBUG_OBJECT (session->bin, - "emiting signal for pt type changed to %d in session %d", pt, + "emiting signal for pt type changed to %u in session %u", pt, session->id); g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE], @@ -3246,15 +3390,42 @@ no_stream: } } -static gboolean -complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) +static void +session_maybe_create_bundle_demuxer (GstRtpBinSession * session) +{ + GstRtpBin *rtpbin; + + if (session->bundle_demux) + return; + + rtpbin = session->bin; + if (g_signal_has_handler_pending (rtpbin, + gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, TRUE)) { + GST_DEBUG_OBJECT (rtpbin, "Adding a bundle SSRC demuxer to session %u", + session->id); + session->bundle_demux = gst_element_factory_make ("rtpssrcdemux", NULL); + session->bundle_demux_newpad_sig = g_signal_connect (session->bundle_demux, + "new-ssrc-pad", (GCallback) new_bundled_ssrc_pad_found, session); + + gst_bin_add (GST_BIN_CAST (rtpbin), session->bundle_demux); + gst_element_sync_state_with_parent (session->bundle_demux); + } else { + GST_DEBUG_OBJECT (rtpbin, + "No handler for the on-bundled-ssrc signal so no need for a bundle SSRC demuxer in session %u", + session->id); + } +} + +static GstPad * +complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session, + gboolean bundle_demuxer_needed) { - gchar *gname; guint sessid = session->id; GstPad *recv_rtp_sink; + GstPad *funnel_src; GstElement *decoder; - GstElementClass *klass; - GstPadTemplate *templ; + + g_assert (!session->recv_rtp_sink); /* get recv_rtp pad and store */ session->recv_rtp_sink = @@ -3265,6 +3436,9 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) g_signal_connect (session->recv_rtp_sink, "notify::caps", (GCallback) caps_changed, session); + if (bundle_demuxer_needed) + session_maybe_create_bundle_demuxer (session); + GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder"); decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER); if (decoder) { @@ -3282,7 +3456,14 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) if (decsrc == NULL) goto dec_src_failed; - ret = gst_pad_link (decsrc, session->recv_rtp_sink); + if (session->bundle_demux) { + GstPad *demux_sink; + demux_sink = gst_element_get_static_pad (session->bundle_demux, "sink"); + ret = gst_pad_link (decsrc, demux_sink); + gst_object_unref (demux_sink); + } else { + ret = gst_pad_link (decsrc, session->recv_rtp_sink); + } gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) @@ -3290,81 +3471,54 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) } else { GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given"); - recv_rtp_sink = gst_object_ref (session->recv_rtp_sink); + if (session->bundle_demux) { + recv_rtp_sink = + gst_element_get_static_pad (session->bundle_demux, "sink"); + } else { + recv_rtp_sink = + gst_element_get_request_pad (session->rtp_funnel, "sink_%u"); + } } - GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); - klass = GST_ELEMENT_GET_CLASS (rtpbin); - gname = g_strdup_printf ("recv_rtp_sink_%u", sessid); - templ = gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u"); - session->recv_rtp_sink_ghost = - gst_ghost_pad_new_from_template (gname, recv_rtp_sink, templ); - gst_object_unref (recv_rtp_sink); - gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost); - g_free (gname); + funnel_src = gst_element_get_static_pad (session->rtp_funnel, "src"); + gst_pad_link (funnel_src, session->recv_rtp_sink); + gst_object_unref (funnel_src); - return TRUE; + return recv_rtp_sink; /* ERRORS */ pad_failed: { g_warning ("rtpbin: failed to get session recv_rtp_sink pad"); - return FALSE; + return NULL; } dec_sink_failed: { - g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid); - return FALSE; + g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid); + return NULL; } dec_src_failed: { - g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid); + g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid); gst_object_unref (recv_rtp_sink); - return FALSE; + return NULL; } dec_link_failed: { - g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid); + g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid); gst_object_unref (recv_rtp_sink); - return FALSE; + return NULL; } } -/* Create a pad for receiving RTP for the session in @name. Must be called with - * RTP_BIN_LOCK. - */ -static GstPad * -create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +static void +complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid) { - guint sessid; GstElement *aux; GstPad *recv_rtp_src; - GstRtpBinSession *session; - - /* first get the session number */ - if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1) - goto no_name; - - GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); - /* get or create session */ - session = find_session_by_id (rtpbin, sessid); - if (!session) { - GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); - /* create session now */ - session = create_session (rtpbin, sessid); - if (session == NULL) - goto create_error; - } - - /* check if pad was requested */ - if (session->recv_rtp_sink_ghost != NULL) - return session->recv_rtp_sink_ghost; - - /* setup the session sink pad */ - if (!complete_session_sink (rtpbin, session)) - goto session_sink_failed; + g_assert (!session->recv_rtp_src); session->recv_rtp_src = gst_element_get_static_pad (session->session, "recv_rtp_src"); @@ -3381,7 +3535,7 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver"); - pname = g_strdup_printf ("sink_%d", sessid); + pname = g_strdup_printf ("sink_%u", sessid); auxsink = gst_element_get_static_pad (aux, pname); g_free (pname); if (auxsink == NULL) @@ -3394,7 +3548,7 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) /* this can be NULL when this AUX element is not to be linked to * an SSRC demuxer */ - pname = g_strdup_printf ("src_%d", sessid); + pname = g_strdup_printf ("src_%u", sessid); recv_rtp_src = gst_element_get_static_pad (aux, pname); g_free (pname); } else { @@ -3408,8 +3562,8 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) sinkdpad = gst_element_get_static_pad (session->demux, "sink"); GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad"); gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING); - gst_object_unref (recv_rtp_src); gst_object_unref (sinkdpad); + gst_object_unref (recv_rtp_src); /* connect to the new-ssrc-pad signal of the SSRC demuxer */ session->demux_newpad_sig = g_signal_connect (session->demux, @@ -3417,6 +3571,71 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) session->demux_padremoved_sig = g_signal_connect (session->demux, "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session); } + + return; + +pad_failed: + { + g_warning ("rtpbin: failed to get session recv_rtp_src pad"); + return; + } +aux_sink_failed: + { + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); + return; + } +aux_link_failed: + { + g_warning ("rtpbin: failed to link AUX pad to session %u", sessid); + return; + } +} + +/* Create a pad for receiving RTP for the session in @name. Must be called with + * RTP_BIN_LOCK. + */ +static GstPad * +create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +{ + guint sessid; + GstRtpBinSession *session; + GstPad *recv_rtp_sink; + + /* first get the session number */ + if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1) + goto no_name; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + /* check if pad was requested */ + if (session->recv_rtp_sink_ghost != NULL) + return session->recv_rtp_sink_ghost; + + /* setup the session sink pad */ + recv_rtp_sink = complete_session_sink (rtpbin, session, TRUE); + if (!recv_rtp_sink) + goto session_sink_failed; + + + GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); + session->recv_rtp_sink_ghost = + gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ); + gst_object_unref (recv_rtp_sink); + gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost); + + complete_session_receiver (rtpbin, session, sessid); + return session->recv_rtp_sink_ghost; /* ERRORS */ @@ -3435,21 +3654,6 @@ session_sink_failed: /* warning already done */ return NULL; } -pad_failed: - { - g_warning ("rtpbin: failed to get session recv_rtp_src pad"); - return NULL; - } -aux_sink_failed: - { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); - return NULL; - } -aux_link_failed: - { - g_warning ("rtpbin: failed to link AUX pad to session %d", sessid); - return NULL; - } } static void @@ -3463,6 +3667,11 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig); session->demux_padremoved_sig = 0; } + if (session->bundle_demux_newpad_sig) { + g_signal_handler_disconnect (session->bundle_demux, + session->bundle_demux_newpad_sig); + session->bundle_demux_newpad_sig = 0; + } if (session->recv_rtp_src) { gst_object_unref (session->recv_rtp_src); session->recv_rtp_src = NULL; @@ -3480,37 +3689,14 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) } } -/* Create a pad for receiving RTCP for the session in @name. Must be called with - * RTP_BIN_LOCK. - */ static GstPad * -create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, - const gchar * name) +complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid, gboolean bundle_demuxer_needed) { - guint sessid; GstElement *decoder; - GstRtpBinSession *session; - GstPad *sinkdpad, *decsink; - - /* first get the session number */ - if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1) - goto no_name; - - GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); - - /* get or create the session */ - session = find_session_by_id (rtpbin, sessid); - if (!session) { - GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); - /* create session now */ - session = create_session (rtpbin, sessid); - if (session == NULL) - goto create_error; - } - - /* check if pad was requested */ - if (session->recv_rtcp_sink_ghost != NULL) - return session->recv_rtcp_sink_ghost; + GstPad *sinkdpad; + GstPad *decsink = NULL; + GstPad *funnel_src; /* get recv_rtp pad and store */ GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); @@ -3519,6 +3705,9 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink == NULL) goto pad_failed; + if (bundle_demuxer_needed) + session_maybe_create_bundle_demuxer (session); + GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder"); decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER); if (decoder) { @@ -3535,14 +3724,26 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, if (decsrc == NULL) goto dec_src_failed; - ret = gst_pad_link (decsrc, session->recv_rtcp_sink); + if (session->bundle_demux) { + GstPad *demux_sink; + demux_sink = + gst_element_get_static_pad (session->bundle_demux, "rtcp_sink"); + ret = gst_pad_link (decsrc, demux_sink); + gst_object_unref (demux_sink); + } else { + ret = gst_pad_link (decsrc, session->recv_rtcp_sink); + } gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) goto dec_link_failed; } else { GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given"); - decsink = gst_object_ref (session->recv_rtcp_sink); + if (session->bundle_demux) { + decsink = gst_element_get_static_pad (session->bundle_demux, "rtcp_sink"); + } else { + decsink = gst_element_get_request_pad (session->rtcp_funnel, "sink_%u"); + } } /* get srcpad, link to SSRCDemux */ @@ -3556,26 +3757,12 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING); gst_object_unref (sinkdpad); - session->recv_rtcp_sink_ghost = - gst_ghost_pad_new_from_template (name, decsink, templ); - gst_object_unref (decsink); - gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), - session->recv_rtcp_sink_ghost); + funnel_src = gst_element_get_static_pad (session->rtcp_funnel, "src"); + gst_pad_link (funnel_src, session->recv_rtcp_sink); + gst_object_unref (funnel_src); - return session->recv_rtcp_sink_ghost; + return decsink; - /* ERRORS */ -no_name: - { - g_warning ("rtpbin: invalid name given"); - return NULL; - } -create_error: - { - /* create_session already warned */ - return NULL; - } pad_failed: { g_warning ("rtpbin: failed to get session rtcp_sink pad"); @@ -3583,25 +3770,82 @@ pad_failed: } dec_sink_failed: { - g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid); return NULL; } dec_src_failed: { - g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid); - gst_object_unref (decsink); - return NULL; + g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid); + goto cleanup; } dec_link_failed: { - g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid); - gst_object_unref (decsink); - return NULL; + g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid); + goto cleanup; } src_pad_failed: { g_warning ("rtpbin: failed to get session sync_src pad"); - gst_object_unref (decsink); + } + +cleanup: + gst_object_unref (decsink); + return NULL; +} + +/* Create a pad for receiving RTCP for the session in @name. Must be called with + * RTP_BIN_LOCK. + */ +static GstPad * +create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, + const gchar * name) +{ + guint sessid; + GstRtpBinSession *session; + GstPad *decsink = NULL; + + /* first get the session number */ + if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1) + goto no_name; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create the session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + /* check if pad was requested */ + if (session->recv_rtcp_sink_ghost != NULL) + return session->recv_rtcp_sink_ghost; + + decsink = complete_session_rtcp (rtpbin, session, sessid, TRUE); + if (!decsink) + goto create_error; + + session->recv_rtcp_sink_ghost = + gst_ghost_pad_new_from_template (name, decsink, templ); + gst_object_unref (decsink); + gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), + session->recv_rtcp_sink_ghost); + + return session->recv_rtcp_sink_ghost; + + /* ERRORS */ +no_name: + { + g_warning ("rtpbin: invalid name given"); + return NULL; + } +create_error: + { + /* create_session already warned */ return NULL; } } @@ -3651,7 +3895,7 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) GstPadLinkReturn ret; GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder"); - ename = g_strdup_printf ("rtp_src_%d", sessid); + ename = g_strdup_printf ("rtp_src_%u", sessid); encsrc = gst_element_get_static_pad (encoder, ename); g_free (ename); @@ -3660,7 +3904,7 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) send_rtp_src = encsrc; - ename = g_strdup_printf ("rtp_sink_%d", sessid); + ename = g_strdup_printf ("rtp_sink_%u", sessid); encsink = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsink == NULL) @@ -3694,23 +3938,23 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) /* ERRORS */ no_srcpad: { - g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid); + g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid); return FALSE; } enc_src_failed: { - g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid); return FALSE; } enc_sink_failed: { - g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid); gst_object_unref (send_rtp_src); return FALSE; } enc_link_failed: { - g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid); + g_warning ("rtpbin: failed to link rtp encoder for session %u", sessid); gst_object_unref (send_rtp_src); return FALSE; } @@ -3772,22 +4016,22 @@ create_error: } existing_session: { - g_warning ("rtpbin: session %d is already a sender", sessid); + g_warning ("rtpbin: session %u is already a sender", sessid); return FALSE; } pad_failed: { - g_warning ("rtpbin: failed to get session pad for session %d", sessid); + g_warning ("rtpbin: failed to get session pad for session %u", sessid); return FALSE; } aux_link_failed: { - g_warning ("rtpbin: failed to link AUX for session %d", sessid); + g_warning ("rtpbin: failed to link AUX for session %u", sessid); return FALSE; } session_src_failed: { - g_warning ("rtpbin: failed to complete AUX for session %d", sessid); + g_warning ("rtpbin: failed to complete AUX for session %u", sessid); return FALSE; } } @@ -3847,7 +4091,7 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (!setup_aux_sender (rtpbin, session, aux)) goto aux_session_failed; - pname = g_strdup_printf ("sink_%d", sessid); + pname = g_strdup_printf ("sink_%u", sessid); send_rtp_sink = gst_element_get_static_pad (aux, pname); g_free (pname); @@ -3887,27 +4131,27 @@ create_error: } existing_session: { - g_warning ("rtpbin: session %d is already in use", sessid); + g_warning ("rtpbin: session %u is already in use", sessid); return NULL; } aux_session_failed: { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); return NULL; } aux_sink_failed: { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); return NULL; } pad_failed: { - g_warning ("rtpbin: failed to get session pad for session %d", sessid); + g_warning ("rtpbin: failed to get session pad for session %u", sessid); return NULL; } session_src_failed: { - g_warning ("rtpbin: failed to setup source pads for session %d", sessid); + g_warning ("rtpbin: failed to setup source pads for session %u", sessid); return NULL; } } @@ -3978,13 +4222,13 @@ create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder"); - ename = g_strdup_printf ("rtcp_src_%d", sessid); + ename = g_strdup_printf ("rtcp_src_%u", sessid); encsrc = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsrc == NULL) goto enc_src_failed; - ename = g_strdup_printf ("rtcp_sink_%d", sessid); + ename = g_strdup_printf ("rtcp_sink_%u", sessid); encsink = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsink == NULL) @@ -4021,23 +4265,23 @@ no_session: } pad_failed: { - g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid); + g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid); return NULL; } enc_src_failed: { - g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid); return NULL; } enc_sink_failed: { - g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid); gst_object_unref (encsrc); return NULL; } enc_link_failed: { - g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid); + g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid); gst_object_unref (encsrc); return NULL; } diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index fb13a47..384b76d 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -127,6 +127,8 @@ struct _GstRtpBinClass { void (*on_new_sender_ssrc) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_sender_ssrc_active) (GstRtpBin *rtpbin, guint session, guint32 ssrc); + + guint (*on_bundled_ssrc) (GstRtpBin *rtpbin, guint ssrc); }; GType gst_rtp_bin_get_type (void); diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index aa40c70..9c855ae 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -233,6 +233,7 @@ check_rtpmanager = \ elements/rtpaux \ elements/rtpbin \ elements/rtpbin_buffer_list \ + elements/rtpbundle \ elements/rtpcollision \ elements/rtpjitterbuffer \ elements/rtpmux \ @@ -576,6 +577,9 @@ elements_rtpcollision_LDADD = $(GST_PLUGINS_BASE_LIBS) $(GST_NET_LIBS) -lgstrtp- elements_rtpaux_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtpaux_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) +elements_rtpbundle_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) +elements_rtpbundle_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) + # FIXME: configure should check for gdk-pixbuf not gtk # only need video.h header, not the lib elements_gdkpixbufsink_CFLAGS = \ diff --git a/tests/check/elements/.gitignore b/tests/check/elements/.gitignore index ac1624b..de196b7 100644 --- a/tests/check/elements/.gitignore +++ b/tests/check/elements/.gitignore @@ -54,6 +54,7 @@ rtp-payloading rtpaux rtpbin rtpbin_buffer_list +rtpbundle rtpcollision rtph261 rtph263 diff --git a/tests/check/elements/rtpbundle.c b/tests/check/elements/rtpbundle.c new file mode 100644 index 0000000..9b477e1 --- /dev/null +++ b/tests/check/elements/rtpbundle.c @@ -0,0 +1,390 @@ +/* GStreamer + * + * Copyright (C) 2016 Igalia S.L. + * @author Philippe Normand + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include + +static GMainLoop *main_loop; + +static void +message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) +{ + GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, + GST_MESSAGE_SRC (message), message); + + switch (message->type) { + case GST_MESSAGE_EOS: + g_main_loop_quit (main_loop); + break; + case GST_MESSAGE_WARNING:{ + GError *gerror; + gchar *debug; + + gst_message_parse_warning (message, &gerror, &debug); + gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); + g_error_free (gerror); + g_free (debug); + break; + } + case GST_MESSAGE_ERROR:{ + GError *gerror; + gchar *debug; + + gst_message_parse_error (message, &gerror, &debug); + gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); + g_error_free (gerror); + g_free (debug); + fail ("Error!"); + break; + } + default: + break; + } +} + +static void +on_rtpbinreceive_pad_added (GstElement * element, GstPad * new_pad, + gpointer data) +{ + GstElement *pipeline = GST_ELEMENT (data); + gchar *pad_name = gst_pad_get_name (new_pad); + + if (g_str_has_prefix (pad_name, "recv_rtp_src_")) { + GstCaps *caps = gst_pad_get_current_caps (new_pad); + GstStructure *s = gst_caps_get_structure (caps, 0); + const gchar *media_type = gst_structure_get_string (s, "media"); + gchar *depayloader_name = g_strdup_printf ("%s_rtpdepayloader", media_type); + GstElement *rtpdepayloader = + gst_bin_get_by_name (GST_BIN (pipeline), depayloader_name); + GstPad *sinkpad; + + g_free (depayloader_name); + fail_unless (rtpdepayloader != NULL, NULL); + + sinkpad = gst_element_get_static_pad (rtpdepayloader, "sink"); + gst_pad_link (new_pad, sinkpad); + gst_object_unref (sinkpad); + gst_object_unref (rtpdepayloader); + + gst_caps_unref (caps); + } + g_free (pad_name); +} + +static guint +on_bundled_ssrc (GstElement * rtpbin, guint ssrc, gpointer user_data) +{ + static gboolean create_session = FALSE; + guint session_id = 0; + + if (create_session) { + session_id = 1; + } else { + create_session = TRUE; + /* use existing session 0, a new session will be created for the next discovered bundled SSRC */ + } + return session_id; +} + +static GstCaps * +on_request_pt_map (GstElement * rtpbin, guint session_id, guint pt, + gpointer user_data) +{ + GstCaps *caps = NULL; + if (pt == 96) { + caps = + gst_caps_from_string + ("application/x-rtp,media=(string)audio,encoding-name=(string)PCMA,clock-rate=(int)8000"); + } else if (pt == 100) { + caps = + gst_caps_from_string + ("application/x-rtp,media=(string)video,encoding-name=(string)RAW,clock-rate=(int)90000,sampling=(string)\"YCbCr-4:2:0\",depth=(string)8,width=(string)320,height=(string)240"); + } + return caps; +} + + +static GstElement * +create_pipeline (gboolean send) +{ + GstElement *pipeline, *rtpbin, *audiosrc, *audio_encoder, + *audio_rtppayloader, *sendrtp_udpsink, *recv_rtp_udpsrc, + *send_rtcp_udpsink, *recv_rtcp_udpsrc, *sendrtcp_funnel, *sendrtp_funnel; + GstElement *audio_rtpdepayloader, *audio_decoder, *audio_sink; + GstElement *videosrc, *video_rtppayloader, *video_rtpdepayloader, *video_sink; + gboolean res; + GstPad *funnel_pad, *rtp_src_pad; + GstCaps *rtpcaps; + gint rtp_udp_port = 5001; + gint rtcp_udp_port = 5002; + + pipeline = gst_pipeline_new (send ? "pipeline_send" : "pipeline_receive"); + + rtpbin = + gst_element_factory_make ("rtpbin", + send ? "rtpbin_send" : "rtpbin_receive"); + g_object_set (rtpbin, "latency", 200, NULL); + + if (!send) { + g_signal_connect (rtpbin, "on-bundled-ssrc", + G_CALLBACK (on_bundled_ssrc), NULL); + g_signal_connect (rtpbin, "request-pt-map", + G_CALLBACK (on_request_pt_map), NULL); + } + + g_signal_connect (rtpbin, "pad-added", + G_CALLBACK (on_rtpbinreceive_pad_added), pipeline); + + gst_bin_add (GST_BIN (pipeline), rtpbin); + + if (send) { + audiosrc = gst_element_factory_make ("audiotestsrc", NULL); + audio_encoder = gst_element_factory_make ("alawenc", NULL); + audio_rtppayloader = gst_element_factory_make ("rtppcmapay", NULL); + g_object_set (audio_rtppayloader, "pt", 96, NULL); + g_object_set (audio_rtppayloader, "seqnum-offset", 1, NULL); + + videosrc = gst_element_factory_make ("videotestsrc", NULL); + video_rtppayloader = gst_element_factory_make ("rtpvrawpay", NULL); + g_object_set (video_rtppayloader, "pt", 100, "seqnum-offset", 1, NULL); + + g_object_set (audiosrc, "num-buffers", 5, NULL); + g_object_set (videosrc, "num-buffers", 5, NULL); + + /* muxed rtcp */ + sendrtcp_funnel = gst_element_factory_make ("funnel", "send_rtcp_funnel"); + send_rtcp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (send_rtcp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (send_rtcp_udpsink, "port", rtcp_udp_port, NULL); + g_object_set (send_rtcp_udpsink, "sync", FALSE, NULL); + g_object_set (send_rtcp_udpsink, "async", FALSE, NULL); + + /* outgoing bundled stream */ + sendrtp_funnel = gst_element_factory_make ("funnel", "send_rtp_funnel"); + sendrtp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (sendrtp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (sendrtp_udpsink, "port", rtp_udp_port, NULL); + + gst_bin_add_many (GST_BIN (pipeline), audiosrc, audio_encoder, + audio_rtppayloader, sendrtp_udpsink, send_rtcp_udpsink, + sendrtp_funnel, sendrtcp_funnel, videosrc, video_rtppayloader, NULL); + + res = gst_element_link (audiosrc, audio_encoder); + fail_unless (res == TRUE, NULL); + res = gst_element_link (audio_encoder, audio_rtppayloader); + fail_unless (res == TRUE, NULL); + res = + gst_element_link_pads_full (audio_rtppayloader, "src", rtpbin, + "send_rtp_sink_0", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + + res = gst_element_link (videosrc, video_rtppayloader); + fail_unless (res == TRUE, NULL); + res = + gst_element_link_pads_full (video_rtppayloader, "src", rtpbin, + "send_rtp_sink_1", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + + res = + gst_element_link_pads_full (sendrtp_funnel, "src", sendrtp_udpsink, + "sink", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + + funnel_pad = gst_element_get_request_pad (sendrtp_funnel, "sink_%u"); + rtp_src_pad = gst_element_get_static_pad (rtpbin, "send_rtp_src_0"); + res = gst_pad_link (rtp_src_pad, funnel_pad); + gst_object_unref (funnel_pad); + gst_object_unref (rtp_src_pad); + + funnel_pad = gst_element_get_request_pad (sendrtp_funnel, "sink_%u"); + rtp_src_pad = gst_element_get_static_pad (rtpbin, "send_rtp_src_1"); + res = gst_pad_link (rtp_src_pad, funnel_pad); + gst_object_unref (funnel_pad); + gst_object_unref (rtp_src_pad); + + res = + gst_element_link_pads_full (sendrtcp_funnel, "src", send_rtcp_udpsink, + "sink", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + + funnel_pad = gst_element_get_request_pad (sendrtcp_funnel, "sink_%u"); + rtp_src_pad = gst_element_get_request_pad (rtpbin, "send_rtcp_src_0"); + res = + gst_pad_link_full (rtp_src_pad, funnel_pad, GST_PAD_LINK_CHECK_NOTHING); + gst_object_unref (funnel_pad); + gst_object_unref (rtp_src_pad); + + funnel_pad = gst_element_get_request_pad (sendrtcp_funnel, "sink_%u"); + rtp_src_pad = gst_element_get_request_pad (rtpbin, "send_rtcp_src_1"); + res = + gst_pad_link_full (rtp_src_pad, funnel_pad, GST_PAD_LINK_CHECK_NOTHING); + gst_object_unref (funnel_pad); + gst_object_unref (rtp_src_pad); + + } else { + recv_rtp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (recv_rtp_udpsrc, "port", rtp_udp_port, NULL); + rtpcaps = gst_caps_from_string ("application/x-rtp"); + g_object_set (recv_rtp_udpsrc, "caps", rtpcaps, NULL); + gst_caps_unref (rtpcaps); + + recv_rtcp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (recv_rtcp_udpsrc, "port", rtcp_udp_port, NULL); + + audio_rtpdepayloader = + gst_element_factory_make ("rtppcmadepay", "audio_rtpdepayloader"); + audio_decoder = gst_element_factory_make ("alawdec", NULL); + audio_sink = gst_element_factory_make ("fakesink", NULL); + g_object_set (audio_sink, "sync", TRUE, NULL); + + video_rtpdepayloader = + gst_element_factory_make ("rtpvrawdepay", "video_rtpdepayloader"); + video_sink = gst_element_factory_make ("fakesink", NULL); + g_object_set (video_sink, "sync", TRUE, NULL); + + gst_bin_add_many (GST_BIN (pipeline), recv_rtp_udpsrc, recv_rtcp_udpsrc, + audio_rtpdepayloader, audio_decoder, audio_sink, video_rtpdepayloader, + video_sink, NULL); + + res = + gst_element_link_pads_full (audio_rtpdepayloader, "src", audio_decoder, + "sink", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + res = gst_element_link (audio_decoder, audio_sink); + fail_unless (res == TRUE, NULL); + + res = + gst_element_link_pads_full (video_rtpdepayloader, "src", video_sink, + "sink", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + + /* request a single receiving RTP session. */ + res = + gst_element_link_pads_full (recv_rtcp_udpsrc, "src", rtpbin, + "recv_rtcp_sink_0", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + res = + gst_element_link_pads_full (recv_rtp_udpsrc, "src", rtpbin, + "recv_rtp_sink_0", GST_PAD_LINK_CHECK_NOTHING); + fail_unless (res == TRUE, NULL); + } + + return pipeline; +} + +GST_START_TEST (test_simple_rtpbin_bundle) +{ + GstElement *send_pipeline, *recv_pipeline; + GstBus *send_bus, *recv_bus; + GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; + GstElement *rtpbin_receive; + GObject *rtp_session; + + main_loop = g_main_loop_new (NULL, FALSE); + + send_pipeline = create_pipeline (TRUE); + recv_pipeline = create_pipeline (FALSE); + + send_bus = gst_element_get_bus (send_pipeline); + gst_bus_add_signal_watch_full (send_bus, G_PRIORITY_HIGH); + + g_signal_connect (send_bus, "message::error", (GCallback) message_received, + send_pipeline); + g_signal_connect (send_bus, "message::warning", (GCallback) message_received, + send_pipeline); + g_signal_connect (send_bus, "message::eos", (GCallback) message_received, + send_pipeline); + + recv_bus = gst_element_get_bus (recv_pipeline); + gst_bus_add_signal_watch_full (recv_bus, G_PRIORITY_HIGH); + + g_signal_connect (recv_bus, "message::error", (GCallback) message_received, + recv_pipeline); + g_signal_connect (recv_bus, "message::warning", (GCallback) message_received, + recv_pipeline); + g_signal_connect (recv_bus, "message::eos", (GCallback) message_received, + recv_pipeline); + + state_res = gst_element_set_state (recv_pipeline, GST_STATE_PLAYING); + ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); + + state_res = gst_element_set_state (send_pipeline, GST_STATE_PLAYING); + ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); + + GST_INFO ("enter mainloop"); + g_main_loop_run (main_loop); + GST_INFO ("exit mainloop"); + + rtpbin_receive = + gst_bin_get_by_name (GST_BIN (recv_pipeline), "rtpbin_receive"); + fail_if (rtpbin_receive == NULL, NULL); + + /* Check that 2 RTP sessions where created while only one was explicitely requested. */ + g_signal_emit_by_name (rtpbin_receive, "get-internal-session", 0, + &rtp_session); + fail_if (rtp_session == NULL, NULL); + g_object_unref (rtp_session); + g_signal_emit_by_name (rtpbin_receive, "get-internal-session", 1, + &rtp_session); + fail_if (rtp_session == NULL, NULL); + g_object_unref (rtp_session); + + gst_object_unref (rtpbin_receive); + + state_res = gst_element_set_state (send_pipeline, GST_STATE_NULL); + ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); + + state_res = gst_element_set_state (recv_pipeline, GST_STATE_NULL); + ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); + + /* cleanup */ + g_main_loop_unref (main_loop); + + gst_bus_remove_signal_watch (send_bus); + gst_object_unref (send_bus); + gst_object_unref (send_pipeline); + + gst_bus_remove_signal_watch (recv_bus); + gst_object_unref (recv_bus); + gst_object_unref (recv_pipeline); + +} + +GST_END_TEST; + +static Suite * +rtpbundle_suite (void) +{ + Suite *s = suite_create ("rtpbundle"); + TCase *tc_chain = tcase_create ("general"); + + tcase_set_timeout (tc_chain, 10000); + + suite_add_tcase (s, tc_chain); + + tcase_add_test (tc_chain, test_simple_rtpbin_bundle); + + return s; +} + +GST_CHECK_MAIN (rtpbundle); diff --git a/tests/check/meson.build b/tests/check/meson.build index 4559e66..0f41377 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -72,6 +72,7 @@ good_tests = [ [ 'elements/rtpaux' ], [ 'elements/rtpbin' ], [ 'elements/rtpbin_buffer_list' ], + [ 'elements/rtpbundle' ], [ 'elements/rtpcollision' ], [ 'elements/rtpjitterbuffer' ], [ 'elements/rtpmux' ], diff --git a/tests/examples/rtp/.gitignore b/tests/examples/rtp/.gitignore index 02f551f..8c3c6d3 100644 --- a/tests/examples/rtp/.gitignore +++ b/tests/examples/rtp/.gitignore @@ -2,3 +2,5 @@ client-PCMA server-alsasrc-PCMA client-rtpaux server-rtpaux +client-rtpbundle +server-rtpbundle diff --git a/tests/examples/rtp/Makefile.am b/tests/examples/rtp/Makefile.am index 40ab2c5..4d7f7c1 100644 --- a/tests/examples/rtp/Makefile.am +++ b/tests/examples/rtp/Makefile.am @@ -1,5 +1,5 @@ noinst_PROGRAMS = server-alsasrc-PCMA client-PCMA \ - client-rtpaux server-rtpaux + client-rtpaux server-rtpaux client-rtpbundle server-rtpbundle # FIXME 0.11: ignore GValueArray warnings for now until this is sorted ERROR_CFLAGS= @@ -12,6 +12,14 @@ client_rtpaux_SOURCES = client-rtpaux.c client_rtpaux_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) client_rtpaux_LDADD = $(GST_LIBS) +server_rtpbundle_SOURCES = server-rtpbundle.c +server_rtpbundle_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) +server_rtpbundle_LDADD = $(GST_LIBS) + +client_rtpbundle_SOURCES = client-rtpbundle.c +client_rtpbundle_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) +client_rtpbundle_LDADD = $(GST_LIBS) + server_alsasrc_PCMA_SOURCES = server-alsasrc-PCMA.c server_alsasrc_PCMA_CFLAGS = $(GST_CFLAGS) server_alsasrc_PCMA_LDADD = $(GST_LIBS) $(LIBM) diff --git a/tests/examples/rtp/client-rtpbundle.c b/tests/examples/rtp/client-rtpbundle.c new file mode 100644 index 0000000..cd3a737 --- /dev/null +++ b/tests/examples/rtp/client-rtpbundle.c @@ -0,0 +1,266 @@ +/* GStreamer + * Copyright (C) 2016 Igalia S.L + * @author Philippe Normand + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +/* + * RTP bundle receiver + * + * In this example we initially create one RTP session but the incoming RTP + * and RTCP streams actually bundle 2 different media type, one audio stream + * and one video stream. We are notified of the discovery of the streams by + * the on-bundled-ssrc rtpbin signal. In the handler we decide to assign the + * first SSRC to the (existing) audio session and the second SSRC to a new + * session (id: 1). + * + * .-------. .----------. .-----------. .-------. .-------------. + * RTP |udpsrc | | rtpbin | | pcmadepay | |alawdec| |autoaudiosink| + * port=5001 | src->recv_rtp_0 recv_rtp_0->sink src->sink src->sink | + * '-------' | | '-----------' '-------' '-------------' + * | | + * | | .-------. + * | | |udpsink| RTCP + * | send_rtcp_0->sink | port=5003 + * .-------. | | '-------' sync=false + * RTCP |udpsrc | | | async=false + * port=5002 | src->recv_rtcp_0 | + * '-------' | | + * | | + * | | .---------. .-------------. + * | | |vrawdepay| |autovideosink| + * | recv_rtp_1->sink src->sink | + * | | '---------' '-------------' + * | | + * | | .-------. + * | | |udpsink| RTCP + * | send_rtcp_1->sink | port=5004 + * | | '-------' sync=false + * | | async=false + * | | + * '----------' + * + */ + +static gboolean +plug_video_rtcp_sender (gpointer user_data) +{ + gint send_video_rtcp_port = 5004; + GstElement *rtpbin = GST_ELEMENT_CAST (user_data); + GstElement *send_video_rtcp_udpsink; + GstElement *pipeline = + GST_ELEMENT_CAST (gst_object_get_parent (GST_OBJECT (rtpbin))); + + send_video_rtcp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (send_video_rtcp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (send_video_rtcp_udpsink, "port", send_video_rtcp_port, NULL); + g_object_set (send_video_rtcp_udpsink, "sync", FALSE, NULL); + g_object_set (send_video_rtcp_udpsink, "async", FALSE, NULL); + gst_bin_add (GST_BIN (pipeline), send_video_rtcp_udpsink); + gst_element_link_pads (rtpbin, "send_rtcp_src_1", send_video_rtcp_udpsink, + "sink"); + gst_element_sync_state_with_parent (send_video_rtcp_udpsink); + + gst_object_unref (pipeline); + gst_object_unref (rtpbin); + return G_SOURCE_REMOVE; +} + +static void +on_rtpbinreceive_pad_added (GstElement * rtpbin, GstPad * new_pad, + gpointer data) +{ + GstElement *pipeline = GST_ELEMENT (data); + gchar *pad_name = gst_pad_get_name (new_pad); + + if (g_str_has_prefix (pad_name, "recv_rtp_src_")) { + GstCaps *caps = gst_pad_get_current_caps (new_pad); + GstStructure *s = gst_caps_get_structure (caps, 0); + const gchar *media_type = gst_structure_get_string (s, "media"); + gchar *depayloader_name = g_strdup_printf ("%s_rtpdepayloader", media_type); + GstElement *rtpdepayloader = + gst_bin_get_by_name (GST_BIN (pipeline), depayloader_name); + GstPad *sinkpad; + + g_free (depayloader_name); + + sinkpad = gst_element_get_static_pad (rtpdepayloader, "sink"); + gst_pad_link (new_pad, sinkpad); + gst_object_unref (sinkpad); + gst_object_unref (rtpdepayloader); + + gst_caps_unref (caps); + + if (g_str_has_prefix (pad_name, "recv_rtp_src_1")) { + g_timeout_add (0, plug_video_rtcp_sender, gst_object_ref (rtpbin)); + } + } + g_free (pad_name); +} + +static guint +on_bundled_ssrc (GstElement * rtpbin, guint ssrc, gpointer user_data) +{ + static gboolean create_session = FALSE; + guint session_id = 0; + + if (create_session) { + session_id = 1; + } else { + create_session = TRUE; + /* use existing session 0, a new session will be created for the next discovered bundled SSRC */ + } + return session_id; +} + +static GstCaps * +on_request_pt_map (GstElement * rtpbin, guint session_id, guint pt, + gpointer user_data) +{ + GstCaps *caps = NULL; + if (pt == 96) { + caps = + gst_caps_from_string + ("application/x-rtp,media=(string)audio,encoding-name=(string)PCMA,clock-rate=(int)8000"); + } else if (pt == 100) { + caps = + gst_caps_from_string + ("application/x-rtp,media=(string)video,encoding-name=(string)RAW,clock-rate=(int)90000,sampling=(string)\"YCbCr-4:2:0\",depth=(string)8,width=(string)320,height=(string)240"); + } + return caps; +} + +static GstElement * +create_pipeline (void) +{ + GstElement *pipeline, *rtpbin, *recv_rtp_udpsrc, *recv_rtcp_udpsrc, + *audio_rtpdepayloader, *audio_decoder, *audio_sink, *video_rtpdepayloader, + *video_sink, *send_audio_rtcp_udpsink; + GstCaps *rtpcaps; + gint rtp_udp_port = 5001; + gint rtcp_udp_port = 5002; + gint send_audio_rtcp_port = 5003; + + pipeline = gst_pipeline_new (NULL); + + rtpbin = gst_element_factory_make ("rtpbin", NULL); + g_object_set (rtpbin, "latency", 200, NULL); + + g_signal_connect (rtpbin, "on-bundled-ssrc", + G_CALLBACK (on_bundled_ssrc), NULL); + g_signal_connect (rtpbin, "request-pt-map", + G_CALLBACK (on_request_pt_map), NULL); + + g_signal_connect (rtpbin, "pad-added", + G_CALLBACK (on_rtpbinreceive_pad_added), pipeline); + + gst_bin_add (GST_BIN (pipeline), rtpbin); + + recv_rtp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (recv_rtp_udpsrc, "port", rtp_udp_port, NULL); + rtpcaps = gst_caps_from_string ("application/x-rtp"); + g_object_set (recv_rtp_udpsrc, "caps", rtpcaps, NULL); + gst_caps_unref (rtpcaps); + + recv_rtcp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (recv_rtcp_udpsrc, "port", rtcp_udp_port, NULL); + + audio_rtpdepayloader = + gst_element_factory_make ("rtppcmadepay", "audio_rtpdepayloader"); + audio_decoder = gst_element_factory_make ("alawdec", NULL); + audio_sink = gst_element_factory_make ("autoaudiosink", NULL); + + video_rtpdepayloader = + gst_element_factory_make ("rtpvrawdepay", "video_rtpdepayloader"); + video_sink = gst_element_factory_make ("autovideosink", NULL); + + gst_bin_add_many (GST_BIN (pipeline), recv_rtp_udpsrc, recv_rtcp_udpsrc, + audio_rtpdepayloader, audio_decoder, audio_sink, video_rtpdepayloader, + video_sink, NULL); + + gst_element_link_pads (audio_rtpdepayloader, "src", audio_decoder, "sink"); + gst_element_link (audio_decoder, audio_sink); + + gst_element_link_pads (video_rtpdepayloader, "src", video_sink, "sink"); + + /* request a single receiving RTP session. */ + gst_element_link_pads (recv_rtcp_udpsrc, "src", rtpbin, "recv_rtcp_sink_0"); + gst_element_link_pads (recv_rtp_udpsrc, "src", rtpbin, "recv_rtp_sink_0"); + + send_audio_rtcp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (send_audio_rtcp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (send_audio_rtcp_udpsink, "port", send_audio_rtcp_port, NULL); + g_object_set (send_audio_rtcp_udpsink, "sync", FALSE, NULL); + g_object_set (send_audio_rtcp_udpsink, "async", FALSE, NULL); + gst_bin_add (GST_BIN (pipeline), send_audio_rtcp_udpsink); + gst_element_link_pads (rtpbin, "send_rtcp_src_0", send_audio_rtcp_udpsink, + "sink"); + + return pipeline; +} + +/* + * Used to generate informative messages during pipeline startup + */ +static void +cb_state (GstBus * bus, GstMessage * message, gpointer data) +{ + GstObject *pipe = GST_OBJECT (data); + GstState old, new, pending; + gst_message_parse_state_changed (message, &old, &new, &pending); + if (message->src == pipe) { + g_print ("Pipeline %s changed state from %s to %s\n", + GST_OBJECT_NAME (message->src), + gst_element_state_get_name (old), gst_element_state_get_name (new)); + if (old == GST_STATE_PAUSED && new == GST_STATE_PLAYING) + GST_DEBUG_BIN_TO_DOT_FILE (GST_BIN (pipe), GST_DEBUG_GRAPH_SHOW_ALL, + GST_OBJECT_NAME (message->src)); + } +} + +int +main (int argc, char **argv) +{ + GstElement *pipe; + GstBus *bus; + GMainLoop *loop; + + gst_init (&argc, &argv); + + loop = g_main_loop_new (NULL, FALSE); + + pipe = create_pipeline (); + bus = gst_element_get_bus (pipe); + g_signal_connect (bus, "message::state-changed", G_CALLBACK (cb_state), pipe); + gst_bus_add_signal_watch (bus); + gst_object_unref (bus); + + g_print ("starting server pipeline\n"); + gst_element_set_state (pipe, GST_STATE_PLAYING); + + g_main_loop_run (loop); + + g_print ("stopping server pipeline\n"); + gst_element_set_state (pipe, GST_STATE_NULL); + + gst_object_unref (pipe); + g_main_loop_unref (loop); + + return 0; +} diff --git a/tests/examples/rtp/server-rtpbundle.c b/tests/examples/rtp/server-rtpbundle.c new file mode 100644 index 0000000..1f6d01b --- /dev/null +++ b/tests/examples/rtp/server-rtpbundle.c @@ -0,0 +1,179 @@ +/* GStreamer + * Copyright (C) 2016 Igalia S.L + * @author Philippe Normand + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#include + +/* + * An bundling RTP server + * creates two sessions and streams audio on one, video on the other, with RTCP + * on both sessions. The destination is 127.0.0.1. + * + * The RTP streams are bundled to a single outgoing connection. Same for the RTCP streams. + * + * .-------. .-------. .-------. .------------. .------. + * |audiots| |alawenc| |pcmapay| | rtpbin | |funnel| + * | src->sink src->sink src->send_rtp_0 send_rtp_0--->sink_0 | .-------. + * '-------' '-------' '-------' | | | | |udpsink| + * | | | src->sink | + * .-------. .---------. | | | | '-------' + * |videots| | vrawpay | | | | | + * | src------------>sink src->send_rtp_1 send_rtp_1--->sink_1 | + * '-------' '---------' | | '------' + * | | + * .------. | | + * |udpsrc| | | .------. + * | src->recv_rtcp_0 | |funnel| + * '------' | send_rtcp_0-->sink_0 | .-------. + * | | | | |udpsink| + * .------. | | | src->sink | + * |udpsrc| | | | | '-------' + * | src->recv_rtcp_1 | | | + * '------' | send_rtcp_1-->sink_1 | + * '------------' '------' + * + */ + +static GstElement * +create_pipeline (void) +{ + GstElement *pipeline, *rtpbin, *audiosrc, *audio_encoder, + *audio_rtppayloader, *sendrtp_udpsink, + *send_rtcp_udpsink, *sendrtcp_funnel, *sendrtp_funnel; + GstElement *videosrc, *video_rtppayloader, *time_overlay; + gint rtp_udp_port = 5001; + gint rtcp_udp_port = 5002; + gint recv_audio_rtcp_port = 5003; + gint recv_video_rtcp_port = 5004; + GstElement *audio_rtcp_udpsrc, *video_rtcp_udpsrc; + + pipeline = gst_pipeline_new (NULL); + + rtpbin = gst_element_factory_make ("rtpbin", NULL); + + audiosrc = gst_element_factory_make ("audiotestsrc", NULL); + g_object_set (audiosrc, "is-live", TRUE, NULL); + audio_encoder = gst_element_factory_make ("alawenc", NULL); + audio_rtppayloader = gst_element_factory_make ("rtppcmapay", NULL); + g_object_set (audio_rtppayloader, "pt", 96, NULL); + + videosrc = gst_element_factory_make ("videotestsrc", NULL); + g_object_set (videosrc, "is-live", TRUE, NULL); + time_overlay = gst_element_factory_make ("timeoverlay", NULL); + video_rtppayloader = gst_element_factory_make ("rtpvrawpay", NULL); + g_object_set (video_rtppayloader, "pt", 100, NULL); + + /* muxed rtcp */ + sendrtcp_funnel = gst_element_factory_make ("funnel", "send_rtcp_funnel"); + send_rtcp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (send_rtcp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (send_rtcp_udpsink, "port", rtcp_udp_port, NULL); + g_object_set (send_rtcp_udpsink, "sync", FALSE, NULL); + g_object_set (send_rtcp_udpsink, "async", FALSE, NULL); + + /* outgoing bundled stream */ + sendrtp_funnel = gst_element_factory_make ("funnel", "send_rtp_funnel"); + sendrtp_udpsink = gst_element_factory_make ("udpsink", NULL); + g_object_set (sendrtp_udpsink, "host", "127.0.0.1", NULL); + g_object_set (sendrtp_udpsink, "port", rtp_udp_port, NULL); + g_object_set (sendrtp_udpsink, "sync", FALSE, NULL); + g_object_set (sendrtp_udpsink, "async", FALSE, NULL); + + gst_bin_add_many (GST_BIN (pipeline), rtpbin, audiosrc, audio_encoder, + audio_rtppayloader, sendrtp_udpsink, send_rtcp_udpsink, + sendrtp_funnel, sendrtcp_funnel, videosrc, video_rtppayloader, NULL); + + if (time_overlay) + gst_bin_add (GST_BIN (pipeline), time_overlay); + + gst_element_link_many (audiosrc, audio_encoder, audio_rtppayloader, NULL); + gst_element_link_pads (audio_rtppayloader, "src", rtpbin, "send_rtp_sink_0"); + + if (time_overlay) { + gst_element_link_many (videosrc, time_overlay, video_rtppayloader, NULL); + } else { + gst_element_link (videosrc, video_rtppayloader); + } + + gst_element_link_pads (video_rtppayloader, "src", rtpbin, "send_rtp_sink_1"); + + gst_element_link_pads (sendrtp_funnel, "src", sendrtp_udpsink, "sink"); + gst_element_link_pads (rtpbin, "send_rtp_src_0", sendrtp_funnel, "sink_%u"); + gst_element_link_pads (rtpbin, "send_rtp_src_1", sendrtp_funnel, "sink_%u"); + gst_element_link_pads (sendrtcp_funnel, "src", send_rtcp_udpsink, "sink"); + gst_element_link_pads (rtpbin, "send_rtcp_src_0", sendrtcp_funnel, "sink_%u"); + gst_element_link_pads (rtpbin, "send_rtcp_src_1", sendrtcp_funnel, "sink_%u"); + + audio_rtcp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (audio_rtcp_udpsrc, "port", recv_audio_rtcp_port, NULL); + video_rtcp_udpsrc = gst_element_factory_make ("udpsrc", NULL); + g_object_set (video_rtcp_udpsrc, "port", recv_video_rtcp_port, NULL); + gst_bin_add_many (GST_BIN (pipeline), audio_rtcp_udpsrc, video_rtcp_udpsrc, + NULL); + gst_element_link_pads (audio_rtcp_udpsrc, "src", rtpbin, "recv_rtcp_sink_0"); + gst_element_link_pads (video_rtcp_udpsrc, "src", rtpbin, "recv_rtcp_sink_1"); + + return pipeline; +} + +/* + * Used to generate informative messages during pipeline startup + */ +static void +cb_state (GstBus * bus, GstMessage * message, gpointer data) +{ + GstObject *pipe = GST_OBJECT (data); + GstState old, new, pending; + gst_message_parse_state_changed (message, &old, &new, &pending); + if (message->src == pipe) { + g_print ("Pipeline %s changed state from %s to %s\n", + GST_OBJECT_NAME (message->src), + gst_element_state_get_name (old), gst_element_state_get_name (new)); + } +} + +int +main (int argc, char **argv) +{ + GstElement *pipe; + GstBus *bus; + GMainLoop *loop; + + gst_init (&argc, &argv); + + loop = g_main_loop_new (NULL, FALSE); + + pipe = create_pipeline (); + bus = gst_element_get_bus (pipe); + g_signal_connect (bus, "message::state-changed", G_CALLBACK (cb_state), pipe); + gst_bus_add_signal_watch (bus); + gst_object_unref (bus); + + g_print ("starting server pipeline\n"); + gst_element_set_state (pipe, GST_STATE_PLAYING); + + g_main_loop_run (loop); + + g_print ("stopping server pipeline\n"); + gst_element_set_state (pipe, GST_STATE_NULL); + + gst_object_unref (pipe); + g_main_loop_unref (loop); + + return 0; +}