X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Fgstrtpbin.c;h=fbed41415536c4759bdd13c4d7bc1140794e52dd;hb=f05e14ba316d67b9cdf1440d9b2129da03625a30;hp=dd83eb4df326300a6f703445392a76b36cf3366a;hpb=5986dc287c2e53b0a07498835d39ab1c2c47b939;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index dd83eb4..fbed414 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -1,21 +1,21 @@ -/* GStreamer - * Copyright (C) <2007> Wim Taymans - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ + /* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ /** * SECTION:element-rtpbin @@ -53,13 +53,6 @@ * SSRC in the RTP packets to its own SSRC and wil forward the packets on the * send_rtp_src_\%u pad after updating its internal state. * - * #GstRtpBin can also demultiplex incoming bundled streams. The first - * #GstRtpSession will have a #GstRtpSsrcDemux element splitting the streams - * based on their SSRC and potentially dispatched to a different #GstRtpSession. - * Because retransmission SSRCs need to be merged with the corresponding media - * stream the #GstRtpBin::on-bundled-ssrc signal is emitted so that the - * application can find out to which session the SSRC belongs. - * * The session manager needs the clock-rate of the payload types it is handling * and will signal the #GstRtpSession::request-pt-map signal when it needs such a * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map @@ -204,9 +197,6 @@ static GstStaticPadTemplate rtpbin_send_rtp_src_template = GST_STATIC_CAPS ("application/x-rtp;application/x-srtp") ); -#define GST_RTP_BIN_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) - #define GST_RTP_BIN_LOCK(bin) g_mutex_lock (&(bin)->priv->bin_lock) #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock) @@ -230,6 +220,10 @@ G_STMT_START { \ #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin) \ GST_RTP_BIN_DYN_UNLOCK (bin); \ +/* Minimum time offset to apply. This compensates for rounding errors in NTP to + * RTP timestamp conversions */ +#define MIN_TS_OFFSET (4 * GST_MSECOND) + struct _GstRtpBinPrivate { GMutex bin_lock; @@ -258,6 +252,8 @@ enum SIGNAL_RESET_SYNC, SIGNAL_GET_SESSION, SIGNAL_GET_INTERNAL_SESSION, + SIGNAL_GET_STORAGE, + SIGNAL_GET_INTERNAL_STORAGE, SIGNAL_ON_NEW_SSRC, SIGNAL_ON_SSRC_COLLISION, @@ -275,7 +271,11 @@ enum SIGNAL_REQUEST_RTCP_ENCODER, SIGNAL_REQUEST_RTCP_DECODER, + SIGNAL_REQUEST_FEC_DECODER, + SIGNAL_REQUEST_FEC_ENCODER, + SIGNAL_NEW_JITTERBUFFER, + SIGNAL_NEW_STORAGE, SIGNAL_REQUEST_AUX_SENDER, SIGNAL_REQUEST_AUX_RECEIVER, @@ -309,6 +309,8 @@ enum #define DEFAULT_MAX_MISORDER_TIME 2000 #define DEFAULT_RFC7273_SYNC FALSE #define DEFAULT_MAX_STREAMS G_MAXUINT +#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0) +#define DEFAULT_MAX_TS_OFFSET G_GINT64_CONSTANT(3000000000) enum { @@ -333,7 +335,9 @@ enum PROP_MAX_DROPOUT_TIME, PROP_MAX_MISORDER_TIME, PROP_RFC7273_SYNC, - PROP_MAX_STREAMS + PROP_MAX_STREAMS, + PROP_MAX_TS_OFFSET_ADJUSTMENT, + PROP_MAX_TS_OFFSET, }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -373,12 +377,12 @@ static void free_client (GstRtpBinClient * client, GstRtpBin * bin); static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin); static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id); static GstPad *complete_session_sink (GstRtpBin * rtpbin, - GstRtpBinSession * session, gboolean bundle_demuxer_needed); + GstRtpBinSession * session); static void complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, guint sessid); static GstPad *complete_session_rtcp (GstRtpBin * rtpbin, - GstRtpBinSession * session, guint sessid, gboolean bundle_demuxer_needed); + GstRtpBinSession * session, guint sessid); /* Manages the RTP stream for one SSRC. * @@ -433,6 +437,9 @@ struct _GstRtpBinStream * there they are pushed into an SSRC demuxer that splits the stream based on * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with * the GstRtpBinStream above). + * + * Before the SSRC demuxer, a storage element may be inserted for the purpose + * of Forward Error Correction. */ struct _GstRtpBinSession { @@ -447,11 +454,8 @@ struct _GstRtpBinSession gulong demux_newpad_sig; gulong demux_padremoved_sig; - /* Bundling support */ - GstElement *rtp_funnel; - GstElement *rtcp_funnel; - GstElement *bundle_demux; - gulong bundle_demux_newpad_sig; + /* Fec support */ + GstElement *storage; GMutex lock; @@ -473,7 +477,6 @@ struct _GstRtpBinSession GstPad *sync_src; GstPad *send_rtp_sink; GstPad *send_rtp_sink_ghost; - GstPad *send_rtp_src; GstPad *send_rtp_src_ghost; GstPad *send_rtcp_src; GstPad *send_rtcp_src_ghost; @@ -654,108 +657,13 @@ ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad, GST_RTP_BIN_UNLOCK (rtpbin); } -static void -new_bundled_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, - GstRtpBinSession * session) -{ - GValue result = G_VALUE_INIT; - GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT }; - guint session_id = 0; - GstRtpBinSession *target_session = NULL; - GstRtpBin *rtpbin = session->bin; - gchar *name; - GstPad *src_pad; - GstPad *recv_rtp_sink = NULL; - GstPad *recv_rtcp_sink = NULL; - GstPadLinkReturn ret; - - GST_RTP_BIN_DYN_LOCK (rtpbin); - GST_DEBUG_OBJECT (rtpbin, "new bundled SSRC pad %08x, %s:%s", ssrc, - GST_DEBUG_PAD_NAME (pad)); - - g_value_init (&result, G_TYPE_UINT); - g_value_init (¶ms[0], GST_TYPE_ELEMENT); - g_value_set_object (¶ms[0], rtpbin); - g_value_init (¶ms[1], G_TYPE_UINT); - g_value_set_uint (¶ms[1], ssrc); - - g_signal_emitv (params, - gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, &result); - g_value_unset (¶ms[0]); - - session_id = g_value_get_uint (&result); - if (session_id == 0) { - target_session = session; - } else { - target_session = find_session_by_id (rtpbin, (gint) session_id); - if (!target_session) { - target_session = create_session (rtpbin, session_id); - } - if (!target_session) { - /* create_session() warned already */ - GST_RTP_BIN_DYN_UNLOCK (rtpbin); - return; - } - - if (!target_session->recv_rtp_sink) { - recv_rtp_sink = complete_session_sink (rtpbin, target_session, FALSE); - } - - if (!target_session->recv_rtp_src) - complete_session_receiver (rtpbin, target_session, session_id); - - if (!target_session->recv_rtcp_sink) { - recv_rtcp_sink = - complete_session_rtcp (rtpbin, target_session, session_id, FALSE); - } - } - - GST_DEBUG_OBJECT (rtpbin, "Assigning bundled ssrc %u to session %u", ssrc, - session_id); - - if (!recv_rtp_sink) { - recv_rtp_sink = - gst_element_get_request_pad (target_session->rtp_funnel, "sink_%u"); - } - - if (!recv_rtcp_sink) { - recv_rtcp_sink = - gst_element_get_request_pad (target_session->rtcp_funnel, "sink_%u"); - } - - name = g_strdup_printf ("src_%u", ssrc); - src_pad = gst_element_get_static_pad (element, name); - ret = gst_pad_link (src_pad, recv_rtp_sink); - g_free (name); - gst_object_unref (src_pad); - gst_object_unref (recv_rtp_sink); - if (ret != GST_PAD_LINK_OK) { - g_warning - ("rtpbin: failed to link bundle demuxer to receive rtp funnel for session %u", - session_id); - } - - name = g_strdup_printf ("rtcp_src_%u", ssrc); - src_pad = gst_element_get_static_pad (element, name); - gst_pad_link (src_pad, recv_rtcp_sink); - g_free (name); - gst_object_unref (src_pad); - gst_object_unref (recv_rtcp_sink); - if (ret != GST_PAD_LINK_OK) { - g_warning - ("rtpbin: failed to link bundle demuxer to receive rtcp sink pad for session %u", - session_id); - } - - GST_RTP_BIN_DYN_UNLOCK (rtpbin); -} - /* create a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * create_session (GstRtpBin * rtpbin, gint id) { GstRtpBinSession *sess; GstElement *session, *demux; + GstElement *storage = NULL; GstState target; if (!(session = gst_element_factory_make ("rtpsession", NULL))) @@ -764,15 +672,22 @@ create_session (GstRtpBin * rtpbin, gint id) if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) goto no_demux; + if (!(storage = gst_element_factory_make ("rtpstorage", NULL))) + goto no_storage; + + /* need to sink the storage or otherwise signal handlers from bindings will + * take ownership of it and we don't own it anymore */ + gst_object_ref_sink (storage); + g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage, + id); + sess = g_new0 (GstRtpBinSession, 1); g_mutex_init (&sess->lock); sess->id = id; sess->bin = rtpbin; sess->session = session; sess->demux = demux; - - sess->rtp_funnel = gst_element_factory_make ("funnel", NULL); - sess->rtcp_funnel = gst_element_factory_make ("funnel", NULL); + sess->storage = storage; sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); @@ -821,8 +736,11 @@ create_session (GstRtpBin * rtpbin, gint id) gst_bin_add (GST_BIN_CAST (rtpbin), session); gst_bin_add (GST_BIN_CAST (rtpbin), demux); - gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtp_funnel); - gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtcp_funnel); + gst_bin_add (GST_BIN_CAST (rtpbin), storage); + + /* unref the storage again, the bin has a reference now and + * we don't need it anymore */ + gst_object_unref (storage); GST_OBJECT_LOCK (rtpbin); target = GST_STATE_TARGET (rtpbin); @@ -831,8 +749,7 @@ create_session (GstRtpBin * rtpbin, gint id) /* change state only to what's needed */ gst_element_set_state (demux, target); gst_element_set_state (session, target); - gst_element_set_state (sess->rtp_funnel, target); - gst_element_set_state (sess->rtcp_funnel, target); + gst_element_set_state (storage, target); return sess; @@ -848,6 +765,13 @@ no_demux: g_warning ("rtpbin: could not create rtpssrcdemux element"); return NULL; } +no_storage: + { + gst_object_unref (session); + gst_object_unref (demux); + g_warning ("rtpbin: could not create rtpstorage element"); + return NULL; + } } static gboolean @@ -859,6 +783,10 @@ bin_manage_element (GstRtpBin * bin, GstElement * element) GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element); } else { GST_DEBUG_OBJECT (bin, "adding requested element %p", element); + + if (g_object_is_floating (element)) + element = gst_object_ref_sink (element); + if (!gst_bin_add (GST_BIN_CAST (bin), element)) goto add_failed; if (!gst_element_sync_state_with_parent (element)) @@ -874,6 +802,7 @@ bin_manage_element (GstRtpBin * bin, GstElement * element) add_failed: { GST_WARNING_OBJECT (bin, "unable to add element"); + gst_object_unref (element); return FALSE; } } @@ -888,10 +817,13 @@ remove_bin_element (GstElement * element, GstRtpBin * bin) if (find) { priv->elements = g_list_delete_link (priv->elements, find); - if (!g_list_find (priv->elements, element)) + if (!g_list_find (priv->elements, element)) { + gst_element_set_locked_state (element, TRUE); gst_bin_remove (GST_BIN_CAST (bin), element); - else - gst_object_unref (element); + gst_element_set_state (element, GST_STATE_NULL); + } + + gst_object_unref (element); } } @@ -903,9 +835,11 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin) gst_element_set_locked_state (sess->demux, TRUE); gst_element_set_locked_state (sess->session, TRUE); + gst_element_set_locked_state (sess->storage, TRUE); gst_element_set_state (sess->demux, GST_STATE_NULL); gst_element_set_state (sess->session, GST_STATE_NULL); + gst_element_set_state (sess->storage, GST_STATE_NULL); remove_recv_rtp (bin, sess); remove_recv_rtcp (bin, sess); @@ -914,6 +848,7 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin) gst_bin_remove (GST_BIN_CAST (bin), sess->session); gst_bin_remove (GST_BIN_CAST (bin), sess->demux); + gst_bin_remove (GST_BIN_CAST (bin), sess->storage); g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin); g_slist_free (sess->elements); @@ -1106,6 +1041,43 @@ gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id) } static GstElement * +gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id) +{ + GstRtpBinSession *session; + GstElement *res = NULL; + + GST_RTP_BIN_LOCK (bin); + GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u", + session_id); + session = find_session_by_id (bin, (gint) session_id); + if (session && session->storage) { + res = gst_object_ref (session->storage); + } + GST_RTP_BIN_UNLOCK (bin); + + return res; +} + +static GObject * +gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id) +{ + GObject *internal_storage = NULL; + GstRtpBinSession *session; + + GST_RTP_BIN_LOCK (bin); + GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u", + session_id); + session = find_session_by_id (bin, (gint) session_id); + if (session && session->storage) { + g_object_get (session->storage, "internal-storage", &internal_storage, + NULL); + } + GST_RTP_BIN_UNLOCK (bin); + + return internal_storage; +} + +static GstElement * gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id) { GST_DEBUG_OBJECT (bin, "return NULL encoder"); @@ -1261,7 +1233,8 @@ get_current_times (GstRtpBin * bin, GstClockTime * running_time, static void stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, - gint64 ts_offset, gboolean check) + gint64 ts_offset, gint64 max_ts_offset, gint64 min_ts_offset, + gboolean allow_positive_ts_offset) { gint64 prev_ts_offset; @@ -1277,19 +1250,25 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff); - if (check) { - /* only change diff when it changed more than 4 milliseconds. This - * compensates for rounding errors in NTP to RTP timestamp - * conversions */ - if (ABS (diff) < 4 * GST_MSECOND) { - GST_DEBUG_OBJECT (bin, "offset too small, ignoring"); + /* ignore minor offsets */ + if (ABS (diff) < min_ts_offset) { + GST_DEBUG_OBJECT (bin, "offset too small, ignoring"); + return; + } + + /* sanity check offset */ + if (max_ts_offset > 0) { + if (ts_offset > 0 && !allow_positive_ts_offset) { + GST_DEBUG_OBJECT (bin, + "offset is positive (clocks are out of sync), ignoring"); return; } - if (ABS (diff) > (3 * GST_SECOND)) { - GST_WARNING_OBJECT (bin, "offset unusually large, ignoring"); + if (ABS (ts_offset) > max_ts_offset) { + GST_DEBUG_OBJECT (bin, "offset too large, ignoring"); return; } } + g_object_set (stream->buffer, "ts-offset", ts_offset, NULL); } GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, @@ -1421,13 +1400,17 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT, local_ntpnstime, ntpnstime); GST_DEBUG_OBJECT (bin, + "local running time %" G_GUINT64_FORMAT ", SR RTP running time %" + G_GUINT64_FORMAT, local_running_time, running_time); + GST_DEBUG_OBJECT (bin, "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff, rtdiff); /* combine to get the final diff to apply to the running_time */ stream->rt_delta = rtdiff - ntpdiff; - stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE); + stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset, + 0, FALSE); } else { gint64 min, rtp_min, clock_base = stream->clock_base; gboolean all_sync, use_rtp; @@ -1579,7 +1562,8 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, else ts_offset = ostream->rt_delta - min; - stream_set_ts_offset (bin, ostream, ts_offset, TRUE); + stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset, + MIN_TS_OFFSET, TRUE); } } gst_rtp_bin_send_sync_event (stream); @@ -1723,9 +1707,10 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) goto no_jitterbuffer; - if (!rtpbin->ignore_pt) + if (!rtpbin->ignore_pt) { if (!(demux = gst_element_factory_make ("rtpptdemux", NULL))) goto no_demux; + } #ifdef TIZEN_FEATURE_RTSP_MODIFICATION if (session->bin->buffer_mode == RTP_JITTER_BUFFER_MODE_SLAVE) if (!(queue2 = gst_element_factory_make ("queue2", NULL))) @@ -1768,6 +1753,8 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, "max-misorder-time", rtpbin->max_misorder_time, NULL); g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL); + g_object_set (buffer, "max-ts-offset-adjustment", + rtpbin->max_ts_offset_adjustment, NULL); #ifdef TIZEN_FEATURE_RTSP_MODIFICATION /* configure queue2 to use live buffering */ @@ -1777,7 +1764,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) g_object_set (queue2, "buffer-mode", GST_BUFFERING_LIVE, NULL); } #endif - + /* need to sink the jitterbufer or otherwise signal handlers from bindings will + * take ownership of it and we don't own it anymore */ + gst_object_ref_sink (buffer); g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0, buffer, session->id, ssrc); @@ -1791,6 +1780,10 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) gst_bin_add (GST_BIN_CAST (rtpbin), buffer); + /* unref the jitterbuffer again, the bin has a reference now and + * we don't need it anymore */ + gst_object_unref (buffer); + /* link stuff */ #ifdef TIZEN_FEATURE_RTSP_MODIFICATION if (queue2) { @@ -1800,10 +1793,10 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) gst_element_link_pads_full (queue2, "src", demux, "sink", GST_PAD_LINK_CHECK_NOTHING); } - } else if (demux) - gst_element_link_pads_full (buffer, "src", demux, "sink", - GST_PAD_LINK_CHECK_NOTHING); - + } else if (demux) { + gst_element_link_pads_full (buffer, "src", demux, "sink", + GST_PAD_LINK_CHECK_NOTHING); + } #else if (demux) gst_element_link_pads_full (buffer, "src", demux, "sink", @@ -1942,7 +1935,7 @@ static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad); static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message); #define gst_rtp_bin_parent_class parent_class -G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN); +G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN); static gboolean _gst_element_accumulator (GSignalInvocationHint * ihint, @@ -1987,8 +1980,6 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) gstelement_class = (GstElementClass *) klass; gstbin_class = (GstBinClass *) klass; - g_type_class_add_private (klass, sizeof (GstRtpBinPrivate)); - gobject_class->dispose = gst_rtp_bin_dispose; gobject_class->finalize = gst_rtp_bin_finalize; gobject_class->set_property = gst_rtp_bin_set_property; @@ -2089,6 +2080,36 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) RTP_TYPE_SESSION, 1, G_TYPE_UINT); /** + * GstRtpBin::get-internal-storage: + * @rtpbin: the object which received the signal + * @id: the session id + * + * Request the internal RTPStorage object as #GObject in session @id. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] = + g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, + get_internal_storage), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_OBJECT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::get-storage: + * @rtpbin: the object which received the signal + * @id: the session id + * + * Request the RTPStorage element as #GObject in session @id. + * + * Since: 1.16 + */ + gst_rtp_bin_signals[SIGNAL_GET_STORAGE] = + g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, + get_storage), NULL, NULL, g_cclosure_marshal_generic, + GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** * GstRtpBin::on-new-ssrc: * @rtpbin: the object which received the signal * @session: the session @@ -2312,6 +2333,23 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT); /** + * GstRtpBin::new-storage: + * @rtpbin: the object which received the signal + * @storage: the new storage + * @session: the session + * + * Notify that a new @storage was created for @session. + * This signal can, for example, be used to configure @storage. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] = + g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + new_storage), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT); + + /** * GstRtpBin::request-aux-sender: * @rtpbin: the object which received the signal * @session: the session @@ -2328,6 +2366,7 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_aux_sender), _gst_element_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + /** * GstRtpBin::request-aux-receiver: * @rtpbin: the object which received the signal @@ -2345,6 +2384,43 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_aux_receiver), _gst_element_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::request-fec-decoder: + * @rtpbin: the object which received the signal + * @session: the session index + * + * Request a FEC decoder element for the given @session. The element + * will be added to the bin after the pt demuxer. + * + * If no handler is connected, no FEC decoder will be used. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] = + g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + request_fec_decoder), _gst_element_accumulator, NULL, + g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::request-fec-encoder: + * @rtpbin: the object which received the signal + * @session: the session index + * + * Request a FEC encoder element for the given @session. The element + * will be added to the bin after the RTPSession. + * + * If no handler is connected, no FEC encoder will be used. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] = + g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + request_fec_encoder), _gst_element_accumulator, NULL, + g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + /** * GstRtpBin::on-new-sender-ssrc: * @rtpbin: the object which received the signal @@ -2376,29 +2452,6 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); - - /** - * GstRtpBin::on-bundled-ssrc: - * @rtpbin: the object which received the signal - * @ssrc: the bundled SSRC - * - * Notify of a new incoming bundled SSRC. If no handler is connected to the - * signal then the #GstRtpSession created for the recv_rtp_sink_\%u - * request pad will be managing this new SSRC. However if there is a handler - * connected then the application can decided to dispatch this new stream to - * another session by providing its ID as return value of the handler. This - * can be particularly useful to keep retransmission SSRCs grouped with the - * session for which they handle retransmission. - * - * Since: 1.12 - */ - gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC] = - g_signal_new ("on-bundled-ssrc", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, - on_bundled_ssrc), NULL, NULL, - g_cclosure_marshal_generic, G_TYPE_UINT, 1, G_TYPE_UINT); - - g_object_class_install_property (gobject_class, PROP_SDES, g_param_spec_boxed ("sdes", "SDES", "The SDES items of this session", @@ -2545,6 +2598,40 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) 0, G_MAXUINT, DEFAULT_MAX_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:max-ts-offset-adjustment: + * + * Syncing time stamps to NTP time adds a time offset. This parameter + * specifies the maximum number of nanoseconds per frame that this time offset + * may be adjusted with. This is used to avoid sudden large changes to time + * stamps. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT, + g_param_spec_uint64 ("max-ts-offset-adjustment", + "Max Timestamp Offset Adjustment", + "The maximum number of nanoseconds per frame that time stamp offsets " + "may be adjusted (0 = no limit).", 0, G_MAXUINT64, + DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + /** + * GstRtpBin:max-ts-offset: + * + * Used to set an upper limit of how large a time offset may be. This + * is used to protect against unrealistic values as a result of either + * client,server or clock issues. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET, + g_param_spec_int64 ("max-ts-offset", "Max TS Offset", + "The maximum absolute value of the time offset in (nanoseconds). " + "Note, if the ntp-sync parameter is set the default value is " + "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); @@ -2578,6 +2665,9 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session); klass->get_internal_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session); + klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage); + klass->get_internal_storage = + GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage); klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder); klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder); klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder); @@ -2591,7 +2681,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) { gchar *cname; - rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); + rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin); g_mutex_init (&rtpbin->priv->bin_lock); g_mutex_init (&rtpbin->priv->dyn_lock); @@ -2616,6 +2706,9 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC; rtpbin->max_streams = DEFAULT_MAX_STREAMS; + rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT; + rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET; + rtpbin->max_ts_offset_is_set = FALSE; /* some default SDES entries */ cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ()); @@ -2731,6 +2824,15 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, break; case PROP_NTP_SYNC: rtpbin->ntp_sync = g_value_get_boolean (value); + /* The default value of max_ts_offset depends on ntp_sync. If user + * hasn't set it then change default value */ + if (!rtpbin->max_ts_offset_is_set) { + if (rtpbin->ntp_sync) { + rtpbin->max_ts_offset = 0; + } else { + rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET; + } + } break; case PROP_RTCP_SYNC: g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value)); @@ -2840,6 +2942,15 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, case PROP_MAX_STREAMS: rtpbin->max_streams = g_value_get_uint (value); break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "max-ts-offset-adjustment", value); + break; + case PROP_MAX_TS_OFFSET: + rtpbin->max_ts_offset = g_value_get_int64 (value); + rtpbin->max_ts_offset_is_set = TRUE; + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2928,6 +3039,12 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_MAX_STREAMS: g_value_set_uint (value, rtpbin->max_streams); break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment); + break; + case PROP_MAX_TS_OFFSET: + g_value_set_int64 (value, rtpbin->max_ts_offset); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3011,39 +3128,35 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message) GstStructure *caps_structure; const gchar *caps_str_media = NULL; temp_pad_src = gst_element_get_static_pad (stream->buffer, "src"); - temp_caps_src = gst_pad_get_current_caps(temp_pad_src); - GST_DEBUG_OBJECT (bin, "stream %p percent %d : temp_caps_src=%"GST_PTR_FORMAT, stream,stream->percent,temp_caps_src); - if (temp_caps_src) - { + temp_caps_src = gst_pad_get_current_caps (temp_pad_src); + GST_DEBUG_OBJECT (bin, + "stream %p percent %d : temp_caps_src=%" GST_PTR_FORMAT, + stream, stream->percent, temp_caps_src); + if (temp_caps_src) { caps_structure = gst_caps_get_structure (temp_caps_src, 0); - caps_str_media = gst_structure_get_string (caps_structure, "media"); - if (caps_str_media != NULL) - { - if ((strcmp(caps_str_media,"video") != 0)&&(strcmp(caps_str_media,"audio") != 0)) - { - GST_DEBUG_OBJECT (bin, "Non Audio/Video Stream.. ignoring the same !!"); - gst_caps_unref( temp_caps_src ); - gst_object_unref( temp_pad_src ); + caps_str_media = + gst_structure_get_string (caps_structure, "media"); + if (caps_str_media != NULL) { + if ((strcmp (caps_str_media, "video") != 0) + && (strcmp (caps_str_media, "audio") != 0)) { + GST_DEBUG_OBJECT (bin, + "Non Audio/Video Stream.. ignoring the same !!"); + gst_caps_unref (temp_caps_src); + gst_object_unref (temp_pad_src); continue; - } - else if(stream->percent >= 100) - { + } else if (stream->percent >= 100) { /* Most of the time buffering icon displays in rtsp playback. - Optimizing the buffering updation code. Whenever any stream percentage - reaches 100 do not post buffering messages.*/ - if(stream->prev_percent < 100) - { + Optimizing the buffering updation code. Whenever any stream percentage + reaches 100 do not post buffering messages. */ + if (stream->prev_percent < 100) buffering_flag = TRUE; - } else - { update_buffering_status = FALSE; - } } } - gst_caps_unref( temp_caps_src ); + gst_caps_unref (temp_caps_src); } - gst_object_unref( temp_pad_src ); + gst_object_unref (temp_pad_src); #else GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream, stream->percent); @@ -3102,18 +3215,14 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message) gst_message_unref (message); #ifdef TIZEN_FEATURE_RTSP_MODIFICATION - if (rtpbin->buffer_mode == RTP_JITTER_BUFFER_MODE_SLAVE) - { - if(update_buffering_status==FALSE) - { - break; - } - if(buffering_flag) - { - min_percent=100; - GST_DEBUG_OBJECT (bin, "forcefully change min_percent to 100!!!"); - } - } + if (rtpbin->buffer_mode == RTP_JITTER_BUFFER_MODE_SLAVE) { + if (update_buffering_status == FALSE) + break; + if (buffering_flag) { + min_percent = 100; + GST_DEBUG_OBJECT (bin, "forcefully change min_percent to 100!!!"); + } + } #endif /* make a new buffering message with the min value */ message = @@ -3297,21 +3406,46 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } -/* a new pad (SSRC) was created in @session. This signal is emited from the - * payload demuxer. */ static void -new_payload_found (GstElement * element, guint pt, GstPad * pad, - GstRtpBinStream * stream) +expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, + guint8 pt) { - GstRtpBin *rtpbin; GstElementClass *klass; GstPadTemplate *templ; gchar *padname; GstPad *gpad; - rtpbin = stream->bin; + gst_object_ref (pad); - GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt); + if (stream->session->storage) { + GstElement *fec_decoder = + session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER); + + if (fec_decoder) { + GstPad *sinkpad, *srcpad; + GstPadLinkReturn ret; + + sinkpad = gst_element_get_static_pad (fec_decoder, "sink"); + + if (!sinkpad) + goto fec_decoder_sink_failed; + + ret = gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + if (ret != GST_PAD_LINK_OK) + goto fec_decoder_link_failed; + + srcpad = gst_element_get_static_pad (fec_decoder, "src"); + + if (!srcpad) + goto fec_decoder_src_failed; + + gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad); + gst_object_unref (pad); + pad = srcpad; + } + } GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown); @@ -3330,15 +3464,51 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad, gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); +done: + gst_object_unref (pad); + return; shutdown: { GST_DEBUG ("ignoring, we are shutting down"); - return; + goto done; + } +fec_decoder_sink_failed: + { + g_warning ("rtpbin: failed to get fec encoder sink pad for session %u", + stream->session->id); + goto done; + } +fec_decoder_src_failed: + { + g_warning ("rtpbin: failed to get fec encoder src pad for session %u", + stream->session->id); + goto done; + } +fec_decoder_link_failed: + { + g_warning ("rtpbin: failed to link fec decoder for session %u", + stream->session->id); + goto done; } } +/* a new pad (SSRC) was created in @session. This signal is emited from the + * payload demuxer. */ +static void +new_payload_found (GstElement * element, guint pt, GstPad * pad, + GstRtpBinStream * stream) +{ + GstRtpBin *rtpbin; + + rtpbin = stream->bin; + + GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt); + + expose_recv_src_pad (rtpbin, pad, stream, pt); +} + static void payload_pad_removed (GstElement * element, GstPad * pad, GstRtpBinStream * stream) @@ -3385,6 +3555,36 @@ no_caps: } } +static GstCaps * +ptdemux_pt_map_requested (GstElement * element, guint pt, + GstRtpBinSession * session) +{ + GstCaps *ret = pt_map_requested (element, pt, session); + + if (ret && gst_caps_get_size (ret) == 1) { + const GstStructure *s = gst_caps_get_structure (ret, 0); + gboolean is_fec; + + if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) { + GValue v = G_VALUE_INIT; + GValue v2 = G_VALUE_INIT; + + GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt, + session->id); + g_value_init (&v, GST_TYPE_ARRAY); + g_value_init (&v2, G_TYPE_INT); + g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v); + g_value_set_int (&v2, pt); + gst_value_array_append_value (&v, &v2); + g_value_unset (&v2); + g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v); + g_value_unset (&v); + } + } + + return ret; +} + static void payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session) { @@ -3396,7 +3596,7 @@ payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session) 0, session->id, pt); } -/* emited when caps changed for the session */ +/* emitted when caps changed for the session */ static void caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) { @@ -3484,41 +3684,31 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, stream->demux_padremoved_sig = g_signal_connect (stream->demux, "pad-removed", (GCallback) payload_pad_removed, stream); - /* connect to the request-pt-map signal. This signal will be emited by the + /* connect to the request-pt-map signal. This signal will be emitted by the * demuxer so that it can apply a proper caps on the buffers for the * depayloaders. */ stream->demux_ptreq_sig = g_signal_connect (stream->demux, - "request-pt-map", (GCallback) pt_map_requested, session); + "request-pt-map", (GCallback) ptdemux_pt_map_requested, session); /* connect to the signal so it can be forwarded. */ stream->demux_ptchange_sig = g_signal_connect (stream->demux, "payload-type-change", (GCallback) payload_type_change, session); + + GST_RTP_SESSION_UNLOCK (session); + GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); } else { /* add rtpjitterbuffer src pad to pads */ - GstElementClass *klass; - GstPadTemplate *templ; - gchar *padname; - GstPad *gpad, *pad; + GstPad *pad; pad = gst_element_get_static_pad (stream->buffer, "src"); - /* ghost the pad to the parent */ - klass = GST_ELEMENT_GET_CLASS (rtpbin); - templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u"); - padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u", - stream->session->id, stream->ssrc, 255); - gpad = gst_ghost_pad_new_from_template (padname, pad, templ); - g_free (padname); + GST_RTP_SESSION_UNLOCK (session); + GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); - gst_pad_set_active (gpad, TRUE); - gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); + expose_recv_src_pad (rtpbin, pad, stream, 255); gst_object_unref (pad); } - GST_RTP_SESSION_UNLOCK (session); - GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); - return; /* ERRORS */ @@ -3536,39 +3726,11 @@ no_stream: } } -static void -session_maybe_create_bundle_demuxer (GstRtpBinSession * session) -{ - GstRtpBin *rtpbin; - - if (session->bundle_demux) - return; - - rtpbin = session->bin; - if (g_signal_has_handler_pending (rtpbin, - gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, TRUE)) { - GST_DEBUG_OBJECT (rtpbin, "Adding a bundle SSRC demuxer to session %u", - session->id); - session->bundle_demux = gst_element_factory_make ("rtpssrcdemux", NULL); - session->bundle_demux_newpad_sig = g_signal_connect (session->bundle_demux, - "new-ssrc-pad", (GCallback) new_bundled_ssrc_pad_found, session); - - gst_bin_add (GST_BIN_CAST (rtpbin), session->bundle_demux); - gst_element_sync_state_with_parent (session->bundle_demux); - } else { - GST_DEBUG_OBJECT (rtpbin, - "No handler for the on-bundled-ssrc signal so no need for a bundle SSRC demuxer in session %u", - session->id); - } -} - static GstPad * -complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session, - gboolean bundle_demuxer_needed) +complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) { guint sessid = session->id; GstPad *recv_rtp_sink; - GstPad *funnel_src; GstElement *decoder; g_assert (!session->recv_rtp_sink); @@ -3582,9 +3744,6 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session, g_signal_connect (session->recv_rtp_sink, "notify::caps", (GCallback) caps_changed, session); - if (bundle_demuxer_needed) - session_maybe_create_bundle_demuxer (session); - GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder"); decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER); if (decoder) { @@ -3602,14 +3761,8 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session, if (decsrc == NULL) goto dec_src_failed; - if (session->bundle_demux) { - GstPad *demux_sink; - demux_sink = gst_element_get_static_pad (session->bundle_demux, "sink"); - ret = gst_pad_link (decsrc, demux_sink); - gst_object_unref (demux_sink); - } else { - ret = gst_pad_link (decsrc, session->recv_rtp_sink); - } + ret = gst_pad_link (decsrc, session->recv_rtp_sink); + gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) @@ -3617,19 +3770,9 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session, } else { GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given"); - if (session->bundle_demux) { - recv_rtp_sink = - gst_element_get_static_pad (session->bundle_demux, "sink"); - } else { - recv_rtp_sink = - gst_element_get_request_pad (session->rtp_funnel, "sink_%u"); - } + recv_rtp_sink = gst_object_ref (session->recv_rtp_sink); } - funnel_src = gst_element_get_static_pad (session->rtp_funnel, "src"); - gst_pad_link (funnel_src, session->recv_rtp_sink); - gst_object_unref (funnel_src); - return recv_rtp_sink; /* ERRORS */ @@ -3671,8 +3814,7 @@ complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, if (session->recv_rtp_src == NULL) goto pad_failed; - /* find out if we need AUX elements or if we can go into the SSRC demuxer - * directly */ + /* find out if we need AUX elements */ aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER); if (aux) { gchar *pname; @@ -3692,8 +3834,7 @@ complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, if (ret != GST_PAD_LINK_OK) goto aux_link_failed; - /* this can be NULL when this AUX element is not to be linked to - * an SSRC demuxer */ + /* this can be NULL when this AUX element is not to be linked any further */ pname = g_strdup_printf ("src_%u", sessid); recv_rtp_src = gst_element_get_static_pad (aux, pname); g_free (pname); @@ -3701,6 +3842,22 @@ complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, recv_rtp_src = gst_object_ref (session->recv_rtp_src); } + /* Add a storage element if needed */ + if (recv_rtp_src && session->storage) { + GstPadLinkReturn ret; + GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink"); + + ret = gst_pad_link (recv_rtp_src, sinkpad); + + gst_object_unref (sinkpad); + gst_object_unref (recv_rtp_src); + + if (ret != GST_PAD_LINK_OK) + goto storage_link_failed; + + recv_rtp_src = gst_element_get_static_pad (session->storage, "src"); + } + if (recv_rtp_src) { GstPad *sinkdpad; @@ -3735,6 +3892,11 @@ aux_link_failed: g_warning ("rtpbin: failed to link AUX pad to session %u", sessid); return; } +storage_link_failed: + { + g_warning ("rtpbin: failed to link storage"); + return; + } } /* Create a pad for receiving RTP for the session in @name. Must be called with @@ -3768,11 +3930,10 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) return session->recv_rtp_sink_ghost; /* setup the session sink pad */ - recv_rtp_sink = complete_session_sink (rtpbin, session, TRUE); + recv_rtp_sink = complete_session_sink (rtpbin, session); if (!recv_rtp_sink) goto session_sink_failed; - GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); session->recv_rtp_sink_ghost = gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ); @@ -3813,11 +3974,6 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig); session->demux_padremoved_sig = 0; } - if (session->bundle_demux_newpad_sig) { - g_signal_handler_disconnect (session->bundle_demux, - session->bundle_demux_newpad_sig); - session->bundle_demux_newpad_sig = 0; - } if (session->recv_rtp_src) { gst_object_unref (session->recv_rtp_src); session->recv_rtp_src = NULL; @@ -3837,12 +3993,11 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) static GstPad * complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, - guint sessid, gboolean bundle_demuxer_needed) + guint sessid) { GstElement *decoder; GstPad *sinkdpad; GstPad *decsink = NULL; - GstPad *funnel_src; /* get recv_rtp pad and store */ GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); @@ -3851,9 +4006,6 @@ complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, if (session->recv_rtcp_sink == NULL) goto pad_failed; - if (bundle_demuxer_needed) - session_maybe_create_bundle_demuxer (session); - GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder"); decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER); if (decoder) { @@ -3870,26 +4022,15 @@ complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, if (decsrc == NULL) goto dec_src_failed; - if (session->bundle_demux) { - GstPad *demux_sink; - demux_sink = - gst_element_get_static_pad (session->bundle_demux, "rtcp_sink"); - ret = gst_pad_link (decsrc, demux_sink); - gst_object_unref (demux_sink); - } else { - ret = gst_pad_link (decsrc, session->recv_rtcp_sink); - } + ret = gst_pad_link (decsrc, session->recv_rtcp_sink); + gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) goto dec_link_failed; } else { GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given"); - if (session->bundle_demux) { - decsink = gst_element_get_static_pad (session->bundle_demux, "rtcp_sink"); - } else { - decsink = gst_element_get_request_pad (session->rtcp_funnel, "sink_%u"); - } + decsink = gst_object_ref (session->recv_rtcp_sink); } /* get srcpad, link to SSRCDemux */ @@ -3903,10 +4044,6 @@ complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING); gst_object_unref (sinkdpad); - funnel_src = gst_element_get_static_pad (session->rtcp_funnel, "src"); - gst_pad_link (funnel_src, session->recv_rtcp_sink); - gst_object_unref (funnel_src); - return decsink; pad_failed: @@ -3970,7 +4107,7 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink_ghost != NULL) return session->recv_rtcp_sink_ghost; - decsink = complete_session_rtcp (rtpbin, session, sessid, TRUE); + decsink = complete_session_rtcp (rtpbin, session, sessid); if (!decsink) goto create_error; @@ -4026,11 +4163,12 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) GstElement *encoder; GstElementClass *klass; GstPadTemplate *templ; + gboolean ret = FALSE; /* get srcpad */ - session->send_rtp_src = - gst_element_get_static_pad (session->session, "send_rtp_src"); - if (session->send_rtp_src == NULL) + send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src"); + + if (send_rtp_src == NULL) goto no_srcpad; GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder"); @@ -4048,22 +4186,22 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) if (encsrc == NULL) goto enc_src_failed; - send_rtp_src = encsrc; - ename = g_strdup_printf ("rtp_sink_%u", sessid); encsink = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsink == NULL) goto enc_sink_failed; - ret = gst_pad_link (session->send_rtp_src, encsink); + ret = gst_pad_link (send_rtp_src, encsink); gst_object_unref (encsink); + gst_object_unref (send_rtp_src); + + send_rtp_src = encsrc; if (ret != GST_PAD_LINK_OK) goto enc_link_failed; } else { GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given"); - send_rtp_src = gst_object_ref (session->send_rtp_src); } /* ghost the new source pad */ @@ -4072,37 +4210,43 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u"); session->send_rtp_src_ghost = gst_ghost_pad_new_from_template (gname, send_rtp_src, templ); - gst_object_unref (send_rtp_src); gst_pad_set_active (session->send_rtp_src_ghost, TRUE); gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events, session->send_rtp_src_ghost); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost); g_free (gname); - return TRUE; + ret = TRUE; + +done: + if (send_rtp_src) + gst_object_unref (send_rtp_src); + + return ret; /* ERRORS */ no_srcpad: { g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid); - return FALSE; + goto done; } enc_src_failed: { - g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid); - return FALSE; + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " src pad for session %u", encoder, sessid); + goto done; } enc_sink_failed: { - g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid); - gst_object_unref (send_rtp_src); - return FALSE; + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " sink pad for session %u", encoder, sessid); + goto done; } enc_link_failed: { - g_warning ("rtpbin: failed to link rtp encoder for session %u", sessid); - gst_object_unref (send_rtp_src); - return FALSE; + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + encoder, sessid); + goto done; } } @@ -4162,8 +4306,9 @@ create_error: } existing_session: { - g_warning ("rtpbin: session %u is already a sender", sessid); - return FALSE; + GST_DEBUG_OBJECT (rtpbin, + "skipping src_%i setup, since it is already configured.", sessid); + return TRUE; } pad_failed: { @@ -4207,6 +4352,8 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) guint sessid; GstPad *send_rtp_sink; GstElement *aux; + GstElement *encoder; + GstElement *prev = NULL; GstRtpBinSession *session; /* first get the session number */ @@ -4230,19 +4377,47 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink != NULL) goto existing_session; + encoder = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER); + + if (encoder) { + GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder"); + + send_rtp_sink = gst_element_get_static_pad (encoder, "sink"); + + if (!send_rtp_sink) + goto enc_sink_failed; + + prev = encoder; + } + GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender"); aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER); if (aux) { + GstPad *sinkpad; GST_DEBUG_OBJECT (rtpbin, "linking AUX sender"); if (!setup_aux_sender (rtpbin, session, aux)) goto aux_session_failed; pname = g_strdup_printf ("sink_%u", sessid); - send_rtp_sink = gst_element_get_static_pad (aux, pname); + sinkpad = gst_element_get_static_pad (aux, pname); g_free (pname); - if (send_rtp_sink == NULL) + if (sinkpad == NULL) goto aux_sink_failed; + + if (!prev) { + send_rtp_sink = sinkpad; + } else { + GstPad *srcpad = gst_element_get_static_pad (prev, "src"); + GstPadLinkReturn ret; + + ret = gst_pad_link (srcpad, sinkpad); + gst_object_unref (srcpad); + if (ret != GST_PAD_LINK_OK) { + goto aux_link_failed; + } + } + prev = aux; } else { /* get send_rtp pad and store */ session->send_rtp_sink = @@ -4253,7 +4428,17 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (!complete_session_src (rtpbin, session)) goto session_src_failed; - send_rtp_sink = gst_object_ref (session->send_rtp_sink); + if (!prev) { + send_rtp_sink = gst_object_ref (session->send_rtp_sink); + } else { + GstPad *srcpad = gst_element_get_static_pad (prev, "src"); + GstPadLinkReturn ret; + + ret = gst_pad_link (srcpad, session->send_rtp_sink); + gst_object_unref (srcpad); + if (ret != GST_PAD_LINK_OK) + goto session_link_failed; + } } session->send_rtp_sink_ghost = @@ -4290,6 +4475,12 @@ aux_sink_failed: g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); return NULL; } +aux_link_failed: + { + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + aux, sessid); + return NULL; + } pad_failed: { g_warning ("rtpbin: failed to get session pad for session %u", sessid); @@ -4300,6 +4491,18 @@ session_src_failed: g_warning ("rtpbin: failed to setup source pads for session %u", sessid); return NULL; } +session_link_failed: + { + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + session, sessid); + return NULL; + } +enc_sink_failed: + { + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " sink pad for session %u", encoder, sessid); + return NULL; + } } static void @@ -4311,10 +4514,6 @@ remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) session->send_rtp_src_ghost); session->send_rtp_src_ghost = NULL; } - if (session->send_rtp_src) { - gst_object_unref (session->send_rtp_src); - session->send_rtp_src = NULL; - } if (session->send_rtp_sink) { gst_element_release_request_pad (GST_ELEMENT_CAST (session->session), session->send_rtp_sink); @@ -4333,7 +4532,8 @@ remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) * RTP_BIN_LOCK. */ static GstPad * -create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, + const gchar * name) { guint sessid; GstPad *encsrc; @@ -4346,8 +4546,13 @@ create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) /* get or create session */ session = find_session_by_id (rtpbin, sessid); - if (!session) - goto no_session; + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } /* check if pad was requested */ if (session->send_rtcp_src_ghost != NULL) @@ -4404,9 +4609,9 @@ no_name: g_warning ("rtpbin: invalid name given"); return NULL; } -no_session: +create_error: { - g_warning ("rtpbin: session with id %d does not exist", sessid); + /* create_session already warned */ return NULL; } pad_failed: @@ -4548,7 +4753,7 @@ gst_rtp_bin_request_new_pad (GstElement * element, result = create_send_rtp (rtpbin, templ, pad_name); } else if (templ == gst_element_class_get_pad_template (klass, "send_rtcp_src_%u")) { - result = create_rtcp (rtpbin, templ, pad_name); + result = create_send_rtcp (rtpbin, templ, pad_name); } else goto wrong_template;