if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
-
#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
-#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
-#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
- JBUF_WAIT(priv); \
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
- goto label; \
+#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
+ (priv)->waiting_timer = TRUE; \
+ g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
+ (priv)->waiting_timer = FALSE; \
+} G_STMT_END
+#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_timer)) \
+ g_cond_signal (&(priv)->jbuf_timer); \
} G_STMT_END
-#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
+#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
+ (priv)->waiting_event = TRUE; \
+ g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
+ (priv)->waiting_event = FALSE; \
+ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
+ goto label; \
+} G_STMT_END
+#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_event)) \
+ g_cond_signal (&(priv)->jbuf_event); \
+} G_STMT_END
struct _GstRtpJitterBufferPrivate
{
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
- GCond jbuf_cond;
- gboolean waiting;
+ gboolean waiting_timer;
+ GCond jbuf_timer;
+ gboolean waiting_event;
+ GCond jbuf_event;
gboolean discont;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ gboolean timer_running;
+ GThread *timer_thread;
+
/* properties */
guint latency_ms;
guint64 latency_ns;
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
+static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
+
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
- g_cond_init (&priv->jbuf_cond);
+ g_cond_init (&priv->jbuf_timer);
+ g_cond_init (&priv->jbuf_event);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
g_array_free (jitterbuffer->priv->timers, TRUE);
g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
- g_cond_clear (&jitterbuffer->priv->jbuf_cond);
+ g_cond_clear (&jitterbuffer->priv->jbuf_timer);
+ g_cond_clear (&jitterbuffer->priv->jbuf_event);
g_object_unref (jitterbuffer->priv->jbuf);
GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->out_offset));
priv->active = active;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
}
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
priv->srcresult = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
/* unlock clock, we just unschedule, the entry will be released by the
* locking streaming thread. */
unschedule_current_timer (jitterbuffer);
priv->last_pt = -1;
/* block until we go to PLAYING */
priv->blocked = TRUE;
+ priv->timer_running = TRUE;
+ priv->timer_thread =
+ g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
JBUF_LOCK (priv);
/* unblock to allow streaming in PLAYING */
priv->blocked = FALSE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
JBUF_UNLOCK (priv);
break;
default:
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
+ priv->timer_running = FALSE;
+ JBUF_SIGNAL_TIMER (priv);
+ JBUF_UNLOCK (priv);
+ g_thread_join (priv->timer_thread);
+ priv->timer_thread = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
if (ret && !priv->eos) {
GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
priv->eos = TRUE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
} else if (priv->eos) {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
} else {
timer->rtx_retry = 0;
}
recalculate_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_TIMER (priv);
return timer;
}
continue;
if (gap == 0) {
+ GST_DEBUG ("found timer for current seqnum");
/* the timer for the current seqnum */
timer = test;
} else if (gap > priv->rtx_delay_reorder) {
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
}
}
expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) {
- expected++;
type = TIMER_TYPE_EXPECTED;
+ if (find_timer (jitterbuffer, type, expected))
+ expected++;
} else {
type = TIMER_TYPE_LOST;
}
do_handle_sync (jitterbuffer);
/* signal addition of new buffer when the _loop is waiting. */
- if (priv->waiting && priv->active)
- JBUF_SIGNAL (priv);
+ if (priv->active && priv->waiting_timer)
+ JBUF_SIGNAL_EVENT (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this
* when the tail buffer changed */
}
/* the timeout for when we expected a packet expired */
-static GstFlowReturn
+static void
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstClockTimeDiff clock_jitter)
{
}
reschedule_timer (jitterbuffer, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry);
-
- return priv->srcresult;
}
/* a packet is lost */
-static GstFlowReturn
+static void
do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstClockTimeDiff clock_jitter)
{
priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
/* remove timer now */
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
if (priv->do_lost) {
GstEvent *event;
"late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
JBUF_UNLOCK (priv);
gst_pad_push_event (priv->srcpad, event);
- JBUF_LOCK_CHECK (priv, flushing);
- }
- return GST_FLOW_OK;
-
- /* ERRORS */
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
+ JBUF_LOCK (priv);
}
}
-static GstFlowReturn
+static void
do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
+}
- return GST_FLOW_EOS;
+static void
+do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTimeDiff clock_jitter)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
+
+ priv->next_seqnum = timer->seqnum;
+ remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
}
/* called when we need to wait for the next timeout.
*
* If there are no timers, we wait on a gcond until something new happens.
*/
-static GstFlowReturn
+static void
wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstFlowReturn result = GST_FLOW_OK;
- gint i, len;
- TimerData *timer = NULL;
- GstClockTime timer_timeout = -1;
- gint timer_idx;
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- GstClockTime test_timeout = get_timeout (jitterbuffer, test);
-
- GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
- i, test->seqnum, GST_TIME_ARGS (test_timeout));
-
- /* find the smallest timeout */
- if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
- timer = test;
- timer_timeout = test_timeout;
- if (timer_timeout == -1)
- break;
+ JBUF_LOCK (priv);
+ while (priv->timer_running) {
+ TimerData *timer = NULL;
+ GstClockTime timer_timeout = -1;
+ gint i, len;
+ gint timer_idx;
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ GstClockTime test_timeout = get_timeout (jitterbuffer, test);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
+ i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
+
+ /* find the smallest timeout */
+ if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
+ timer = test;
+ timer_timeout = test_timeout;
+ if (timer_timeout == -1)
+ break;
+ }
}
- }
- if (timer) {
- GstClock *clock;
- GstClockTime sync_time;
- GstClockID id;
- GstClockReturn ret;
- GstClockTimeDiff clock_jitter;
-
- /* no timestamp, timeout immeditately */
- if (timer_timeout == -1)
- goto do_timeout;
+ if (timer) {
+ GstClock *clock;
+ GstClockTime sync_time;
+ GstClockID id;
+ GstClockReturn ret;
+ GstClockTimeDiff clock_jitter;
+
+ /* no timestamp, timeout immeditately */
+ if (timer_timeout == -1)
+ goto do_timeout;
+
+ GST_OBJECT_LOCK (jitterbuffer);
+ clock = GST_ELEMENT_CLOCK (jitterbuffer);
+ if (!clock) {
+ GST_OBJECT_UNLOCK (jitterbuffer);
+ /* let's just push if there is no clock */
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
+ goto do_timeout;
+ }
- GST_OBJECT_LOCK (jitterbuffer);
- clock = GST_ELEMENT_CLOCK (jitterbuffer);
- if (!clock) {
+ /* prepare for sync against clock */
+ sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ /* add latency of peer to get input time */
+ sync_time += priv->peer_latency;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
+ " with sync time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+
+ /* create an entry for the clock */
+ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
+ priv->unscheduled = FALSE;
+ priv->timer_timeout = timer_timeout;
+ priv->timer_seqnum = timer->seqnum;
+ timer_idx = timer->idx;
GST_OBJECT_UNLOCK (jitterbuffer);
- /* let's just push if there is no clock */
- GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
- goto do_timeout;
- }
-
- /* prepare for sync against clock */
- sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
- /* add latency of peer to get input time */
- sync_time += priv->peer_latency;
- GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
- " with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
-
- /* create an entry for the clock */
- id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
- priv->unscheduled = FALSE;
- priv->timer_timeout = timer_timeout;
- priv->timer_seqnum = timer->seqnum;
- timer_idx = timer->idx;
- GST_OBJECT_UNLOCK (jitterbuffer);
-
- /* release the lock so that the other end can push stuff or unlock */
- JBUF_UNLOCK (priv);
-
- ret = gst_clock_id_wait (id, &clock_jitter);
+ /* release the lock so that the other end can push stuff or unlock */
+ JBUF_UNLOCK (priv);
- JBUF_LOCK (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
- ret, priv->timer_seqnum, clock_jitter);
- /* and free the entry */
- gst_clock_id_unref (id);
- priv->clock_id = NULL;
-
- /* at this point, the clock could have been unlocked by a timeout, a new
- * tail element was added to the queue or because we are shutting down. Check
- * for shutdown first. */
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
- goto flushing;
-
- if (priv->timers->len <= timer_idx)
- goto done;
-
- /* we released the lock, the array might have changed */
- timer = &g_array_index (priv->timers, TimerData, timer_idx);
- /* if changed to timeout immediately, do so */
- if (timer->timeout == -1)
- goto do_timeout;
-
- /* if we got unscheduled and we are not flushing, it's because a new tail
- * element became available in the queue or we flushed the queue.
- * Grab it and try to push or sync. */
- if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
- GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
- goto done;
- }
+ ret = gst_clock_id_wait (id, &clock_jitter);
- do_timeout:
- switch (timer->type) {
- case TIMER_TYPE_EXPECTED:
- result = do_expected_timeout (jitterbuffer, timer, clock_jitter);
- break;
- case TIMER_TYPE_LOST:
- result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
- break;
- case TIMER_TYPE_DEADLINE:
- priv->next_seqnum = timer->seqnum;
- remove_timer (jitterbuffer, timer);
- break;
- case TIMER_TYPE_EOS:
- result = do_eos_timeout (jitterbuffer, timer);
+ JBUF_LOCK (priv);
+ if (!priv->timer_running)
break;
- }
- } else {
- /* no timers, wait for activity */
- GST_DEBUG_OBJECT (jitterbuffer, "waiting");
- priv->waiting = TRUE;
- JBUF_WAIT_CHECK (priv, flushing);
- priv->waiting = FALSE;
- GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
- }
-done:
- return result;
+ GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
+ ret, priv->timer_seqnum, clock_jitter);
+ /* and free the entry */
+ gst_clock_id_unref (id);
+ priv->clock_id = NULL;
+
+ if (priv->timers->len <= timer_idx)
+ continue;
+
+ /* we released the lock, the array might have changed */
+ timer = &g_array_index (priv->timers, TimerData, timer_idx);
+ /* if changed to timeout immediately, do so */
+ if (timer->timeout == -1)
+ goto do_timeout;
+
+ /* if we got unscheduled and we are not flushing, it's because a new tail
+ * element became available in the queue or we flushed the queue.
+ * Grab it and try to push or sync. */
+ if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
+ continue;
+ }
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
+ do_timeout:
+ switch (timer->type) {
+ case TIMER_TYPE_EXPECTED:
+ do_expected_timeout (jitterbuffer, timer, clock_jitter);
+ break;
+ case TIMER_TYPE_LOST:
+ do_lost_timeout (jitterbuffer, timer, clock_jitter);
+ break;
+ case TIMER_TYPE_DEADLINE:
+ do_deadline_timeout (jitterbuffer, timer, clock_jitter);
+ break;
+ case TIMER_TYPE_EOS:
+ do_eos_timeout (jitterbuffer, timer);
+ break;
+ }
+ } else {
+ /* no timers, wait for activity */
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting");
+ JBUF_WAIT_TIMER (priv);
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
+ }
}
+ GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
+ return;
}
/*
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
- if (G_LIKELY (result == GST_FLOW_WAIT))
+ if (G_LIKELY (result == GST_FLOW_WAIT)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
/* now wait for the next event */
- result = wait_next_timeout (jitterbuffer);
+ JBUF_WAIT_EVENT (priv, flushing);
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
+ result = GST_FLOW_OK;
+ }
}
while (result == GST_FLOW_OK);
JBUF_UNLOCK (priv);