X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Fgstrtpbin.c;h=ad8272c0a006300d3538869eb0063f492379ed3b;hb=0ecc52c2ee7de6745b6a9422e4f20647a7783636;hp=22efef2d4341de0e3353c8283b4784cc7ae42870;hpb=ddb0b9c42257b5282ad153614efa1fd033131679;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 22efef2..ad8272c 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -67,7 +67,7 @@ * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders * and decoders in order to support SRTP. The encoders must provide the pads - * rtp_sink_\%d and rtp_src_\%d for RTP and rtcp_sink_\%d and rtcp_src_\%d for + * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for * RTCP. The session number will be used in the pad name. The decoders must provide * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will * be placed before the #GstRtpSession element, thus they must support SSRC demuxing @@ -80,7 +80,7 @@ * sink_\%u pad that matches the sessionid in the signal and it should have 1 or * more src_\%u pads. For each src_%\u pad, a session will be made (if needed) * and the pad will be linked to the session send_rtp_sink pad. Each session will - * then expose its source pad ad send_rtp_src_\%u on #GstRtpBin. + * then expose its source pad as send_rtp_src_\%u on #GstRtpBin. * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin. @@ -132,8 +132,6 @@ * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1 * on port 5007. * - * - * Last reviewed on 2007-08-30 (0.10.6) */ #ifdef HAVE_CONFIG_H @@ -199,9 +197,6 @@ static GstStaticPadTemplate rtpbin_send_rtp_src_template = GST_STATIC_CAPS ("application/x-rtp;application/x-srtp") ); -#define GST_RTP_BIN_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) - #define GST_RTP_BIN_LOCK(bin) g_mutex_lock (&(bin)->priv->bin_lock) #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock) @@ -225,6 +220,10 @@ G_STMT_START { \ #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin) \ GST_RTP_BIN_DYN_UNLOCK (bin); \ +/* Minimum time offset to apply. This compensates for rounding errors in NTP to + * RTP timestamp conversions */ +#define MIN_TS_OFFSET (4 * GST_MSECOND) + struct _GstRtpBinPrivate { GMutex bin_lock; @@ -237,8 +236,8 @@ struct _GstRtpBinPrivate gboolean autoremove; - /* UNIX (ntp) time of last SR sync used */ - guint64 last_unix; + /* NTP time in ns of last SR sync used */ + guint64 last_ntpnstime; /* list of extra elements */ GList *elements; @@ -251,7 +250,10 @@ enum SIGNAL_PAYLOAD_TYPE_CHANGE, SIGNAL_CLEAR_PT_MAP, SIGNAL_RESET_SYNC, + SIGNAL_GET_SESSION, SIGNAL_GET_INTERNAL_SESSION, + SIGNAL_GET_STORAGE, + SIGNAL_GET_INTERNAL_STORAGE, SIGNAL_ON_NEW_SSRC, SIGNAL_ON_SSRC_COLLISION, @@ -269,11 +271,20 @@ enum SIGNAL_REQUEST_RTCP_ENCODER, SIGNAL_REQUEST_RTCP_DECODER, + SIGNAL_REQUEST_FEC_DECODER, + SIGNAL_REQUEST_FEC_ENCODER, + SIGNAL_NEW_JITTERBUFFER, + SIGNAL_NEW_STORAGE, SIGNAL_REQUEST_AUX_SENDER, SIGNAL_REQUEST_AUX_RECEIVER, + SIGNAL_ON_NEW_SENDER_SSRC, + SIGNAL_ON_SENDER_SSRC_ACTIVE, + + SIGNAL_ON_BUNDLED_SSRC, + LAST_SIGNAL }; @@ -290,6 +301,16 @@ enum #define DEFAULT_RTCP_SYNC_INTERVAL 0 #define DEFAULT_DO_SYNC_EVENT FALSE #define DEFAULT_DO_RETRANSMISSION FALSE +#define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP +#define DEFAULT_NTP_TIME_SOURCE GST_RTP_NTP_TIME_SOURCE_NTP +#define DEFAULT_RTCP_SYNC_SEND_TIME TRUE +#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000 +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RFC7273_SYNC FALSE +#define DEFAULT_MAX_STREAMS G_MAXUINT +#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0) +#define DEFAULT_MAX_TS_OFFSET G_GINT64_CONSTANT(3000000000) enum { @@ -307,14 +328,16 @@ enum PROP_USE_PIPELINE_CLOCK, PROP_DO_SYNC_EVENT, PROP_DO_RETRANSMISSION, - PROP_LAST -}; - -enum -{ - GST_RTP_BIN_RTCP_SYNC_ALWAYS, - GST_RTP_BIN_RTCP_SYNC_INITIAL, - GST_RTP_BIN_RTCP_SYNC_RTP + PROP_RTP_PROFILE, + PROP_NTP_TIME_SOURCE, + PROP_RTCP_SYNC_SEND_TIME, + PROP_MAX_RTCP_RTP_TIME_DIFF, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, + PROP_RFC7273_SYNC, + PROP_MAX_STREAMS, + PROP_MAX_TS_OFFSET_ADJUSTMENT, + PROP_MAX_TS_OFFSET, }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -352,6 +375,14 @@ static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void free_client (GstRtpBinClient * client, GstRtpBin * bin); static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin); +static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id); +static GstPad *complete_session_sink (GstRtpBin * rtpbin, + GstRtpBinSession * session); +static void +complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid); +static GstPad *complete_session_rtcp (GstRtpBin * rtpbin, + GstRtpBinSession * session, guint sessid); /* Manages the RTP stream for one SSRC. * @@ -404,6 +435,9 @@ struct _GstRtpBinStream * there they are pushed into an SSRC demuxer that splits the stream based on * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with * the GstRtpBinStream above). + * + * Before the SSRC demuxer, a storage element may be inserted for the purpose + * of Forward Error Correction. */ struct _GstRtpBinSession { @@ -418,6 +452,9 @@ struct _GstRtpBinSession gulong demux_newpad_sig; gulong demux_padremoved_sig; + /* Fec support */ + GstElement *storage; + GMutex lock; /* list of GstRtpBinStream */ @@ -438,7 +475,6 @@ struct _GstRtpBinSession GstPad *sync_src; GstPad *send_rtp_sink; GstPad *send_rtp_sink_ghost; - GstPad *send_rtp_src; GstPad *send_rtp_src_ghost; GstPad *send_rtcp_src; GstPad *send_rtcp_src_ghost; @@ -567,6 +603,21 @@ on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream) stream->session->id, stream->ssrc); } +static void +on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0, + sess->id, ssrc); +} + +static void +on_sender_ssrc_active (GstElement * session, guint32 ssrc, + GstRtpBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], + 0, sess->id, ssrc); +} + /* must be called with the SESSION lock */ static GstRtpBinStream * find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc) @@ -610,6 +661,7 @@ create_session (GstRtpBin * rtpbin, gint id) { GstRtpBinSession *sess; GstElement *session, *demux; + GstElement *storage = NULL; GstState target; if (!(session = gst_element_factory_make ("rtpsession", NULL))) @@ -618,20 +670,40 @@ create_session (GstRtpBin * rtpbin, gint id) if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) goto no_demux; + if (!(storage = gst_element_factory_make ("rtpstorage", NULL))) + goto no_storage; + + /* need to sink the storage or otherwise signal handlers from bindings will + * take ownership of it and we don't own it anymore */ + gst_object_ref_sink (storage); + g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage, + id); + sess = g_new0 (GstRtpBinSession, 1); g_mutex_init (&sess->lock); sess->id = id; sess->bin = rtpbin; sess->session = session; sess->demux = demux; + sess->storage = storage; + sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); /* configure SDES items */ GST_OBJECT_LOCK (rtpbin); - g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock", - rtpbin->use_pipeline_clock, NULL); + g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile", + rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, + NULL); + if (rtpbin->use_pipeline_clock) + g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock, + NULL); + else + g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL); + + g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time, + "max-misorder-time", rtpbin->max_misorder_time, NULL); GST_OBJECT_UNLOCK (rtpbin); /* provide clock_rate to the session manager when needed */ @@ -655,9 +727,18 @@ create_session (GstRtpBin * rtpbin, gint id) g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess); g_signal_connect (sess->session, "on-sender-timeout", (GCallback) on_sender_timeout, sess); + g_signal_connect (sess->session, "on-new-sender-ssrc", + (GCallback) on_new_sender_ssrc, sess); + g_signal_connect (sess->session, "on-sender-ssrc-active", + (GCallback) on_sender_ssrc_active, sess); gst_bin_add (GST_BIN_CAST (rtpbin), session); gst_bin_add (GST_BIN_CAST (rtpbin), demux); + gst_bin_add (GST_BIN_CAST (rtpbin), storage); + + /* unref the storage again, the bin has a reference now and + * we don't need it anymore */ + gst_object_unref (storage); GST_OBJECT_LOCK (rtpbin); target = GST_STATE_TARGET (rtpbin); @@ -666,6 +747,7 @@ create_session (GstRtpBin * rtpbin, gint id) /* change state only to what's needed */ gst_element_set_state (demux, target); gst_element_set_state (session, target); + gst_element_set_state (storage, target); return sess; @@ -681,6 +763,13 @@ no_demux: g_warning ("rtpbin: could not create rtpssrcdemux element"); return NULL; } +no_storage: + { + gst_object_unref (session); + gst_object_unref (demux); + g_warning ("rtpbin: could not create rtpstorage element"); + return NULL; + } } static gboolean @@ -692,6 +781,10 @@ bin_manage_element (GstRtpBin * bin, GstElement * element) GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element); } else { GST_DEBUG_OBJECT (bin, "adding requested element %p", element); + + if (g_object_is_floating (element)) + element = gst_object_ref_sink (element); + if (!gst_bin_add (GST_BIN_CAST (bin), element)) goto add_failed; if (!gst_element_sync_state_with_parent (element)) @@ -707,6 +800,7 @@ bin_manage_element (GstRtpBin * bin, GstElement * element) add_failed: { GST_WARNING_OBJECT (bin, "unable to add element"); + gst_object_unref (element); return FALSE; } } @@ -721,10 +815,13 @@ remove_bin_element (GstElement * element, GstRtpBin * bin) if (find) { priv->elements = g_list_delete_link (priv->elements, find); - if (!g_list_find (priv->elements, element)) + if (!g_list_find (priv->elements, element)) { + gst_element_set_locked_state (element, TRUE); gst_bin_remove (GST_BIN_CAST (bin), element); - else - gst_object_unref (element); + gst_element_set_state (element, GST_STATE_NULL); + } + + gst_object_unref (element); } } @@ -769,7 +866,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) GValue ret = { 0 }; GValue args[3] = { {0}, {0}, {0} }; - GST_DEBUG ("searching pt %d in cache", pt); + GST_DEBUG ("searching pt %u in cache", pt); GST_RTP_SESSION_LOCK (session); @@ -782,7 +879,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) bin = session->bin; - GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id); + GST_DEBUG ("emiting signal for pt %u in session %u", pt, session->id); /* not in cache, send signal to request caps */ g_value_init (&args[0], GST_TYPE_ELEMENT); @@ -818,7 +915,7 @@ get_pt_map (GstRtpBinSession * session, guint pt) if (!caps) goto no_caps; - GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps); + GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps); /* store in cache, take additional ref */ g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt), @@ -902,6 +999,23 @@ gst_rtp_bin_clear_pt_map (GstRtpBin * bin) gst_rtp_bin_reset_sync (bin); } +static GstElement * +gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id) +{ + GstRtpBinSession *session; + GstElement *ret = NULL; + + GST_RTP_BIN_LOCK (bin); + GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id); + session = find_session_by_id (bin, (gint) session_id); + if (session) { + ret = gst_object_ref (session->session); + } + GST_RTP_BIN_UNLOCK (bin); + + return ret; +} + static RTPSession * gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id) { @@ -909,7 +1023,7 @@ gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id) GstRtpBinSession *session; GST_RTP_BIN_LOCK (bin); - GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d", + GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u", session_id); session = find_session_by_id (bin, (gint) session_id); if (session) { @@ -922,6 +1036,43 @@ gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id) } static GstElement * +gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id) +{ + GstRtpBinSession *session; + GstElement *res = NULL; + + GST_RTP_BIN_LOCK (bin); + GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u", + session_id); + session = find_session_by_id (bin, (gint) session_id); + if (session && session->storage) { + res = gst_object_ref (session->storage); + } + GST_RTP_BIN_UNLOCK (bin); + + return res; +} + +static GObject * +gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id) +{ + GObject *internal_storage = NULL; + GstRtpBinSession *session; + + GST_RTP_BIN_LOCK (bin); + GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u", + session_id); + session = find_session_by_id (bin, (gint) session_id); + if (session && session->storage) { + g_object_get (session->storage, "internal-storage", &internal_storage, + NULL); + } + GST_RTP_BIN_UNLOCK (bin); + + return internal_storage; +} + +static GstElement * gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id) { GST_DEBUG_OBJECT (bin, "return NULL encoder"); @@ -956,6 +1107,21 @@ gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin, GST_RTP_BIN_UNLOCK (bin); } +static void +gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin, + const gchar * name, const GValue * value) +{ + GSList *sessions; + + GST_RTP_BIN_LOCK (bin); + for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) { + GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data; + + g_object_set_property (G_OBJECT (sess->session), name, value); + } + GST_RTP_BIN_UNLOCK (bin); +} + /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */ static GstRtpBinClient * get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created) @@ -1002,7 +1168,7 @@ static void get_current_times (GstRtpBin * bin, GstClockTime * running_time, guint64 * ntpnstime) { - guint64 ntpns; + guint64 ntpns = -1; GstClock *clock; GstClockTime base_time, rt, clock_time; @@ -1012,24 +1178,42 @@ get_current_times (GstRtpBin * bin, GstClockTime * running_time, gst_object_ref (clock); GST_OBJECT_UNLOCK (bin); + /* get current clock time and convert to running time */ clock_time = gst_clock_get_time (clock); + rt = clock_time - base_time; if (bin->use_pipeline_clock) { - ntpns = clock_time - base_time; + ntpns = rt; + /* add constant to convert from 1970 based time to 1900 based time */ + ntpns += (2208988800LL * GST_SECOND); } else { - GTimeVal current; - - /* get current NTP time */ - g_get_current_time (¤t); - ntpns = GST_TIMEVAL_TO_TIME (current); + switch (bin->ntp_time_source) { + case GST_RTP_NTP_TIME_SOURCE_NTP: + case GST_RTP_NTP_TIME_SOURCE_UNIX:{ + GTimeVal current; + + /* get current NTP time */ + g_get_current_time (¤t); + ntpns = GST_TIMEVAL_TO_TIME (current); + + /* add constant to convert from 1970 based time to 1900 based time */ + if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP) + ntpns += (2208988800LL * GST_SECOND); + break; + } + case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME: + ntpns = rt; + break; + case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME: + ntpns = clock_time; + break; + default: + ntpns = -1; /* Fix uninited compiler warning */ + g_assert_not_reached (); + break; + } } - /* add constant to convert from 1970 based time to 1900 based time */ - ntpns += (2208988800LL * GST_SECOND); - - /* get current clock time and convert to running time */ - rt = clock_time - base_time; - gst_object_unref (clock); } else { GST_OBJECT_UNLOCK (bin); @@ -1044,7 +1228,8 @@ get_current_times (GstRtpBin * bin, GstClockTime * running_time, static void stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, - gint64 ts_offset, gboolean check) + gint64 ts_offset, gint64 max_ts_offset, gint64 min_ts_offset, + gboolean allow_positive_ts_offset) { gint64 prev_ts_offset; @@ -1060,19 +1245,25 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff); - if (check) { - /* only change diff when it changed more than 4 milliseconds. This - * compensates for rounding errors in NTP to RTP timestamp - * conversions */ - if (ABS (diff) < 4 * GST_MSECOND) { - GST_DEBUG_OBJECT (bin, "offset too small, ignoring"); + /* ignore minor offsets */ + if (ABS (diff) < min_ts_offset) { + GST_DEBUG_OBJECT (bin, "offset too small, ignoring"); + return; + } + + /* sanity check offset */ + if (max_ts_offset > 0) { + if (ts_offset > 0 && !allow_positive_ts_offset) { + GST_DEBUG_OBJECT (bin, + "offset is positive (clocks are out of sync), ignoring"); return; } - if (ABS (diff) > (3 * GST_SECOND)) { - GST_WARNING_OBJECT (bin, "offset unusually large, ignoring"); + if (ABS (ts_offset) > max_ts_offset) { + GST_DEBUG_OBJECT (bin, "offset too large, ignoring"); return; } } + g_object_set (stream->buffer, "ts-offset", ts_offset, NULL); } GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, @@ -1110,12 +1301,8 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, GstRtpBinClient *client; gboolean created; GSList *walk; - guint64 local_rt; - guint64 local_rtp; - GstClockTime running_time; + GstClockTime running_time, running_time_rtp; guint64 ntpnstime; - gint64 ntpdiff, rtdiff; - guint64 last_unix; /* first find or create the CNAME */ client = get_client (bin, len, data, &created); @@ -1157,51 +1344,59 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, * local rtptime. The local rtp time is used to construct timestamps on the * buffers so we will calculate what running_time corresponds to the RTP * timestamp in the SR packet. */ - local_rtp = last_extrtptime - base_rtptime; + running_time_rtp = last_extrtptime - base_rtptime; GST_DEBUG_OBJECT (bin, "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, " "clock-base %" G_GINT64_FORMAT, base_rtptime, - last_extrtptime, local_rtp, clock_rate, rtp_clock_base); + last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base); /* calculate local RTP time in gstreamer timestamp, we essentially perform the * same conversion that a jitterbuffer would use to convert an rtp timestamp * into a corresponding gstreamer timestamp. Note that the base_time also * contains the drift between sender and receiver. */ - local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate); - local_rt += base_time; + running_time = + gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate); + running_time += base_time; - /* convert ntptime to unix time since 1900 */ - last_unix = gst_util_uint64_scale (ntptime, GST_SECOND, + /* convert ntptime to nanoseconds */ + ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND, (G_GINT64_CONSTANT (1) << 32)); stream->have_sync = TRUE; GST_DEBUG_OBJECT (bin, - "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT, - local_rt, last_unix); + "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT, + running_time, ntpnstime); /* recalc inter stream playout offset, but only if there is more than one * stream or we're doing NTP sync. */ if (bin->ntp_sync) { + gint64 ntpdiff, rtdiff; + guint64 local_ntpnstime; + GstClockTime local_running_time; + /* For NTP sync we need to first get a snapshot of running_time and NTP * time. We know at what running_time we play a certain RTP time, we also * calculated when we would play the RTP time in the SR packet. Now we need * to know how the running_time and the NTP time relate to eachother. */ - get_current_times (bin, &running_time, &ntpnstime); + get_current_times (bin, &local_running_time, &local_ntpnstime); /* see how far away the NTP time is. This is the difference between the * current NTP time and the NTP time in the last SR packet. */ - ntpdiff = ntpnstime - last_unix; + ntpdiff = local_ntpnstime - ntpnstime; /* see how far away the running_time is. This is the difference between the * current running_time and the running_time of the RTP timestamp in the * last SR packet. */ - rtdiff = running_time - local_rt; + rtdiff = local_running_time - running_time; GST_DEBUG_OBJECT (bin, - "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT, - ntpnstime, last_unix); + "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT, + local_ntpnstime, ntpnstime); + GST_DEBUG_OBJECT (bin, + "local running time %" G_GUINT64_FORMAT ", SR RTP running time %" + G_GUINT64_FORMAT, local_running_time, running_time); GST_DEBUG_OBJECT (bin, "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff, rtdiff); @@ -1209,18 +1404,19 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, /* combine to get the final diff to apply to the running_time */ stream->rt_delta = rtdiff - ntpdiff; - stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE); + stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset, + 0, FALSE); } else { gint64 min, rtp_min, clock_base = stream->clock_base; gboolean all_sync, use_rtp; gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync); - /* calculate delta between server and receiver. last_unix is created by + /* calculate delta between server and receiver. ntpnstime is created by * converting the ntptime in the last SR packet to a gstreamer timestamp. This * delta expresses the difference to our timeline and the server timeline. The * difference in itself doesn't mean much but we can combine the delta of * multiple streams to create a stream specific offset. */ - stream->rt_delta = last_unix - local_rt; + stream->rt_delta = ntpnstime - running_time; /* calculate the min of all deltas, ignoring streams that did not yet have a * valid rt_delta because we did not yet receive an SR packet for those @@ -1334,14 +1530,14 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, } /* bail out if we adjusted recently enough */ - if (all_sync && (last_unix - bin->priv->last_unix) < + if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) < bin->rtcp_sync_interval * GST_MSECOND) { GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; " "previous sender info too recent " - "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix); + "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime); return; } - bin->priv->last_unix = last_unix; + bin->priv->last_ntpnstime = ntpnstime; /* calculate offsets for each stream */ for (walk = client->streams; walk; walk = g_slist_next (walk)) { @@ -1361,7 +1557,8 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, else ts_offset = ostream->rt_delta - min; - stream_set_ts_offset (bin, ostream, ts_offset, TRUE); + stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset, + MIN_TS_OFFSET, TRUE); } } gst_rtp_bin_send_sync_event (stream); @@ -1496,12 +1693,16 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) rtpbin = session->bin; + if (g_slist_length (session->streams) >= rtpbin->max_streams) + goto max_streams; + if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) goto no_jitterbuffer; - if (!rtpbin->ignore_pt) + if (!rtpbin->ignore_pt) { if (!(demux = gst_element_factory_make ("rtpptdemux", NULL))) goto no_demux; + } stream = g_new0 (GstRtpBinStream, 1); stream->ssrc = ssrc; @@ -1532,7 +1733,17 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL); g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL); g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL); - + g_object_set (buffer, "max-rtcp-rtp-time-diff", + rtpbin->max_rtcp_rtp_time_diff, NULL); + g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, + "max-misorder-time", rtpbin->max_misorder_time, NULL); + g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL); + g_object_set (buffer, "max-ts-offset-adjustment", + rtpbin->max_ts_offset_adjustment, NULL); + + /* need to sink the jitterbufer or otherwise signal handlers from bindings will + * take ownership of it and we don't own it anymore */ + gst_object_ref_sink (buffer); g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0, buffer, session->id, ssrc); @@ -1540,6 +1751,10 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) gst_bin_add (GST_BIN_CAST (rtpbin), demux); gst_bin_add (GST_BIN_CAST (rtpbin), buffer); + /* unref the jitterbuffer again, the bin has a reference now and + * we don't need it anymore */ + gst_object_unref (buffer); + /* link stuff */ if (demux) gst_element_link_pads_full (buffer, "src", demux, "sink", @@ -1567,6 +1782,12 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) return stream; /* ERRORS */ +max_streams: + { + GST_WARNING_OBJECT (rtpbin, "stream exeeds maximum (%d)", + rtpbin->max_streams); + return NULL; + } no_jitterbuffer: { g_warning ("rtpbin: could not create rtpjitterbuffer element"); @@ -1657,7 +1878,7 @@ static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad); static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message); #define gst_rtp_bin_parent_class parent_class -G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN); +G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN); static gboolean _gst_element_accumulator (GSignalInvocationHint * ihint, @@ -1702,8 +1923,6 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) gstelement_class = (GstElementClass *) klass; gstbin_class = (GstBinClass *) klass; - g_type_class_add_private (klass, sizeof (GstRtpBinPrivate)); - gobject_class->dispose = gst_rtp_bin_dispose; gobject_class->finalize = gst_rtp_bin_finalize; gobject_class->set_property = gst_rtp_bin_set_property; @@ -1776,6 +1995,21 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) 0, G_TYPE_NONE); /** + * GstRtpBin::get-session: + * @rtpbin: the object which received the signal + * @id: the session id + * + * Request the related GstRtpSession as #GstElement related with session @id. + * + * Since: 1.8 + */ + gst_rtp_bin_signals[SIGNAL_GET_SESSION] = + g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, + get_session), NULL, NULL, g_cclosure_marshal_generic, + GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** * GstRtpBin::get-internal-session: * @rtpbin: the object which received the signal * @id: the session id @@ -1789,6 +2023,36 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) RTP_TYPE_SESSION, 1, G_TYPE_UINT); /** + * GstRtpBin::get-internal-storage: + * @rtpbin: the object which received the signal + * @id: the session id + * + * Request the internal RTPStorage object as #GObject in session @id. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] = + g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, + get_internal_storage), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_OBJECT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::get-storage: + * @rtpbin: the object which received the signal + * @id: the session id + * + * Request the RTPStorage element as #GObject in session @id. + * + * Since: 1.16 + */ + gst_rtp_bin_signals[SIGNAL_GET_STORAGE] = + g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, + get_storage), NULL, NULL, g_cclosure_marshal_generic, + GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** * GstRtpBin::on-new-ssrc: * @rtpbin: the object which received the signal * @session: the session @@ -2012,6 +2276,23 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT); /** + * GstRtpBin::new-storage: + * @rtpbin: the object which received the signal + * @storage: the new storage + * @session: the session + * + * Notify that a new @storage was created for @session. + * This signal can, for example, be used to configure @storage. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] = + g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + new_storage), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT); + + /** * GstRtpBin::request-aux-sender: * @rtpbin: the object which received the signal * @session: the session @@ -2028,6 +2309,7 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_aux_sender), _gst_element_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + /** * GstRtpBin::request-aux-receiver: * @rtpbin: the object which received the signal @@ -2046,6 +2328,73 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) request_aux_receiver), _gst_element_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + /** + * GstRtpBin::request-fec-decoder: + * @rtpbin: the object which received the signal + * @session: the session index + * + * Request a FEC decoder element for the given @session. The element + * will be added to the bin after the pt demuxer. + * + * If no handler is connected, no FEC decoder will be used. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] = + g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + request_fec_decoder), _gst_element_accumulator, NULL, + g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::request-fec-encoder: + * @rtpbin: the object which received the signal + * @session: the session index + * + * Request a FEC encoder element for the given @session. The element + * will be added to the bin after the RTPSession. + * + * If no handler is connected, no FEC encoder will be used. + * + * Since: 1.14 + */ + gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] = + g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + request_fec_encoder), _gst_element_accumulator, NULL, + g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); + + /** + * GstRtpBin::on-new-sender-ssrc: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the sender SSRC + * + * Notify of a new sender SSRC that entered @session. + * + * Since: 1.8 + */ + gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] = + g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT, + G_TYPE_UINT); + /** + * GstRtpBin::on-sender-ssrc-active: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the sender SSRC + * + * Notify of a sender SSRC that is active, i.e., sending RTCP. + * + * Since: 1.8 + */ + gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] = + g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, + on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + g_object_class_install_property (gobject_class, PROP_SDES, g_param_spec_boxed ("sdes", "SDES", "The SDES items of this session", @@ -2068,9 +2417,10 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK, g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock", - "Use the pipeline running-time to set the NTP time in the RTCP SR messages", + "Use the pipeline running-time to set the NTP time in the RTCP SR messages " + "(DEPRECATED: Use ntp-time-source property)", DEFAULT_USE_PIPELINE_CLOCK, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED)); /** * GstRtpBin:buffer-mode: * @@ -2121,32 +2471,130 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) "Send event downstream when a stream is synchronized to the sender", DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:do-retransmission: + * + * Enables RTP retransmission on all streams. To control retransmission on + * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and + * set the #GstRtpJitterBuffer::do-retransmission property on the + * #GstRtpJitterBuffer object instead. + */ g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION, g_param_spec_boolean ("do-retransmission", "Do retransmission", - "Send an event downstream to request packet retransmission", + "Enable retransmission on all streams", DEFAULT_DO_RETRANSMISSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:rtp-profile: + * + * Sets the default RTP profile of newly created RTP sessions. The + * profile can be changed afterwards on a per-session basis. + */ + g_object_class_install_property (gobject_class, PROP_RTP_PROFILE, + g_param_spec_enum ("rtp-profile", "RTP Profile", + "Default RTP profile of newly created sessions", + GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE, + g_param_spec_enum ("ntp-time-source", "NTP Time Source", + "NTP time source for RTCP packets", + gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME, + g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time", + "Use send time or capture time for RTCP sync " + "(TRUE = send time, FALSE = capture time)", + DEFAULT_RTCP_SYNC_SEND_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF, + g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff", + "Maximum amount of time in ms that the RTP time in RTCP SRs " + "is allowed to be ahead (-1 disabled)", -1, G_MAXINT, + DEFAULT_MAX_RTCP_RTP_TIME_DIFF, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC, + g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock", + "Synchronize received streams to the RFC7273 clock " + "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_STREAMS, + g_param_spec_uint ("max-streams", "Max Streams", + "The maximum number of streams to create for one session", + 0, G_MAXUINT, DEFAULT_MAX_STREAMS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstRtpBin:max-ts-offset-adjustment: + * + * Syncing time stamps to NTP time adds a time offset. This parameter + * specifies the maximum number of nanoseconds per frame that this time offset + * may be adjusted with. This is used to avoid sudden large changes to time + * stamps. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT, + g_param_spec_uint64 ("max-ts-offset-adjustment", + "Max Timestamp Offset Adjustment", + "The maximum number of nanoseconds per frame that time stamp offsets " + "may be adjusted (0 = no limit).", 0, G_MAXUINT64, + DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + /** + * GstRtpBin:max-ts-offset: + * + * Used to set an upper limit of how large a time offset may be. This + * is used to protect against unrealistic values as a result of either + * client,server or clock issues. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET, + g_param_spec_int64 ("max-ts-offset", "Max TS Offset", + "The maximum absolute value of the time offset in (nanoseconds). " + "Note, if the ntp-sync parameter is set the default value is " + "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad); /* sink pads */ - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_send_rtp_sink_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_recv_rtp_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_recv_rtcp_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_send_rtp_sink_template); /* src pads */ - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_recv_rtp_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_send_rtcp_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpbin_send_rtp_src_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_recv_rtp_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_send_rtcp_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_send_rtp_src_template); gst_element_class_set_static_metadata (gstelement_class, "RTP Bin", "Filter/Network/RTP", @@ -2157,8 +2605,12 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map); klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync); + klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session); klass->get_internal_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session); + klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage); + klass->get_internal_storage = + GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage); klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder); klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder); klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder); @@ -2172,7 +2624,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) { gchar *cname; - rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); + rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin); g_mutex_init (&rtpbin->priv->bin_lock); g_mutex_init (&rtpbin->priv->dyn_lock); @@ -2189,6 +2641,17 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK; rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT; rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION; + rtpbin->rtp_profile = DEFAULT_RTP_PROFILE; + rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE; + rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME; + rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF; + rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; + rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC; + rtpbin->max_streams = DEFAULT_MAX_STREAMS; + rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT; + rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET; + rtpbin->max_ts_offset_is_set = FALSE; /* some default SDES entries */ cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ()); @@ -2304,6 +2767,15 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, break; case PROP_NTP_SYNC: rtpbin->ntp_sync = g_value_get_boolean (value); + /* The default value of max_ts_offset depends on ntp_sync. If user + * hasn't set it then change default value */ + if (!rtpbin->max_ts_offset_is_set) { + if (rtpbin->ntp_sync) { + rtpbin->max_ts_offset = 0; + } else { + rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET; + } + } break; case PROP_RTCP_SYNC: g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value)); @@ -2349,29 +2821,102 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-retransmission", value); break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + case PROP_RTP_PROFILE: + rtpbin->rtp_profile = g_value_get_enum (value); break; - } -} - -static void -gst_rtp_bin_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstRtpBin *rtpbin; - - rtpbin = GST_RTP_BIN (object); + case PROP_NTP_TIME_SOURCE:{ + GSList *sessions; + GST_RTP_BIN_LOCK (rtpbin); + rtpbin->ntp_time_source = g_value_get_enum (value); + for (sessions = rtpbin->sessions; sessions; + sessions = g_slist_next (sessions)) { + GstRtpBinSession *session = (GstRtpBinSession *) sessions->data; - switch (prop_id) { - case PROP_LATENCY: + g_object_set (G_OBJECT (session->session), + "ntp-time-source", rtpbin->ntp_time_source, NULL); + } + GST_RTP_BIN_UNLOCK (rtpbin); + break; + } + case PROP_RTCP_SYNC_SEND_TIME:{ + GSList *sessions; GST_RTP_BIN_LOCK (rtpbin); - g_value_set_uint (value, rtpbin->latency_ms); + rtpbin->rtcp_sync_send_time = g_value_get_boolean (value); + for (sessions = rtpbin->sessions; sessions; + sessions = g_slist_next (sessions)) { + GstRtpBinSession *session = (GstRtpBinSession *) sessions->data; + + g_object_set (G_OBJECT (session->session), + "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL); + } GST_RTP_BIN_UNLOCK (rtpbin); break; - case PROP_DROP_ON_LATENCY: + } + case PROP_MAX_RTCP_RTP_TIME_DIFF: GST_RTP_BIN_LOCK (rtpbin); - g_value_set_boolean (value, rtpbin->drop_on_latency); + rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value); + GST_RTP_BIN_UNLOCK (rtpbin); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "max-rtcp-rtp-time-diff", value); + break; + case PROP_MAX_DROPOUT_TIME: + GST_RTP_BIN_LOCK (rtpbin); + rtpbin->max_dropout_time = g_value_get_uint (value); + GST_RTP_BIN_UNLOCK (rtpbin); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "max-dropout-time", value); + gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time", + value); + break; + case PROP_MAX_MISORDER_TIME: + GST_RTP_BIN_LOCK (rtpbin); + rtpbin->max_misorder_time = g_value_get_uint (value); + GST_RTP_BIN_UNLOCK (rtpbin); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "max-misorder-time", value); + gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time", + value); + break; + case PROP_RFC7273_SYNC: + rtpbin->rfc7273_sync = g_value_get_boolean (value); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "rfc7273-sync", value); + break; + case PROP_MAX_STREAMS: + rtpbin->max_streams = g_value_get_uint (value); + break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "max-ts-offset-adjustment", value); + break; + case PROP_MAX_TS_OFFSET: + rtpbin->max_ts_offset = g_value_get_int64 (value); + rtpbin->max_ts_offset_is_set = TRUE; + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_bin_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRtpBin *rtpbin; + + rtpbin = GST_RTP_BIN (object); + + switch (prop_id) { + case PROP_LATENCY: + GST_RTP_BIN_LOCK (rtpbin); + g_value_set_uint (value, rtpbin->latency_ms); + GST_RTP_BIN_UNLOCK (rtpbin); + break; + case PROP_DROP_ON_LATENCY: + GST_RTP_BIN_LOCK (rtpbin); + g_value_set_boolean (value, rtpbin->drop_on_latency); GST_RTP_BIN_UNLOCK (rtpbin); break; case PROP_SDES: @@ -2411,6 +2956,38 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, rtpbin->do_retransmission); GST_RTP_BIN_UNLOCK (rtpbin); break; + case PROP_RTP_PROFILE: + g_value_set_enum (value, rtpbin->rtp_profile); + break; + case PROP_NTP_TIME_SOURCE: + g_value_set_enum (value, rtpbin->ntp_time_source); + break; + case PROP_RTCP_SYNC_SEND_TIME: + g_value_set_boolean (value, rtpbin->rtcp_sync_send_time); + break; + case PROP_MAX_RTCP_RTP_TIME_DIFF: + GST_RTP_BIN_LOCK (rtpbin); + g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff); + GST_RTP_BIN_UNLOCK (rtpbin); + break; + case PROP_MAX_DROPOUT_TIME: + g_value_set_uint (value, rtpbin->max_dropout_time); + break; + case PROP_MAX_MISORDER_TIME: + g_value_set_uint (value, rtpbin->max_misorder_time); + break; + case PROP_RFC7273_SYNC: + g_value_set_boolean (value, rtpbin->rfc7273_sync); + break; + case PROP_MAX_STREAMS: + g_value_set_uint (value, rtpbin->max_streams); + break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment); + break; + case PROP_MAX_TS_OFFSET: + g_value_set_int64 (value, rtpbin->max_ts_offset); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2628,7 +3205,7 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - priv->last_unix = 0; + priv->last_ntpnstime = 0; GST_LOG_OBJECT (rtpbin, "clearing shutdown flag"); g_atomic_int_set (&priv->shutdown, 0); break; @@ -2684,21 +3261,57 @@ manage_failed: } } -/* a new pad (SSRC) was created in @session. This signal is emited from the - * payload demuxer. */ +static gboolean +copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) +{ + GstPad *gpad = GST_PAD_CAST (user_data); + + GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event); + gst_pad_store_sticky_event (gpad, *event); + + return TRUE; +} + static void -new_payload_found (GstElement * element, guint pt, GstPad * pad, - GstRtpBinStream * stream) +expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, + guint8 pt) { - GstRtpBin *rtpbin; GstElementClass *klass; GstPadTemplate *templ; gchar *padname; GstPad *gpad; - rtpbin = stream->bin; + gst_object_ref (pad); + + if (stream->session->storage) { + GstElement *fec_decoder = + session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER); + + if (fec_decoder) { + GstPad *sinkpad, *srcpad; + GstPadLinkReturn ret; + + sinkpad = gst_element_get_static_pad (fec_decoder, "sink"); - GST_DEBUG ("new payload pad %d", pt); + if (!sinkpad) + goto fec_decoder_sink_failed; + + ret = gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + if (ret != GST_PAD_LINK_OK) + goto fec_decoder_link_failed; + + srcpad = gst_element_get_static_pad (fec_decoder, "src"); + + if (!srcpad) + goto fec_decoder_src_failed; + + gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad); + gst_object_unref (pad); + pad = srcpad; + } + } GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown); @@ -2714,15 +3327,52 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad, gst_pad_set_active (gpad, TRUE); GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); + gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); +done: + gst_object_unref (pad); + return; shutdown: { GST_DEBUG ("ignoring, we are shutting down"); - return; + goto done; } +fec_decoder_sink_failed: + { + g_warning ("rtpbin: failed to get fec encoder sink pad for session %u", + stream->session->id); + goto done; + } +fec_decoder_src_failed: + { + g_warning ("rtpbin: failed to get fec encoder src pad for session %u", + stream->session->id); + goto done; + } +fec_decoder_link_failed: + { + g_warning ("rtpbin: failed to link fec decoder for session %u", + stream->session->id); + goto done; + } +} + +/* a new pad (SSRC) was created in @session. This signal is emited from the + * payload demuxer. */ +static void +new_payload_found (GstElement * element, guint pt, GstPad * pad, + GstRtpBinStream * stream) +{ + GstRtpBin *rtpbin; + + rtpbin = stream->bin; + + GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt); + + expose_recv_src_pad (rtpbin, pad, stream, pt); } static void @@ -2754,7 +3404,7 @@ pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session) rtpbin = session->bin; - GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt, + GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt, session->id); caps = get_pt_map (session, pt); @@ -2771,18 +3421,48 @@ no_caps: } } +static GstCaps * +ptdemux_pt_map_requested (GstElement * element, guint pt, + GstRtpBinSession * session) +{ + GstCaps *ret = pt_map_requested (element, pt, session); + + if (ret && gst_caps_get_size (ret) == 1) { + const GstStructure *s = gst_caps_get_structure (ret, 0); + gboolean is_fec; + + if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) { + GValue v = G_VALUE_INIT; + GValue v2 = G_VALUE_INIT; + + GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt, + session->id); + g_value_init (&v, GST_TYPE_ARRAY); + g_value_init (&v2, G_TYPE_INT); + g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v); + g_value_set_int (&v2, pt); + gst_value_array_append_value (&v, &v2); + g_value_unset (&v2); + g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v); + g_value_unset (&v); + } + } + + return ret; +} + static void payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session) { GST_DEBUG_OBJECT (session->bin, - "emiting signal for pt type changed to %d in session %d", pt, + "emiting signal for pt type changed to %u in session %u", pt, session->id); g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE], 0, session->id, pt); } -/* emited when caps changed for the session */ +/* emitted when caps changed for the session */ static void caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) { @@ -2803,8 +3483,10 @@ caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) s = gst_caps_get_structure (caps, 0); /* get payload, finish when it's not there */ - if (!gst_structure_get_int (s, "payload", &payload)) + if (!gst_structure_get_int (s, "payload", &payload)) { + gst_caps_unref (caps); return; + } GST_RTP_SESSION_LOCK (session); GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload); @@ -2868,40 +3550,31 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, stream->demux_padremoved_sig = g_signal_connect (stream->demux, "pad-removed", (GCallback) payload_pad_removed, stream); - /* connect to the request-pt-map signal. This signal will be emited by the + /* connect to the request-pt-map signal. This signal will be emitted by the * demuxer so that it can apply a proper caps on the buffers for the * depayloaders. */ stream->demux_ptreq_sig = g_signal_connect (stream->demux, - "request-pt-map", (GCallback) pt_map_requested, session); + "request-pt-map", (GCallback) ptdemux_pt_map_requested, session); /* connect to the signal so it can be forwarded. */ stream->demux_ptchange_sig = g_signal_connect (stream->demux, "payload-type-change", (GCallback) payload_type_change, session); + + GST_RTP_SESSION_UNLOCK (session); + GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); } else { /* add rtpjitterbuffer src pad to pads */ - GstElementClass *klass; - GstPadTemplate *templ; - gchar *padname; - GstPad *gpad, *pad; + GstPad *pad; pad = gst_element_get_static_pad (stream->buffer, "src"); - /* ghost the pad to the parent */ - klass = GST_ELEMENT_GET_CLASS (rtpbin); - templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u"); - padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u", - stream->session->id, stream->ssrc, 255); - gpad = gst_ghost_pad_new_from_template (padname, pad, templ); - g_free (padname); + GST_RTP_SESSION_UNLOCK (session); + GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); - gst_pad_set_active (gpad, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); + expose_recv_src_pad (rtpbin, pad, stream, 255); gst_object_unref (pad); } - GST_RTP_SESSION_UNLOCK (session); - GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin); - return; /* ERRORS */ @@ -2919,15 +3592,14 @@ no_stream: } } -static gboolean +static GstPad * complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) { - gchar *gname; guint sessid = session->id; GstPad *recv_rtp_sink; GstElement *decoder; - GstElementClass *klass; - GstPadTemplate *templ; + + g_assert (!session->recv_rtp_sink); /* get recv_rtp pad and store */ session->recv_rtp_sink = @@ -2956,6 +3628,7 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) goto dec_src_failed; ret = gst_pad_link (decsrc, session->recv_rtp_sink); + gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) @@ -2966,86 +3639,48 @@ complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session) recv_rtp_sink = gst_object_ref (session->recv_rtp_sink); } - GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); - klass = GST_ELEMENT_GET_CLASS (rtpbin); - gname = g_strdup_printf ("recv_rtp_sink_%u", sessid); - templ = gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u"); - session->recv_rtp_sink_ghost = - gst_ghost_pad_new_from_template (gname, recv_rtp_sink, templ); - gst_object_unref (recv_rtp_sink); - gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost); - g_free (gname); - - return TRUE; + return recv_rtp_sink; /* ERRORS */ pad_failed: { g_warning ("rtpbin: failed to get session recv_rtp_sink pad"); - return FALSE; + return NULL; } dec_sink_failed: { - g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid); - return FALSE; + g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid); + return NULL; } dec_src_failed: { - g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid); + g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid); gst_object_unref (recv_rtp_sink); - return FALSE; + return NULL; } dec_link_failed: { - g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid); + g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid); gst_object_unref (recv_rtp_sink); - return FALSE; + return NULL; } } -/* Create a pad for receiving RTP for the session in @name. Must be called with - * RTP_BIN_LOCK. - */ -static GstPad * -create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +static void +complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid) { - guint sessid; GstElement *aux; GstPad *recv_rtp_src; - GstRtpBinSession *session; - - /* first get the session number */ - if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1) - goto no_name; - GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); - - /* get or create session */ - session = find_session_by_id (rtpbin, sessid); - if (!session) { - GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); - /* create session now */ - session = create_session (rtpbin, sessid); - if (session == NULL) - goto create_error; - } - - /* check if pad was requested */ - if (session->recv_rtp_sink_ghost != NULL) - return session->recv_rtp_sink_ghost; - - /* setup the session sink pad */ - if (!complete_session_sink (rtpbin, session)) - goto session_sink_failed; + g_assert (!session->recv_rtp_src); session->recv_rtp_src = gst_element_get_static_pad (session->session, "recv_rtp_src"); if (session->recv_rtp_src == NULL) goto pad_failed; - /* find out if we need AUX elements or if we can go into the SSRC demuxer - * directly */ + /* find out if we need AUX elements */ aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER); if (aux) { gchar *pname; @@ -3054,7 +3689,7 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver"); - pname = g_strdup_printf ("sink_%d", sessid); + pname = g_strdup_printf ("sink_%u", sessid); auxsink = gst_element_get_static_pad (aux, pname); g_free (pname); if (auxsink == NULL) @@ -3065,15 +3700,30 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (ret != GST_PAD_LINK_OK) goto aux_link_failed; - /* this can be NULL when this AUX element is not to be linked to - * an SSRC demuxer */ - pname = g_strdup_printf ("src_%d", sessid); + /* this can be NULL when this AUX element is not to be linked any further */ + pname = g_strdup_printf ("src_%u", sessid); recv_rtp_src = gst_element_get_static_pad (aux, pname); g_free (pname); } else { recv_rtp_src = gst_object_ref (session->recv_rtp_src); } + /* Add a storage element if needed */ + if (recv_rtp_src && session->storage) { + GstPadLinkReturn ret; + GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink"); + + ret = gst_pad_link (recv_rtp_src, sinkpad); + + gst_object_unref (sinkpad); + gst_object_unref (recv_rtp_src); + + if (ret != GST_PAD_LINK_OK) + goto storage_link_failed; + + recv_rtp_src = gst_element_get_static_pad (session->storage, "src"); + } + if (recv_rtp_src) { GstPad *sinkdpad; @@ -3081,8 +3731,8 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) sinkdpad = gst_element_get_static_pad (session->demux, "sink"); GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad"); gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING); - gst_object_unref (recv_rtp_src); gst_object_unref (sinkdpad); + gst_object_unref (recv_rtp_src); /* connect to the new-ssrc-pad signal of the SSRC demuxer */ session->demux_newpad_sig = g_signal_connect (session->demux, @@ -3090,6 +3740,75 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) session->demux_padremoved_sig = g_signal_connect (session->demux, "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session); } + + return; + +pad_failed: + { + g_warning ("rtpbin: failed to get session recv_rtp_src pad"); + return; + } +aux_sink_failed: + { + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); + return; + } +aux_link_failed: + { + g_warning ("rtpbin: failed to link AUX pad to session %u", sessid); + return; + } +storage_link_failed: + { + g_warning ("rtpbin: failed to link storage"); + return; + } +} + +/* Create a pad for receiving RTP for the session in @name. Must be called with + * RTP_BIN_LOCK. + */ +static GstPad * +create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +{ + guint sessid; + GstRtpBinSession *session; + GstPad *recv_rtp_sink; + + /* first get the session number */ + if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1) + goto no_name; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + /* check if pad was requested */ + if (session->recv_rtp_sink_ghost != NULL) + return session->recv_rtp_sink_ghost; + + /* setup the session sink pad */ + recv_rtp_sink = complete_session_sink (rtpbin, session); + if (!recv_rtp_sink) + goto session_sink_failed; + + GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); + session->recv_rtp_sink_ghost = + gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ); + gst_object_unref (recv_rtp_sink); + gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost); + + complete_session_receiver (rtpbin, session, sessid); + return session->recv_rtp_sink_ghost; /* ERRORS */ @@ -3108,21 +3827,6 @@ session_sink_failed: /* warning already done */ return NULL; } -pad_failed: - { - g_warning ("rtpbin: failed to get session recv_rtp_src pad"); - return NULL; - } -aux_sink_failed: - { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); - return NULL; - } -aux_link_failed: - { - g_warning ("rtpbin: failed to link AUX pad to session %d", sessid); - return NULL; - } } static void @@ -3153,37 +3857,13 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) } } -/* Create a pad for receiving RTCP for the session in @name. Must be called with - * RTP_BIN_LOCK. - */ static GstPad * -create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, - const gchar * name) +complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid) { - guint sessid; GstElement *decoder; - GstRtpBinSession *session; - GstPad *sinkdpad, *decsink; - - /* first get the session number */ - if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1) - goto no_name; - - GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); - - /* get or create the session */ - session = find_session_by_id (rtpbin, sessid); - if (!session) { - GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); - /* create session now */ - session = create_session (rtpbin, sessid); - if (session == NULL) - goto create_error; - } - - /* check if pad was requested */ - if (session->recv_rtcp_sink_ghost != NULL) - return session->recv_rtcp_sink_ghost; + GstPad *sinkdpad; + GstPad *decsink = NULL; /* get recv_rtp pad and store */ GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); @@ -3209,6 +3889,7 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, goto dec_src_failed; ret = gst_pad_link (decsrc, session->recv_rtcp_sink); + gst_object_unref (decsrc); if (ret != GST_PAD_LINK_OK) @@ -3229,26 +3910,8 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING); gst_object_unref (sinkdpad); - session->recv_rtcp_sink_ghost = - gst_ghost_pad_new_from_template (name, decsink, templ); - gst_object_unref (decsink); - gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), - session->recv_rtcp_sink_ghost); + return decsink; - return session->recv_rtcp_sink_ghost; - - /* ERRORS */ -no_name: - { - g_warning ("rtpbin: invalid name given"); - return NULL; - } -create_error: - { - /* create_session already warned */ - return NULL; - } pad_failed: { g_warning ("rtpbin: failed to get session rtcp_sink pad"); @@ -3256,25 +3919,82 @@ pad_failed: } dec_sink_failed: { - g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid); return NULL; } dec_src_failed: { - g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid); - gst_object_unref (decsink); - return NULL; + g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid); + goto cleanup; } dec_link_failed: { - g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid); - gst_object_unref (decsink); - return NULL; + g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid); + goto cleanup; } src_pad_failed: { g_warning ("rtpbin: failed to get session sync_src pad"); - gst_object_unref (decsink); + } + +cleanup: + gst_object_unref (decsink); + return NULL; +} + +/* Create a pad for receiving RTCP for the session in @name. Must be called with + * RTP_BIN_LOCK. + */ +static GstPad * +create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, + const gchar * name) +{ + guint sessid; + GstRtpBinSession *session; + GstPad *decsink = NULL; + + /* first get the session number */ + if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1) + goto no_name; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create the session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + /* check if pad was requested */ + if (session->recv_rtcp_sink_ghost != NULL) + return session->recv_rtcp_sink_ghost; + + decsink = complete_session_rtcp (rtpbin, session, sessid); + if (!decsink) + goto create_error; + + session->recv_rtcp_sink_ghost = + gst_ghost_pad_new_from_template (name, decsink, templ); + gst_object_unref (decsink); + gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), + session->recv_rtcp_sink_ghost); + + return session->recv_rtcp_sink_ghost; + + /* ERRORS */ +no_name: + { + g_warning ("rtpbin: invalid name given"); + return NULL; + } +create_error: + { + /* create_session already warned */ return NULL; } } @@ -3309,11 +4029,12 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) GstElement *encoder; GstElementClass *klass; GstPadTemplate *templ; + gboolean ret = FALSE; /* get srcpad */ - session->send_rtp_src = - gst_element_get_static_pad (session->session, "send_rtp_src"); - if (session->send_rtp_src == NULL) + send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src"); + + if (send_rtp_src == NULL) goto no_srcpad; GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder"); @@ -3324,29 +4045,29 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) GstPadLinkReturn ret; GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder"); - ename = g_strdup_printf ("rtp_src_%d", sessid); + ename = g_strdup_printf ("rtp_src_%u", sessid); encsrc = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsrc == NULL) goto enc_src_failed; - send_rtp_src = encsrc; - - ename = g_strdup_printf ("rtp_sink_%d", sessid); + ename = g_strdup_printf ("rtp_sink_%u", sessid); encsink = gst_element_get_static_pad (encoder, ename); g_free (ename); if (encsink == NULL) goto enc_sink_failed; - ret = gst_pad_link (session->send_rtp_src, encsink); + ret = gst_pad_link (send_rtp_src, encsink); gst_object_unref (encsink); + gst_object_unref (send_rtp_src); + + send_rtp_src = encsrc; if (ret != GST_PAD_LINK_OK) goto enc_link_failed; } else { GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given"); - send_rtp_src = gst_object_ref (session->send_rtp_src); } /* ghost the new source pad */ @@ -3355,35 +4076,43 @@ complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u"); session->send_rtp_src_ghost = gst_ghost_pad_new_from_template (gname, send_rtp_src, templ); - gst_object_unref (send_rtp_src); gst_pad_set_active (session->send_rtp_src_ghost, TRUE); + gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events, + session->send_rtp_src_ghost); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost); g_free (gname); - return TRUE; + ret = TRUE; + +done: + if (send_rtp_src) + gst_object_unref (send_rtp_src); + + return ret; /* ERRORS */ no_srcpad: { - g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid); - return FALSE; + g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid); + goto done; } enc_src_failed: { - g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid); - return FALSE; + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " src pad for session %u", encoder, sessid); + goto done; } enc_sink_failed: { - g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid); - gst_object_unref (send_rtp_src); - return FALSE; + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " sink pad for session %u", encoder, sessid); + goto done; } enc_link_failed: { - g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid); - gst_object_unref (send_rtp_src); - return FALSE; + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + encoder, sessid); + goto done; } } @@ -3443,22 +4172,23 @@ create_error: } existing_session: { - g_warning ("rtpbin: session %d is already a sender", sessid); - return FALSE; + GST_DEBUG_OBJECT (rtpbin, + "skipping src_%i setup, since it is already configured.", sessid); + return TRUE; } pad_failed: { - g_warning ("rtpbin: failed to get session pad for session %d", sessid); + g_warning ("rtpbin: failed to get session pad for session %u", sessid); return FALSE; } aux_link_failed: { - g_warning ("rtpbin: failed to link AUX for session %d", sessid); + g_warning ("rtpbin: failed to link AUX for session %u", sessid); return FALSE; } session_src_failed: { - g_warning ("rtpbin: failed to complete AUX for session %d", sessid); + g_warning ("rtpbin: failed to complete AUX for session %u", sessid); return FALSE; } } @@ -3488,6 +4218,8 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) guint sessid; GstPad *send_rtp_sink; GstElement *aux; + GstElement *encoder; + GstElement *prev = NULL; GstRtpBinSession *session; /* first get the session number */ @@ -3511,19 +4243,47 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink != NULL) goto existing_session; + encoder = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER); + + if (encoder) { + GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder"); + + send_rtp_sink = gst_element_get_static_pad (encoder, "sink"); + + if (!send_rtp_sink) + goto enc_sink_failed; + + prev = encoder; + } + GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender"); aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER); if (aux) { + GstPad *sinkpad; GST_DEBUG_OBJECT (rtpbin, "linking AUX sender"); if (!setup_aux_sender (rtpbin, session, aux)) goto aux_session_failed; - pname = g_strdup_printf ("sink_%d", sessid); - send_rtp_sink = gst_element_get_static_pad (aux, pname); + pname = g_strdup_printf ("sink_%u", sessid); + sinkpad = gst_element_get_static_pad (aux, pname); g_free (pname); - if (send_rtp_sink == NULL) + if (sinkpad == NULL) goto aux_sink_failed; + + if (!prev) { + send_rtp_sink = sinkpad; + } else { + GstPad *srcpad = gst_element_get_static_pad (prev, "src"); + GstPadLinkReturn ret; + + ret = gst_pad_link (srcpad, sinkpad); + gst_object_unref (srcpad); + if (ret != GST_PAD_LINK_OK) { + goto aux_link_failed; + } + } + prev = aux; } else { /* get send_rtp pad and store */ session->send_rtp_sink = @@ -3534,7 +4294,17 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (!complete_session_src (rtpbin, session)) goto session_src_failed; - send_rtp_sink = gst_object_ref (session->send_rtp_sink); + if (!prev) { + send_rtp_sink = gst_object_ref (session->send_rtp_sink); + } else { + GstPad *srcpad = gst_element_get_static_pad (prev, "src"); + GstPadLinkReturn ret; + + ret = gst_pad_link (srcpad, session->send_rtp_sink); + gst_object_unref (srcpad); + if (ret != GST_PAD_LINK_OK) + goto session_link_failed; + } } session->send_rtp_sink_ghost = @@ -3558,27 +4328,45 @@ create_error: } existing_session: { - g_warning ("rtpbin: session %d is already in use", sessid); + g_warning ("rtpbin: session %u is already in use", sessid); return NULL; } aux_session_failed: { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); return NULL; } aux_sink_failed: { - g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid); + return NULL; + } +aux_link_failed: + { + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + aux, sessid); return NULL; } pad_failed: { - g_warning ("rtpbin: failed to get session pad for session %d", sessid); + g_warning ("rtpbin: failed to get session pad for session %u", sessid); return NULL; } session_src_failed: { - g_warning ("rtpbin: failed to setup source pads for session %d", sessid); + g_warning ("rtpbin: failed to setup source pads for session %u", sessid); + return NULL; + } +session_link_failed: + { + g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u", + session, sessid); + return NULL; + } +enc_sink_failed: + { + g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT + " sink pad for session %u", encoder, sessid); return NULL; } } @@ -3592,10 +4380,6 @@ remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) session->send_rtp_src_ghost); session->send_rtp_src_ghost = NULL; } - if (session->send_rtp_src) { - gst_object_unref (session->send_rtp_src); - session->send_rtp_src = NULL; - } if (session->send_rtp_sink) { gst_element_release_request_pad (GST_ELEMENT_CAST (session->session), session->send_rtp_sink); @@ -3614,7 +4398,8 @@ remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) * RTP_BIN_LOCK. */ static GstPad * -create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, + const gchar * name) { guint sessid; GstPad *encsrc; @@ -3627,8 +4412,13 @@ create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) /* get or create session */ session = find_session_by_id (rtpbin, sessid); - if (!session) - goto no_session; + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } /* check if pad was requested */ if (session->send_rtcp_src_ghost != NULL) @@ -3648,16 +4438,16 @@ create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) GstPadLinkReturn ret; GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder"); - ename = g_strdup_printf ("rtcp_sink_%d", sessid); - encsink = gst_element_get_static_pad (encoder, ename); - g_free (ename); - ename = g_strdup_printf ("rtcp_src_%d", sessid); + + ename = g_strdup_printf ("rtcp_src_%u", sessid); encsrc = gst_element_get_static_pad (encoder, ename); g_free (ename); - if (encsrc == NULL) goto enc_src_failed; + ename = g_strdup_printf ("rtcp_sink_%u", sessid); + encsink = gst_element_get_static_pad (encoder, ename); + g_free (ename); if (encsink == NULL) goto enc_sink_failed; @@ -3685,30 +4475,30 @@ no_name: g_warning ("rtpbin: invalid name given"); return NULL; } -no_session: +create_error: { - g_warning ("rtpbin: session with id %d does not exist", sessid); + /* create_session already warned */ return NULL; } pad_failed: { - g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid); + g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid); return NULL; } enc_src_failed: { - g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid); return NULL; } enc_sink_failed: { - g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid); + g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid); gst_object_unref (encsrc); return NULL; } enc_link_failed: { - g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid); + g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid); gst_object_unref (encsrc); return NULL; } @@ -3829,7 +4619,7 @@ gst_rtp_bin_request_new_pad (GstElement * element, result = create_send_rtp (rtpbin, templ, pad_name); } else if (templ == gst_element_class_get_pad_template (klass, "send_rtcp_src_%u")) { - result = create_rtcp (rtpbin, templ, pad_name); + result = create_send_rtcp (rtpbin, templ, pad_name); } else goto wrong_template;