PROP_RTCP_SYNC_SEND_TIME
};
-#define GST_RTP_SESSION_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
-
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock (&(sess)->priv->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
GstRtpNtpTimeSource ntp_time_source;
gboolean rtcp_sync_send_time;
- guint rtx_count;
+ guint recv_rtx_req_count;
+ guint sent_rtx_req_count;
};
/* callbacks to handle actions from the session manager */
static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
RTPSource * src, gpointer data, gpointer user_data);
static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
- RTPSource * src, GstBuffer * buffer, gpointer user_data);
+ RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
GstBuffer * buffer, gpointer user_data);
static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
static void gst_rtp_session_notify_nack (RTPSession * sess,
guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
+static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
+ gpointer user_data);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
static RTPSessionCallbacks callbacks = {
gst_rtp_session_process_rtp,
gst_rtp_session_request_key_unit,
gst_rtp_session_request_time,
gst_rtp_session_notify_nack,
- gst_rtp_session_reconfigure
+ gst_rtp_session_reconfigure,
+ gst_rtp_session_notify_early_rtcp
};
/* GObject vmethods */
}
#define gst_rtp_session_parent_class parent_class
-G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
static void
gst_rtp_session_class_init (GstRtpSessionClass * klass)
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
- g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
-
gobject_class->finalize = gst_rtp_session_finalize;
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
*/
gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
- NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRtpSession::on-new-ssrc:
* Various session statistics. This property returns a GstStructure
* with name application/x-rtp-session-stats with the following fields:
*
- * "rtx-count" G_TYPE_UINT The number of retransmission events
- * received from downstream (in receiver mode)
- * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
+ * "recv-rtx-req-count G_TYPE_UINT The number of retransmission event
+ * received from downstream (in receiver mode) (Since 1.16)
+ * "sent-rtx-req-count" G_TYPE_UINT The number of retransmission event
+ * sent downstream (in sender mode) (Since 1.16)
+ * "rtx-count" G_TYPE_UINT DEPRECATED Since 1.16, same as
+ * "recv-rtx-req-count".
+ * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
* dropped (due to bandwidth constraints)
- * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
- * "recv-nack-count" G_TYPE_UINT Number of NACKs received
- * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource::stats for all
+ * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
+ * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource::stats for all
* RTP sources (Since 1.8)
*
* Since: 1.4
GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
"rtpsession", 0, "RTP Session");
+
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
+
}
static void
gst_rtp_session_init (GstRtpSession * rtpsession)
{
- rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
+ rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
g_mutex_init (&rtpsession->priv->lock);
g_cond_init (&rtpsession->priv->cond);
rtpsession->priv->sysclock = gst_system_clock_obtain ();
rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) gst_caps_unref);
+ rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
+
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
rtpsession->priv->thread_stopped = TRUE;
- rtpsession->priv->rtx_count = 0;
+ rtpsession->priv->recv_rtx_req_count = 0;
+ rtpsession->priv->sent_rtx_req_count = 0;
rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
}
GstStructure *s;
g_object_get (rtpsession->priv->session, "stats", &s, NULL);
- gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
- NULL);
+ gst_structure_set (s, "rtx-count", G_TYPE_UINT,
+ rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
+ rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
+ rtpsession->priv->sent_rtx_req_count, NULL);
return s;
}
GST_RTP_SESSION_UNLOCK (rtpsession);
rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
GST_RTP_SESSION_LOCK (rtpsession);
-
- if (!rtp_session_get_num_sources (session)) {
- /* when no sources left in the session, all of the them have went
- * BYE at some point and removed, we can send EOS to the
- * pipeline. */
- GstPad *rtcp_src = rtpsession->send_rtcp_src;
-
- if (rtcp_src) {
- gst_object_ref (rtcp_src);
- GST_LOG_OBJECT (rtpsession, "sending EOS");
- GST_RTP_SESSION_UNLOCK (rtpsession);
- gst_pad_push_event (rtpsession->send_rtcp_src, gst_event_new_eos ());
- GST_RTP_SESSION_LOCK (rtpsession);
- gst_object_unref (rtcp_src);
- }
- }
}
/* mark the thread as stopped now */
rtpsession->priv->thread_stopped = TRUE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_RTP_SESSION_LOCK (rtpsession);
- if (rtpsession->send_rtp_src)
- rtpsession->priv->wait_send = TRUE;
+ rtpsession->priv->wait_send = TRUE;
GST_RTP_SESSION_UNLOCK (rtpsession);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
case GST_STATE_CHANGE_PAUSED_TO_READY:
/* downstream is now releasing the dataflow and we can join. */
join_rtcp_thread (rtpsession);
+ rtp_session_reset (rtpsession->priv->session);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
static void
gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
{
+ GST_RTP_SESSION_LOCK (rtpsession);
g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
}
/* called when the session manager has an RTP packet or a list of packets
GST_RTP_SESSION_UNLOCK (rtpsession);
event = gst_event_new_stream_start (stream_id);
+ rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
if (have_group_id)
gst_event_set_group_id (event, group_id);
gst_pad_push_event (srcpad, event);
gst_segment_init (&seg, GST_FORMAT_TIME);
event = gst_event_new_segment (&seg);
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
gst_pad_push_event (srcpad, event);
}
/* called when the session manager has an RTCP packet ready for further
- * sending. */
+ * sending. The eos flag is set when an EOS event should be sent downstream as
+ * well. */
static GstFlowReturn
gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
- GstBuffer * buffer, gpointer user_data)
+ GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
{
GstFlowReturn result;
GstRtpSession *rtpsession;
GST_LOG_OBJECT (rtpsession, "sending RTCP");
result = gst_pad_push (rtcp_src, buffer);
+ /* Forward send an EOS on the RTCP sink if we received an EOS on the
+ * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
+ * case the EOS event would already have been sent */
+ if (all_sources_bye && rtpsession->send_rtp_sink &&
+ GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
+ GstEvent *event;
+
+ GST_LOG_OBJECT (rtpsession, "sending EOS");
+
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
+ gst_pad_push_event (rtcp_src, event);
+ }
gst_object_unref (rtcp_src);
} else {
GST_RTP_SESSION_UNLOCK (rtpsession);
}
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+ rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break;
case GST_EVENT_SEGMENT:
gst_object_ref (rtcp_src);
GST_RTP_SESSION_UNLOCK (rtpsession);
+ gst_event_unref (event);
+
if (rtcp_src) {
+ event = gst_event_new_eos ();
+ if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
ret = gst_pad_push_event (rtcp_src, event);
gst_object_unref (rtcp_src);
} else {
- gst_event_unref (event);
ret = TRUE;
}
break;
all_headers, count))
forward = FALSE;
} else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
- GstClockTime running_time;
guint seqnum, delay, deadline, max_delay, avg_rtt;
GST_RTP_SESSION_LOCK (rtpsession);
- rtpsession->priv->rtx_count++;
+ rtpsession->priv->recv_rtx_req_count++;
GST_RTP_SESSION_UNLOCK (rtpsession);
- if (!gst_structure_get_clock_time (s, "running-time", &running_time))
- running_time = -1;
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstClockTime current_time;
+ GstClockTime running_time;
guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (parent);
GST_RTP_SESSION_UNLOCK (rtpsession);
current_time = gst_clock_get_time (priv->sysclock);
- get_current_times (rtpsession, NULL, &ntpnstime);
+ get_current_times (rtpsession, &running_time, &ntpnstime);
- rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
+ rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
+ ntpnstime);
return GST_FLOW_OK; /* always return OK */
}
return ret;
}
+
static gboolean
gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
GstEvent * event)
return TRUE;
}
-/* Recieve an RTP packet or a list of packets to be send to the receivers,
+/* Receive an RTP packet or a list of packets to be sent to the receivers,
* send to RTP session manager and forward to send_rtp_src.
*/
static GstFlowReturn
if (is_list) {
GstBuffer *buffer = NULL;
- /* All groups in an list have the same timestamp.
- * So, just take it from the first group. */
+ /* All buffers in a list have the same timestamp.
+ * So, just take it from the first buffer. */
buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
if (buffer)
timestamp = GST_BUFFER_PTS (buffer);
"ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
gst_pad_push_event (send_rtp_sink, event);
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->sent_rtx_req_count++;
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
if (blp == 0)
break;
gst_object_unref (send_rtp_sink);
}
}
+
+static void
+gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
+{
+ GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+
+ GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
+ /* with an early RTCP request, we might have to start the RTCP thread */
+ GST_RTP_SESSION_LOCK (rtpsession);
+ signal_waiting_rtcp_thread_unlocked (rtpsession);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+}