VOID:UINT
VOID:UINT,UINT
VOID:OBJECT,OBJECT
+VOID:BOOL,UINT64
gulong buffer_handlesync_sig;
gulong buffer_ptreq_sig;
gulong buffer_ntpstop_sig;
- gboolean buffering;
+ gint percent;
/* the PT demuxer of the SSRC */
GstElement *demux;
stream->have_sync = FALSE;
stream->unix_delta = 0;
+ stream->percent = 100;
session->streams = g_slist_prepend (session->streams, stream);
/* provide clock_rate to the jitterbuffer when needed */
stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
(GCallback) on_npt_stop, stream);
+ g_object_set_data (G_OBJECT (buffer), "GstRtpBinSession", session);
+ g_object_set_data (G_OBJECT (buffer), "GstRtpBinStream", stream);
+
/* configure latency and packet lost */
g_object_set (buffer, "latency", rtpbin->latency, NULL);
g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
case GST_MESSAGE_BUFFERING:
{
gint percent;
+ gint min_percent = 100;
+ GSList *sessions, *streams, *elements = NULL;
+ GstRtpBinStream *stream;
+ guint64 base_time = 0;
+ gboolean change = FALSE, active = FALSE;
gst_message_parse_buffering (message, &percent);
+ stream =
+ g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
+ "GstRtpBinStream");
+
+ GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
+
+ /* get the stream */
+ if (stream) {
+ GST_RTP_BIN_LOCK (rtpbin);
+ /* fill in the percent */
+ stream->percent = percent;
+
+ for (sessions = rtpbin->sessions; sessions;
+ sessions = g_slist_next (sessions)) {
+ GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
+
+ GST_RTP_SESSION_LOCK (session);
+ for (streams = session->streams; streams;
+ streams = g_slist_next (streams)) {
+ GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
+ GstElement *element = stream->buffer;
+
+ GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
+ stream->percent);
+
+ /* find min percent */
+ if (min_percent > stream->percent)
+ min_percent = stream->percent;
+
+ elements = g_slist_prepend (elements, gst_object_ref (element));
+ }
+ GST_RTP_SESSION_UNLOCK (session);
+ }
+ GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
+
+ if (rtpbin->buffering) {
+ if (min_percent == 100) {
+ rtpbin->buffering = FALSE;
+ active = TRUE;
+ change = TRUE;
+ }
+ } else {
+ /* pause the streams */
+ rtpbin->buffering = TRUE;
+ active = FALSE;
+ change = TRUE;
+ }
+ GST_RTP_BIN_UNLOCK (rtpbin);
+
+ gst_message_unref (message);
+ message =
+ gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
+
+ if (change) {
+ while (elements) {
+ GstElement *element = elements->data;
+
+ GST_DEBUG_OBJECT (bin, "setting %p to %d", element, active);
+ g_signal_emit_by_name (element, "set-active", active, base_time,
+ NULL);
+ gst_object_unref (element);
+ elements = g_slist_delete_link (elements, elements);
+ }
+ }
+ }
+
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}
gboolean do_lost;
gboolean ignore_pt;
RTPJitterBufferMode buffer_mode;
+ gboolean buffering;
/* a list of session */
GSList *sessions;
SIGNAL_CLEAR_PT_MAP,
SIGNAL_HANDLE_SYNC,
SIGNAL_ON_NPT_STOP,
+ SIGNAL_SET_ACTIVE,
LAST_SIGNAL
};
GCond *jbuf_cond;
gboolean waiting;
gboolean discont;
+ gboolean active;
/* properties */
guint latency_ms;
static void
gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
static void
-do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
- GstRtpJitterBuffer * jitterbuffer);
+gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
+ gboolean active, guint64 base_time);
static void
gst_rtp_jitter_buffer_base_init (gpointer klass)
G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ /**
+ * GstRtpJitterBuffer::set-active:
+ * @buffer: the object which received the signal
+ *
+ * Start pushing out packets with the given base time. This signal is only
+ * useful in buffering mode.
+ */
+ gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
+ g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
+ gst_rtp_bin_marshal_VOID__BOOL_UINT64, G_TYPE_NONE, 2, G_TYPE_BOOLEAN,
+ G_TYPE_UINT64);
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
+ klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
priv->jbuf = rtp_jitter_buffer_new ();
rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
- rtp_jitter_buffer_set_stats_cb (priv->jbuf, (RTPBufferingStats) do_stats_cb,
- jitterbuffer);
priv->jbuf_lock = g_mutex_new ();
priv->jbuf_cond = g_cond_new ();
JBUF_UNLOCK (priv);
}
+static void
+gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
+ guint64 base_time)
+{
+ GstRtpJitterBufferPrivate *priv;
+
+ priv = jbuf->priv;
+
+ JBUF_LOCK (priv);
+ GST_DEBUG_OBJECT (jbuf, "setting active %d at time %" GST_TIME_FORMAT, active,
+ GST_TIME_ARGS (base_time));
+ priv->active = active;
+ JBUF_SIGNAL (priv);
+ JBUF_UNLOCK (priv);
+}
+
static GstCaps *
gst_rtp_jitter_buffer_getcaps (GstPad * pad)
{
}
static void
-do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
- GstRtpJitterBuffer * jitterbuffer)
+post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
{
GstMessage *message;
GstClockTime timestamp;
guint64 latency_ts;
gboolean tail;
+ gint percent = -1;
guint8 pt;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
GstBuffer *old_buf;
- old_buf = rtp_jitter_buffer_pop (priv->jbuf);
+ old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
gst_rtp_buffer_get_seq (old_buf));
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
- priv->clock_rate, priv->latency_ns, &tail)))
+ priv->clock_rate, &tail, &percent)))
goto duplicate;
/* signal addition of new buffer when the _loop is waiting. */
finished:
JBUF_UNLOCK (priv);
+ if (percent != -1)
+ post_buffering_percent (jitterbuffer, percent);
+
gst_object_unref (jitterbuffer);
return ret;
GstClock *clock;
GstClockID id;
GstClockTime sync_time;
+ gint percent = -1;
priv = jitterbuffer->priv;
/* always wait if we are blocked */
if (G_LIKELY (!priv->blocked)) {
/* we're buffering but not EOS, wait. */
- if (!priv->eos && rtp_jitter_buffer_is_buffering (priv->jbuf))
+ if (!priv->eos && (!priv->active
+ || rtp_jitter_buffer_is_buffering (priv->jbuf)))
goto do_wait;
/* if we have a packet, we can exit the loop and grab it */
if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
if (G_UNLIKELY (gap < 0)) {
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
- outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
gst_buffer_unref (outbuf);
goto again;
}
push_buffer:
/* when we get here we are ready to pop and push the buffer */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
if (G_UNLIKELY (discont || priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
+ if (percent != -1)
+ post_buffering_percent (jitterbuffer, percent);
+
/* push buffer */
GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
/* actions */
void (*clear_pt_map) (GstRtpJitterBuffer *buffer);
+ void (*set_active) (GstRtpJitterBuffer *buffer, gboolean active, guint64 base_time);
+
/*< private > */
gpointer _gst_reserved[GST_PADDING];
};
}
/**
- * rtp_jitter_buffer_set_stats_cb:
- * @jbuf: an #RTPJitterBuffer
- * @stats: the stats callback
- * @user_data: user data passed to the callback
- *
- * Install a callbacl that will be called when the buffering state of @jbuf
- * changed.
- */
-void
-rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer * jbuf,
- RTPBufferingStats stats_cb, gpointer user_data)
-{
- jbuf->stats_cb = stats_cb;
- jbuf->stats_data = user_data;
-}
-
-
-/**
* rtp_jitter_buffer_get_mode:
* @jbuf: an #RTPJitterBuffer
*
}
static void
-update_buffer_level (RTPJitterBuffer * jbuf)
+update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
{
GstBuffer *high_buf, *low_buf;
gboolean post = FALSE;
}
}
if (post) {
- gint percent;
+ gint perc;
if (jbuf->buffering) {
- percent = (jbuf->level * 100 / jbuf->delay);
- percent = MIN (percent, 100);
+ perc = (jbuf->level * 100 / jbuf->delay);
+ perc = MIN (perc, 100);
} else {
- percent = 100;
+ perc = 100;
}
- if (jbuf->stats_cb)
- jbuf->stats_cb (jbuf, percent, jbuf->stats_data);
+ if (percent)
+ *percent = perc;
- GST_DEBUG ("buffering %d", percent);
+ GST_DEBUG ("buffering %d", perc);
}
}
*/
gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
- GstClockTime time, guint32 clock_rate, GstClockTime max_delay,
- gboolean * tail)
+ GstClockTime time, guint32 clock_rate, gboolean * tail, gint * percent)
{
GList *list;
guint32 rtptime;
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
- update_buffer_level (jbuf);
+ update_buffer_level (jbuf, percent);
+ else
+ *percent = -1;
/* tail was changed when we did not find a previous packet, we set the return
* flag when requested. */
/**
* rtp_jitter_buffer_pop:
* @jbuf: an #RTPJitterBuffer
+ * @percent: the buffering percent
*
* Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will
* have its timestamp adjusted with the incomming running_time and the detected
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
GstBuffer *
-rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
+rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
{
GstBuffer *buf;
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
- update_buffer_level (jbuf);
+ update_buffer_level (jbuf, percent);
+ else
+ *percent = -1;
return buf;
}
#define RTP_TYPE_JITTER_BUFFER_MODE (rtp_jitter_buffer_mode_get_type())
GType rtp_jitter_buffer_mode_get_type (void);
-/**
- * RTPBufferingStats:
- * @jbuf: an #RTPJitterBuffer
- * @percent: the buffering percent
- * @user_data: user data specified when registering
- *
- * Called when buffering is going on in @jbuf.
- */
-typedef void (*RTPBufferingStats) (RTPJitterBuffer *jbuf, guint percent, gpointer user_data);
-
#define RTP_JITTER_BUFFER_MAX_WINDOW 512
/**
* RTPJitterBuffer:
guint64 level;
guint64 low_level;
guint64 high_level;
- RTPBufferingStats stats_cb;
- gpointer stats_data;
/* for calculating skew */
GstClockTime base_time;
/* managing lifetime */
RTPJitterBuffer* rtp_jitter_buffer_new (void);
-void rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer *jbuf, RTPBufferingStats stats_cb,
- gpointer user_data);
-
RTPJitterBufferMode rtp_jitter_buffer_get_mode (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_mode (RTPJitterBuffer *jbuf, RTPJitterBufferMode mode);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
GstClockTime time,
guint32 clock_rate,
- GstClockTime max_delay,
- gboolean *tail);
+ gboolean *tail, gint *percent);
GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
-GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
+GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);