guint buffering_iteration;
/* for measuring input/output rates */
+ GTimer *in_timer;
+ gboolean in_timer_started;
+ gdouble last_in_elapsed;
guint64 bytes_in;
- guint64 bytes_out;
- GTimer *timer;
gdouble byte_in_rate;
+
+ GTimer *out_timer;
+ gboolean out_timer_started;
+ gdouble last_out_elapsed;
+ guint64 bytes_out;
gdouble byte_out_rate;
- gdouble last_elapsed;
- gboolean timer_started;
GMutex *qlock; /* lock for queue (vs object lock) */
gboolean waiting_add;
queue->srcresult = GST_FLOW_WRONG_STATE;
queue->is_eos = FALSE;
- queue->timer = g_timer_new ();
+ queue->in_timer = g_timer_new ();
+ queue->out_timer = g_timer_new ();
queue->qlock = g_mutex_new ();
queue->waiting_add = FALSE;
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
- g_timer_destroy (queue->timer);
+ g_timer_destroy (queue->in_timer);
+ g_timer_destroy (queue->out_timer);
/* temp_file path cleanup */
if (queue->temp_location != NULL)
queue->bytes_out = 0;
queue->byte_in_rate = 0.0;
queue->byte_out_rate = 0.0;
- queue->last_elapsed = 0.0;
- queue->timer_started = FALSE;
+ queue->last_in_elapsed = 0.0;
+ queue->last_out_elapsed = 0.0;
+ queue->in_timer_started = FALSE;
+ queue->out_timer_started = FALSE;
}
/* the interval in seconds to recalculate the rate */
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
-update_rates (GstQueue * queue)
+update_in_rates (GstQueue * queue)
{
gdouble elapsed, period;
gdouble byte_in_rate;
- gdouble byte_out_rate;
- if (!queue->timer_started) {
- queue->timer_started = TRUE;
- g_timer_start (queue->timer);
+ if (!queue->in_timer_started) {
+ queue->in_timer_started = TRUE;
+ g_timer_start (queue->in_timer);
return;
}
- elapsed = g_timer_elapsed (queue->timer, NULL);
+ elapsed = g_timer_elapsed (queue->in_timer, NULL);
/* recalc after each interval. */
- if (queue->last_elapsed + RATE_INTERVAL < elapsed) {
- period = elapsed - queue->last_elapsed;
+ if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
+ period = elapsed - queue->last_in_elapsed;
GST_DEBUG_OBJECT (queue,
- "rates: period %f, in %" G_GUINT64_FORMAT ", out %" G_GUINT64_FORMAT,
- period, queue->bytes_in, queue->bytes_out);
+ "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
byte_in_rate = queue->bytes_in / period;
- byte_out_rate = queue->bytes_out / period;
if (queue->byte_in_rate == 0.0)
queue->byte_in_rate = byte_in_rate;
else
queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
- if (queue->byte_out_rate == 0.0)
- queue->byte_out_rate = byte_out_rate;
- else
- queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
-
/* reset the values to calculate rate over the next interval */
- queue->last_elapsed = elapsed;
+ queue->last_in_elapsed = elapsed;
queue->bytes_in = 0;
- queue->bytes_out = 0;
}
if (queue->byte_in_rate > 0.0) {
queue->cur_level.rate_time =
queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
}
+ GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
+ queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
+}
+
+static void
+update_out_rates (GstQueue * queue)
+{
+ gdouble elapsed, period;
+ gdouble byte_out_rate;
+
+ if (!queue->out_timer_started) {
+ queue->out_timer_started = TRUE;
+ g_timer_start (queue->out_timer);
+ return;
+ }
+
+ elapsed = g_timer_elapsed (queue->out_timer, NULL);
+
+ /* recalc after each interval. */
+ if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
+ period = elapsed - queue->last_out_elapsed;
- GST_DEBUG_OBJECT (queue, "rates: in %f, out %f, time %" GST_TIME_FORMAT,
- queue->byte_in_rate, queue->byte_out_rate,
- GST_TIME_ARGS (queue->cur_level.rate_time));
+ GST_DEBUG_OBJECT (queue,
+ "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
+
+ byte_out_rate = queue->bytes_out / period;
+
+ if (queue->byte_out_rate == 0.0)
+ queue->byte_out_rate = byte_out_rate;
+ else
+ queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
+
+ /* reset the values to calculate rate over the next interval */
+ queue->last_out_elapsed = elapsed;
+ queue->bytes_out = 0;
+ }
+ GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
+ queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}
static void
/* apply new buffer to segment stats */
apply_buffer (queue, buffer, &queue->sink_segment);
/* update the byterate stats */
- update_rates (queue);
+ update_in_rates (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue_write_buffer_to_file (queue, buffer);
queue->bytes_out += size;
apply_buffer (queue, buffer, &queue->src_segment);
/* update the byterate stats */
- update_rates (queue);
+ update_out_rates (queue);
/* update the buffering */
update_buffering (queue);
/* We make space available if we're "full" according to whatever
* the user defined as "full". */
- while (gst_queue_is_filled (queue)) {
+ if (gst_queue_is_filled (queue)) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
* the byterate on the input pad is lower */
- if ((started = queue->timer_started))
- g_timer_stop (queue->timer);
+ if ((started = queue->in_timer_started))
+ g_timer_stop (queue->in_timer);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, waiting for free space");
- /* Wait for space to be available, we could be unlocked because of a flush. */
- GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+ do {
+ /* Wait for space to be available, we could be unlocked because of a flush. */
+ GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+ }
+ while (gst_queue_is_filled (queue));
/* and continue if we were running before */
if (started)
- g_timer_continue (queue->timer);
+ g_timer_continue (queue->in_timer);
}
/* put buffer in queue now */
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
- while (gst_queue_is_empty (queue)) {
- /* we recheck, we could be unlocked because of a flush. */
- GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ if (gst_queue_is_empty (queue)) {
+ gboolean started;
+
+ /* pause the timer while we wait. The fact that we are waiting does not mean
+ * the byterate on the output pad is lower */
+ if ((started = queue->out_timer_started))
+ g_timer_stop (queue->out_timer);
+
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "queue is empty, waiting for new data");
+ do {
+ /* Wait for data to be available, we could be unlocked because of a flush. */
+ GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ }
+ while (gst_queue_is_empty (queue));
+
+ /* and continue if we were running before */
+ if (started)
+ g_timer_continue (queue->out_timer);
}
ret = gst_queue_push_one (queue);
queue->srcresult = ret;