/**
* SECTION:element-rtpsession
+ * @title: rtpsession
* @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
*
* The RTP session manager models participants with unique SSRC in an RTP
* functionality can be activated.
*
* The session manager currently implements RFC 3550 including:
- * <itemizedlist>
- * <listitem>
- * <para>RTP packet validation based on consecutive sequence numbers.</para>
- * </listitem>
- * <listitem>
- * <para>Maintainance of the SSRC participant database.</para>
- * </listitem>
- * <listitem>
- * <para>Keeping per participant statistics based on received RTCP packets.</para>
- * </listitem>
- * <listitem>
- * <para>Scheduling of RR/SR RTCP packets.</para>
- * </listitem>
- * <listitem>
- * <para>Support for multiple sender SSRC.</para>
- * </listitem>
- * </itemizedlist>
+ *
+ * * RTP packet validation based on consecutive sequence numbers.
+ *
+ * * Maintenance of the SSRC participant database.
+ *
+ * * Keeping per participant statistics based on received RTCP packets.
+ *
+ * * Scheduling of RR/SR RTCP packets.
+ *
+ * * Support for multiple sender SSRC.
*
* The rtpsession will not demux packets based on SSRC or payload type, nor will
- * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
+ * it correct for packet reordering and jitter. Use #GstRtpSsrcDemux,
* #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
* perform these tasks. It is usually a good idea to use #GstRtpBin, which
* combines all these features in one element.
* mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
* signal.
*
- * <refsect2>
- * <title>Example pipelines</title>
+ * ## Example pipelines
* |[
* gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
* ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
* correctly because the second udpsink will not preroll correctly (no RTCP
* packets are sent in the PAUSED state). Applications should manually set and
* keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
- * </refsect2>
+ *
*/
#ifdef HAVE_CONFIG_H
GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
#define GST_CAT_DEFAULT gst_rtp_session_debug
+#define GST_TYPE_RTP_NTP_TIME_SOURCE (gst_rtp_ntp_time_source_get_type ())
GType
gst_rtp_ntp_time_source_get_type (void)
{
PROP_MAX_DROPOUT_TIME,
PROP_MAX_MISORDER_TIME,
PROP_STATS,
+ PROP_TWCC_STATS,
PROP_RTP_PROFILE,
PROP_NTP_TIME_SOURCE,
PROP_RTCP_SYNC_SEND_TIME
};
-#define GST_RTP_SESSION_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
-
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock (&(sess)->priv->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
GstRtpNtpTimeSource ntp_time_source;
gboolean rtcp_sync_send_time;
- guint rtx_count;
+ guint recv_rtx_req_count;
+ guint sent_rtx_req_count;
+
+ GstStructure *last_twcc_stats;
+
+ /*
+ * This is the list of processed packets in the receive path when upstream
+ * pushed a buffer list.
+ */
+ GstBufferList *processed_list;
};
/* callbacks to handle actions from the session manager */
gpointer user_data);
static void gst_rtp_session_notify_nack (RTPSession * sess,
guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
+static void gst_rtp_session_notify_twcc (RTPSession * sess,
+ GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
+static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
+ gpointer user_data);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
+static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
static RTPSessionCallbacks callbacks = {
gst_rtp_session_process_rtp,
gst_rtp_session_request_key_unit,
gst_rtp_session_request_time,
gst_rtp_session_notify_nack,
- gst_rtp_session_reconfigure
+ gst_rtp_session_notify_twcc,
+ gst_rtp_session_reconfigure,
+ gst_rtp_session_notify_early_rtcp
};
/* GObject vmethods */
}
#define gst_rtp_session_parent_class parent_class
-G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
static void
gst_rtp_session_class_init (GstRtpSessionClass * klass)
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
- g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
-
gobject_class->finalize = gst_rtp_session_finalize;
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
- NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
/**
* GstRtpSession::clear-pt-map:
* @sess: the object which received the signal
*/
gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
- NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
+ NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRtpSession::on-new-ssrc:
gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-ssrc_collision:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
- on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
- G_TYPE_NONE, 1, G_TYPE_UINT);
+ on_ssrc_collision), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-ssrc_validated:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
- on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
- G_TYPE_NONE, 1, G_TYPE_UINT);
+ on_ssrc_validated), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-ssrc-active:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
- on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
- G_TYPE_NONE, 1, G_TYPE_UINT);
+ on_ssrc_active), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-ssrc-sdes:
* @session: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-bye-ssrc:
gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-bye-timeout:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-timeout:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-sender-timeout:
* @sess: the object which received the signal
gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
- on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
- G_TYPE_NONE, 1, G_TYPE_UINT);
+ on_sender_timeout), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-new-sender-ssrc:
gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
- NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstRtpSession::on-sender-ssrc-active:
gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
- on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
- G_TYPE_NONE, 1, G_TYPE_UINT);
+ on_ssrc_active), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
g_param_spec_double ("bandwidth", "Bandwidth",
g_object_class_install_property (gobject_class, PROP_SDES,
g_param_spec_boxed ("sdes", "SDES",
"The SDES items of this session",
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
+ | GST_PARAM_DOC_SHOW_DEFAULT));
+#else
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
g_param_spec_uint ("num-sources", "Num Sources",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
- * GstRtpSession::stats:
+ * GstRtpSession:stats:
*
- * Various session statistics. This property returns a GstStructure
- * with name application/x-rtp-session-stats with the following fields:
+ * Various session statistics. This property returns a #GstStructure
+ * with name `application/x-rtp-session-stats` with the following fields:
*
- * "rtx-count" G_TYPE_UINT The number of retransmission events
- * received from downstream (in receiver mode)
- * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
+ * * "recv-rtx-req-count" G_TYPE_UINT The number of retransmission events
+ * received from downstream (in receiver mode) (Since 1.16)
+ * * "sent-rtx-req-count" G_TYPE_UINT The number of retransmission events
+ * sent downstream (in sender mode) (Since 1.16)
+ * * "rtx-count" G_TYPE_UINT DEPRECATED Since 1.16, same as
+ * "recv-rtx-req-count".
+ * * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
* dropped (due to bandwidth constraints)
- * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
- * "recv-nack-count" G_TYPE_UINT Number of NACKs received
- * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource::stats for all
+ * * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
+ * * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ * * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource:stats for all
* RTP sources (Since 1.8)
*
* Since: 1.4
"Various statistics", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpSession:twcc-stats:
+ *
+ * Various statistics derived from TWCC. This property returns a GstStructure
+ * with name RTPTWCCStats with the following fields:
+ *
+ * "bitrate-sent" G_TYPE_UINT The actual sent bitrate of TWCC packets
+ * "bitrate-recv" G_TYPE_UINT The estimated bitrate for the receiver.
+ * "packets-sent" G_TYPE_UINT Number of packets sent
+ * "packets-recv" G_TYPE_UINT Number of packets reported recevied
+ * "packet-loss-pct" G_TYPE_DOUBLE Packetloss percentage, based on
+ * packets reported as lost from the recevier.
+ * "avg-delta-of-delta", G_TYPE_INT64 In nanoseconds, a moving window
+ * average of the difference in inter-packet spacing between
+ * sender and receiver. A sudden increase in this number can indicate
+ * network congestion.
+ *
+ * Since: 1.18
+ */
+ g_object_class_install_property (gobject_class, PROP_TWCC_STATS,
+ g_param_spec_boxed ("twcc-stats", "TWCC Statistics",
+ "Various statistics from TWCC", GST_TYPE_STRUCTURE,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
g_param_spec_enum ("rtp-profile", "RTP Profile",
"RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
g_param_spec_enum ("ntp-time-source", "NTP Time Source",
"NTP time source for RTCP packets",
- gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
+ GST_TYPE_RTP_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
"rtpsession", 0, "RTP Session");
+
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp_list);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
+
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ gst_type_mark_as_plugin_api (GST_TYPE_RTP_NTP_TIME_SOURCE, 0);
+ gst_type_mark_as_plugin_api (RTP_TYPE_SESSION, 0);
+ gst_type_mark_as_plugin_api (RTP_TYPE_SOURCE, 0);
+#endif
}
static void
gst_rtp_session_init (GstRtpSession * rtpsession)
{
- rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
+ rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
g_mutex_init (&rtpsession->priv->lock);
g_cond_init (&rtpsession->priv->cond);
rtpsession->priv->sysclock = gst_system_clock_obtain ();
rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) gst_caps_unref);
+ rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
+
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
rtpsession->priv->thread_stopped = TRUE;
- rtpsession->priv->rtx_count = 0;
+ rtpsession->priv->recv_rtx_req_count = 0;
+ rtpsession->priv->sent_rtx_req_count = 0;
rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
}
g_cond_clear (&rtpsession->priv->cond);
g_object_unref (rtpsession->priv->sysclock);
g_object_unref (rtpsession->priv->session);
+ if (rtpsession->priv->last_twcc_stats)
+ gst_structure_free (rtpsession->priv->last_twcc_stats);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
case PROP_STATS:
g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
break;
+ case PROP_TWCC_STATS:
+ GST_RTP_SESSION_LOCK (rtpsession);
+ g_value_set_boxed (value, priv->last_twcc_stats);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+ break;
case PROP_RTP_PROFILE:
g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
break;
GstStructure *s;
g_object_get (rtpsession->priv->session, "stats", &s, NULL);
- gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
- NULL);
+ gst_structure_set (s, "rtx-count", G_TYPE_UINT,
+ rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
+ rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
+ rtpsession->priv->sent_rtx_req_count, NULL);
return s;
}
switch (rtpsession->priv->ntp_time_source) {
case GST_RTP_NTP_TIME_SOURCE_NTP:
case GST_RTP_NTP_TIME_SOURCE_UNIX:{
- GTimeVal current;
-
/* get current NTP time */
- g_get_current_time (¤t);
- ntpns = GST_TIMEVAL_TO_TIME (current);
+ ntpns = g_get_real_time () * GST_USECOND;
/* add constant to convert from 1970 based time to 1900 based time */
if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
g_thread_join (rtpsession->priv->thread);
/* only create a new thread if the old one was stopped. Otherwise we can
* just reuse the currently running one. */
- rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
+ rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp",
(GThreadFunc) rtcp_thread, rtpsession, &error);
rtpsession->priv->thread_stopped = FALSE;
}
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_RTP_SESSION_LOCK (rtpsession);
- if (rtpsession->send_rtp_src)
- rtpsession->priv->wait_send = TRUE;
+ rtpsession->priv->wait_send = TRUE;
GST_RTP_SESSION_UNLOCK (rtpsession);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
case GST_STATE_CHANGE_PAUSED_TO_READY:
/* downstream is now releasing the dataflow and we can join. */
join_rtcp_thread (rtpsession);
+ rtp_session_reset (rtpsession->priv->session);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
static void
gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
{
+ GST_RTP_SESSION_LOCK (rtpsession);
g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
}
-/* called when the session manager has an RTP packet or a list of packets
- * ready for further processing */
+/* called when the session manager has an RTP packet ready to be pushed */
static GstFlowReturn
gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
GstBuffer * buffer, gpointer user_data)
GST_RTP_SESSION_UNLOCK (rtpsession);
if (rtp_src) {
- GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
- result = gst_pad_push (rtp_src, buffer);
+ if (rtpsession->priv->processed_list) {
+ GST_LOG_OBJECT (rtpsession, "queueing received RTP packet");
+ gst_buffer_list_add (rtpsession->priv->processed_list, buffer);
+ result = GST_FLOW_OK;
+ } else {
+ GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
+ result = gst_pad_push (rtp_src, buffer);
+ }
gst_object_unref (rtp_src);
} else {
GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
GST_RTP_SESSION_UNLOCK (rtpsession);
event = gst_event_new_stream_start (stream_id);
+ rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
if (have_group_id)
gst_event_set_group_id (event, group_id);
gst_pad_push_event (srcpad, event);
gst_segment_init (&seg, GST_FORMAT_TIME);
event = gst_event_new_segment (&seg);
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
gst_pad_push_event (srcpad, event);
}
* well. */
static GstFlowReturn
gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
- GstBuffer * buffer, gboolean eos, gpointer user_data)
+ GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
{
GstFlowReturn result;
GstRtpSession *rtpsession;
GST_LOG_OBJECT (rtpsession, "sending RTCP");
result = gst_pad_push (rtcp_src, buffer);
- /* we have to send EOS after this packet */
- if (eos) {
+ /* Forward send an EOS on the RTCP sink if we received an EOS on the
+ * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
+ * case the EOS event would already have been sent */
+ if (all_sources_bye && rtpsession->send_rtp_sink &&
+ GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
+ GstEvent *event;
+
GST_LOG_OBJECT (rtpsession, "sending EOS");
- gst_pad_push_event (rtcp_src, gst_event_new_eos ());
+
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
+ gst_pad_push_event (rtcp_src, event);
}
gst_object_unref (rtcp_src);
} else {
GST_DEBUG_OBJECT (rtpsession, "parsing caps");
s = gst_caps_get_structure (caps, 0);
+
if (!gst_structure_get_int (s, "payload", &payload))
return;
if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
return;
+ rtp_session_update_recv_caps_structure (rtpsession->priv->session, s);
+
g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
gst_caps_ref (caps));
}
}
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+ rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break;
case GST_EVENT_SEGMENT:
gst_object_ref (rtcp_src);
GST_RTP_SESSION_UNLOCK (rtpsession);
+ gst_event_unref (event);
+
if (rtcp_src) {
+ event = gst_event_new_eos ();
+ if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
ret = gst_pad_push_event (rtcp_src, event);
gst_object_unref (rtcp_src);
} else {
- gst_event_unref (event);
ret = TRUE;
}
break;
all_headers, count))
forward = FALSE;
} else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
- GstClockTime running_time;
guint seqnum, delay, deadline, max_delay, avg_rtt;
GST_RTP_SESSION_LOCK (rtpsession);
- rtpsession->priv->rtx_count++;
+ rtpsession->priv->recv_rtx_req_count++;
GST_RTP_SESSION_UNLOCK (rtpsession);
- if (!gst_structure_get_clock_time (s, "running-time", &running_time))
- running_time = -1;
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
}
static gboolean
+process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data)
+{
+ gint ret;
+
+ ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer);
+ if (ret != GST_FLOW_OK)
+ GST_ERROR ("Processing individual buffer in a list failed");
+
+ /*
+ * The buffer has been processed, remove it from the original list, if it was
+ * a valid RTP buffer it has been added to the "processed" list in
+ * gst_rtp_session_process_rtp().
+ */
+ *buffer = NULL;
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
+ GstBufferList *processed_list;
+
+ processed_list = gst_buffer_list_new ();
+
+ /* Set some private data to detect that a buffer list is being pushed. */
+ rtpsession->priv->processed_list = processed_list;
+
+ /*
+ * Individually process the buffers from the incoming buffer list as the
+ * incoming RTP packets in the list can be mixed in all sorts of ways:
+ * - different frames,
+ * - different sources,
+ * - different types (RTP or RTCP)
+ */
+ gst_buffer_list_foreach (list,
+ (GstBufferListFunc) process_received_buffer_in_list, parent);
+
+ gst_buffer_list_unref (list);
+
+ /* Clean up private data in case the next push does not use a buffer list. */
+ rtpsession->priv->processed_list = NULL;
+
+ if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) {
+ gst_buffer_list_unref (processed_list);
+ return GST_FLOW_OK;
+ }
+
+ GST_LOG_OBJECT (rtpsession, "pushing received RTP list");
+ return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list);
+}
+
+static gboolean
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstClockTime current_time;
+ GstClockTime running_time;
guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (parent);
GST_RTP_SESSION_UNLOCK (rtpsession);
current_time = gst_clock_get_time (priv->sysclock);
- get_current_times (rtpsession, NULL, &ntpnstime);
+ get_current_times (rtpsession, &running_time, &ntpnstime);
- rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
+ rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
+ ntpnstime);
return GST_FLOW_OK; /* always return OK */
}
return TRUE;
}
-/* Recieve an RTP packet or a list of packets to be send to the receivers,
+/* Receive an RTP packet or a list of packets to be sent to the receivers,
* send to RTP session manager and forward to send_rtp_src.
*/
static GstFlowReturn
if (is_list) {
GstBuffer *buffer = NULL;
- /* All groups in an list have the same timestamp.
- * So, just take it from the first group. */
+ /* All buffers in a list have the same timestamp.
+ * So, just take it from the first buffer. */
buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
if (buffer)
timestamp = GST_BUFFER_PTS (buffer);
"recv_rtp_sink");
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp);
+ gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink,
+ gst_rtp_session_chain_recv_rtp_list);
gst_pad_set_event_function (rtpsession->recv_rtp_sink,
gst_rtp_session_event_recv_rtp_sink);
gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
"ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
gst_pad_push_event (send_rtp_sink, event);
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->sent_rtx_req_count++;
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
if (blp == 0)
break;
}
static void
+gst_rtp_session_notify_twcc (RTPSession * sess,
+ GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data)
+{
+ GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+ GstEvent *event;
+ GstPad *send_rtp_sink;
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ if ((send_rtp_sink = rtpsession->send_rtp_sink))
+ gst_object_ref (send_rtp_sink);
+ if (rtpsession->priv->last_twcc_stats)
+ gst_structure_free (rtpsession->priv->last_twcc_stats);
+ rtpsession->priv->last_twcc_stats = twcc_stats;
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ if (send_rtp_sink) {
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, twcc_packets);
+ gst_pad_push_event (send_rtp_sink, event);
+ gst_object_unref (send_rtp_sink);
+ }
+
+ g_object_notify (G_OBJECT (rtpsession), "twcc-stats");
+}
+
+static void
gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
{
GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
gst_object_unref (send_rtp_sink);
}
}
+
+static void
+gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
+{
+ GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+
+ GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
+ /* with an early RTCP request, we might have to start the RTCP thread */
+ GST_RTP_SESSION_LOCK (rtpsession);
+ signal_waiting_rtcp_thread_unlocked (rtpsession);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+}