From 20a27a545a45d2d4e20115f9ae99474d18472e65 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 5 Oct 2009 19:45:35 +0200 Subject: [PATCH] rtpbin: more buffering updates Add signal to pause the jitterbuffer. This will be emitted from gstrtpbin when one of the jitterbuffers is buffering. Make rtpbin collect the buffering messages and post a new buffering message with the min value. Remove the stats callback from jitterbuffer but pass a percent integer to functions that affect the buffering state of the jitterbuffer. This allows us then to post buffering messages from outside of the jitterbuffer lock. --- gst/rtpmanager/gstrtpbin-marshal.list | 1 + gst/rtpmanager/gstrtpbin.c | 77 ++++++++++++++++++++++++++++++++++- gst/rtpmanager/gstrtpbin.h | 1 + gst/rtpmanager/gstrtpjitterbuffer.c | 61 ++++++++++++++++++++++----- gst/rtpmanager/gstrtpjitterbuffer.h | 2 + gst/rtpmanager/rtpjitterbuffer.c | 48 ++++++++-------------- gst/rtpmanager/rtpjitterbuffer.h | 20 +-------- 7 files changed, 149 insertions(+), 61 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin-marshal.list b/gst/rtpmanager/gstrtpbin-marshal.list index ed73e43..1f6d2b7 100644 --- a/gst/rtpmanager/gstrtpbin-marshal.list +++ b/gst/rtpmanager/gstrtpbin-marshal.list @@ -6,3 +6,4 @@ VOID:UINT,OBJECT VOID:UINT VOID:UINT,UINT VOID:OBJECT,OBJECT +VOID:BOOL,UINT64 diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 0326878..16cf77f 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -297,7 +297,7 @@ struct _GstRtpBinStream gulong buffer_handlesync_sig; gulong buffer_ptreq_sig; gulong buffer_ntpstop_sig; - gboolean buffering; + gint percent; /* the PT demuxer of the SSRC */ GstElement *demux; @@ -1167,6 +1167,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->have_sync = FALSE; stream->unix_delta = 0; + stream->percent = 100; session->streams = g_slist_prepend (session->streams, stream); /* provide clock_rate to the jitterbuffer when needed */ @@ -1175,6 +1176,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop", (GCallback) on_npt_stop, stream); + g_object_set_data (G_OBJECT (buffer), "GstRtpBinSession", session); + g_object_set_data (G_OBJECT (buffer), "GstRtpBinStream", stream); + /* configure latency and packet lost */ g_object_set (buffer, "latency", rtpbin->latency, NULL); g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL); @@ -1779,9 +1783,80 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message) case GST_MESSAGE_BUFFERING: { gint percent; + gint min_percent = 100; + GSList *sessions, *streams, *elements = NULL; + GstRtpBinStream *stream; + guint64 base_time = 0; + gboolean change = FALSE, active = FALSE; gst_message_parse_buffering (message, &percent); + stream = + g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)), + "GstRtpBinStream"); + + GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream); + + /* get the stream */ + if (stream) { + GST_RTP_BIN_LOCK (rtpbin); + /* fill in the percent */ + stream->percent = percent; + + for (sessions = rtpbin->sessions; sessions; + sessions = g_slist_next (sessions)) { + GstRtpBinSession *session = (GstRtpBinSession *) sessions->data; + + GST_RTP_SESSION_LOCK (session); + for (streams = session->streams; streams; + streams = g_slist_next (streams)) { + GstRtpBinStream *stream = (GstRtpBinStream *) streams->data; + GstElement *element = stream->buffer; + + GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream, + stream->percent); + + /* find min percent */ + if (min_percent > stream->percent) + min_percent = stream->percent; + + elements = g_slist_prepend (elements, gst_object_ref (element)); + } + GST_RTP_SESSION_UNLOCK (session); + } + GST_DEBUG_OBJECT (bin, "min percent %d", min_percent); + + if (rtpbin->buffering) { + if (min_percent == 100) { + rtpbin->buffering = FALSE; + active = TRUE; + change = TRUE; + } + } else { + /* pause the streams */ + rtpbin->buffering = TRUE; + active = FALSE; + change = TRUE; + } + GST_RTP_BIN_UNLOCK (rtpbin); + + gst_message_unref (message); + message = + gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent); + + if (change) { + while (elements) { + GstElement *element = elements->data; + + GST_DEBUG_OBJECT (bin, "setting %p to %d", element, active); + g_signal_emit_by_name (element, "set-active", active, base_time, + NULL); + gst_object_unref (element); + elements = g_slist_delete_link (elements, elements); + } + } + } + GST_BIN_CLASS (parent_class)->handle_message (bin, message); break; } diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 44b6b01..a222f7a 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -49,6 +49,7 @@ struct _GstRtpBin { gboolean do_lost; gboolean ignore_pt; RTPJitterBufferMode buffer_mode; + gboolean buffering; /* a list of session */ GSList *sessions; diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 2d7be3d..69dbdc8 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -85,6 +85,7 @@ enum SIGNAL_CLEAR_PT_MAP, SIGNAL_HANDLE_SYNC, SIGNAL_ON_NPT_STOP, + SIGNAL_SET_ACTIVE, LAST_SIGNAL }; @@ -134,6 +135,7 @@ struct _GstRtpJitterBufferPrivate GCond *jbuf_cond; gboolean waiting; gboolean discont; + gboolean active; /* properties */ guint latency_ms; @@ -266,8 +268,8 @@ static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query); static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer); static void -do_stats_cb (RTPJitterBuffer * jbuf, guint percent, - GstRtpJitterBuffer * jitterbuffer); +gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer, + gboolean active, guint64 base_time); static void gst_rtp_jitter_buffer_base_init (gpointer klass) @@ -403,6 +405,20 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + /** + * GstRtpJitterBuffer::set-active: + * @buffer: the object which received the signal + * + * Start pushing out packets with the given base time. This signal is only + * useful in buffering mode. + */ + gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] = + g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL, + gst_rtp_bin_marshal_VOID__BOOL_UINT64, G_TYPE_NONE, 2, G_TYPE_BOOLEAN, + G_TYPE_UINT64); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state); gstelement_class->request_new_pad = @@ -411,6 +427,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map); + klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active); GST_DEBUG_CATEGORY_INIT (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer"); @@ -432,8 +449,6 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer, priv->jbuf = rtp_jitter_buffer_new (); rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns); - rtp_jitter_buffer_set_stats_cb (priv->jbuf, (RTPBufferingStats) do_stats_cb, - jitterbuffer); priv->jbuf_lock = g_mutex_new (); priv->jbuf_cond = g_cond_new (); @@ -631,6 +646,22 @@ gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) JBUF_UNLOCK (priv); } +static void +gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, + guint64 base_time) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jbuf->priv; + + JBUF_LOCK (priv); + GST_DEBUG_OBJECT (jbuf, "setting active %d at time %" GST_TIME_FORMAT, active, + GST_TIME_ARGS (base_time)); + priv->active = active; + JBUF_SIGNAL (priv); + JBUF_UNLOCK (priv); +} + static GstCaps * gst_rtp_jitter_buffer_getcaps (GstPad * pad) { @@ -1119,8 +1150,7 @@ parse_failed: } static void -do_stats_cb (RTPJitterBuffer * jbuf, guint percent, - GstRtpJitterBuffer * jitterbuffer) +post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent) { GstMessage *message; @@ -1141,6 +1171,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) GstClockTime timestamp; guint64 latency_ts; gboolean tail; + gint percent = -1; guint8 pt; jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); @@ -1256,7 +1287,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) { GstBuffer *old_buf; - old_buf = rtp_jitter_buffer_pop (priv->jbuf); + old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent); GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d", gst_rtp_buffer_get_seq (old_buf)); @@ -1273,7 +1304,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, - priv->clock_rate, priv->latency_ns, &tail))) + priv->clock_rate, &tail, &percent))) goto duplicate; /* signal addition of new buffer when the _loop is waiting. */ @@ -1295,6 +1326,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) finished: JBUF_UNLOCK (priv); + if (percent != -1) + post_buffering_percent (jitterbuffer, percent); + gst_object_unref (jitterbuffer); return ret; @@ -1427,6 +1461,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) GstClock *clock; GstClockID id; GstClockTime sync_time; + gint percent = -1; priv = jitterbuffer->priv; @@ -1438,7 +1473,8 @@ again: /* always wait if we are blocked */ if (G_LIKELY (!priv->blocked)) { /* we're buffering but not EOS, wait. */ - if (!priv->eos && rtp_jitter_buffer_is_buffering (priv->jbuf)) + if (!priv->eos && (!priv->active + || rtp_jitter_buffer_is_buffering (priv->jbuf))) goto do_wait; /* if we have a packet, we can exit the loop and grab it */ if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0) @@ -1517,7 +1553,7 @@ again: if (G_UNLIKELY (gap < 0)) { GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); - outbuf = rtp_jitter_buffer_pop (priv->jbuf); + outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); gst_buffer_unref (outbuf); goto again; } @@ -1656,7 +1692,7 @@ again: push_buffer: /* when we get here we are ready to pop and push the buffer */ - outbuf = rtp_jitter_buffer_pop (priv->jbuf); + outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); if (G_UNLIKELY (discont || priv->discont)) { /* set DISCONT flag when we missed a packet. We pushed the buffer writable @@ -1718,6 +1754,9 @@ push_buffer: priv->next_seqnum = (seqnum + 1) & 0xffff; JBUF_UNLOCK (priv); + if (percent != -1) + post_buffering_percent (jitterbuffer, percent); + /* push buffer */ GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum, diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index 6d7610e..68604a2 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -77,6 +77,8 @@ struct _GstRtpJitterBufferClass /* actions */ void (*clear_pt_map) (GstRtpJitterBuffer *buffer); + void (*set_active) (GstRtpJitterBuffer *buffer, gboolean active, guint64 base_time); + /*< private > */ gpointer _gst_reserved[GST_PADDING]; }; diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 8397173..7c54da9 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -120,24 +120,6 @@ rtp_jitter_buffer_new (void) } /** - * rtp_jitter_buffer_set_stats_cb: - * @jbuf: an #RTPJitterBuffer - * @stats: the stats callback - * @user_data: user data passed to the callback - * - * Install a callbacl that will be called when the buffering state of @jbuf - * changed. - */ -void -rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer * jbuf, - RTPBufferingStats stats_cb, gpointer user_data) -{ - jbuf->stats_cb = stats_cb; - jbuf->stats_data = user_data; -} - - -/** * rtp_jitter_buffer_get_mode: * @jbuf: an #RTPJitterBuffer * @@ -225,7 +207,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time, } static void -update_buffer_level (RTPJitterBuffer * jbuf) +update_buffer_level (RTPJitterBuffer * jbuf, gint * percent) { GstBuffer *high_buf, *low_buf; gboolean post = FALSE; @@ -262,19 +244,19 @@ update_buffer_level (RTPJitterBuffer * jbuf) } } if (post) { - gint percent; + gint perc; if (jbuf->buffering) { - percent = (jbuf->level * 100 / jbuf->delay); - percent = MIN (percent, 100); + perc = (jbuf->level * 100 / jbuf->delay); + perc = MIN (perc, 100); } else { - percent = 100; + perc = 100; } - if (jbuf->stats_cb) - jbuf->stats_cb (jbuf, percent, jbuf->stats_data); + if (percent) + *percent = perc; - GST_DEBUG ("buffering %d", percent); + GST_DEBUG ("buffering %d", perc); } } @@ -575,8 +557,7 @@ no_skew: */ gboolean rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, - GstClockTime time, guint32 clock_rate, GstClockTime max_delay, - gboolean * tail) + GstClockTime time, guint32 clock_rate, gboolean * tail, gint * percent) { GList *list; guint32 rtptime; @@ -642,7 +623,9 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, /* buffering mode, update buffer stats */ if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER) - update_buffer_level (jbuf); + update_buffer_level (jbuf, percent); + else + *percent = -1; /* tail was changed when we did not find a previous packet, we set the return * flag when requested. */ @@ -662,6 +645,7 @@ duplicate: /** * rtp_jitter_buffer_pop: * @jbuf: an #RTPJitterBuffer + * @percent: the buffering percent * * Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will * have its timestamp adjusted with the incomming running_time and the detected @@ -670,7 +654,7 @@ duplicate: * Returns: a #GstBuffer or %NULL when there was no packet in the queue. */ GstBuffer * -rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf) +rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent) { GstBuffer *buf; @@ -680,7 +664,9 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf) /* buffering mode, update buffer stats */ if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER) - update_buffer_level (jbuf); + update_buffer_level (jbuf, percent); + else + *percent = -1; return buf; } diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index c8f28d3..310cec0 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -58,16 +58,6 @@ typedef enum { #define RTP_TYPE_JITTER_BUFFER_MODE (rtp_jitter_buffer_mode_get_type()) GType rtp_jitter_buffer_mode_get_type (void); -/** - * RTPBufferingStats: - * @jbuf: an #RTPJitterBuffer - * @percent: the buffering percent - * @user_data: user data specified when registering - * - * Called when buffering is going on in @jbuf. - */ -typedef void (*RTPBufferingStats) (RTPJitterBuffer *jbuf, guint percent, gpointer user_data); - #define RTP_JITTER_BUFFER_MAX_WINDOW 512 /** * RTPJitterBuffer: @@ -88,8 +78,6 @@ struct _RTPJitterBuffer { guint64 level; guint64 low_level; guint64 high_level; - RTPBufferingStats stats_cb; - gpointer stats_data; /* for calculating skew */ GstClockTime base_time; @@ -117,9 +105,6 @@ GType rtp_jitter_buffer_get_type (void); /* managing lifetime */ RTPJitterBuffer* rtp_jitter_buffer_new (void); -void rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer *jbuf, RTPBufferingStats stats_cb, - gpointer user_data); - RTPJitterBufferMode rtp_jitter_buffer_get_mode (RTPJitterBuffer *jbuf); void rtp_jitter_buffer_set_mode (RTPJitterBuffer *jbuf, RTPJitterBufferMode mode); @@ -131,10 +116,9 @@ void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf) gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time, guint32 clock_rate, - GstClockTime max_delay, - gboolean *tail); + gboolean *tail, gint *percent); GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf); -GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf); +GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent); void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf); -- 2.7.4