SIGNAL_UNDERRUN,
SIGNAL_RUNNING,
SIGNAL_OVERRUN,
+ SIGNAL_PUSHING,
LAST_SIGNAL
};
g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ /**
+ * GstQueue::pushing:
+ * @queue: the queue instance
+ *
+ * Reports when the queue has enough data to start pushing data again on the
+ * source pad.
+ */
+ gst_queue_signals[SIGNAL_PUSHING] =
+ g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/* properties */
g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
}
static void
+update_time_level (GstQueue * queue)
+{
+ gint64 sink_time, src_time;
+
+ sink_time =
+ gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+ queue->sink_segment.last_stop);
+
+ src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+ queue->src_segment.last_stop);
+
+ if (sink_time >= src_time)
+ queue->cur_level.time = sink_time - src_time;
+ else
+ queue->cur_level.time = 0;
+}
+
+static void
gst_queue_locked_flush (GstQueue * queue)
{
while (!g_queue_is_empty (queue->queue)) {
queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
queue->min_threshold.time = queue->orig_min_threshold.time;
+ gst_segment_init (&queue->sink_segment, GST_FORMAT_UNDEFINED);
+ gst_segment_init (&queue->src_segment, GST_FORMAT_UNDEFINED);
/* we deleted something... */
g_cond_signal (queue->item_del);
STATUS (queue, pad, "received EOS");
have_eos = TRUE;
break;
+ case GST_EVENT_NEWSEGMENT:
+ {
+ gboolean update;
+ GstFormat format;
+ gdouble rate, arate;
+ gint64 start, stop, time;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+ &start, &stop, &time);
+
+ GST_DEBUG_OBJECT (queue, "received NEWSEGMENT in %s",
+ gst_format_get_name (format));
+
+ /* now configure the values */
+ gst_segment_set_newsegment_full (&queue->sink_segment, update,
+ rate, arate, format, start, stop, time);
+ break;
+ }
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* we put the event in the queue, we don't have to act ourselves */
queue->cur_level.time >= queue->max_size.time)));
}
-
static GstFlowReturn
gst_queue_chain (GstPad * pad, GstBuffer * buffer)
{
GstQueue *queue;
+ GstClockTime duration, timestamp;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ duration = GST_BUFFER_DURATION (buffer);
+
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
+ "adding buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+ GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
/* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers.
* to make things read-only. Also keep our list uptodate. */
queue->cur_level.bytes -= GST_BUFFER_SIZE (leak);
queue->cur_level.buffers--;
- if (GST_BUFFER_DURATION (leak) != GST_CLOCK_TIME_NONE)
- queue->cur_level.time -= GST_BUFFER_DURATION (leak);
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ duration = GST_BUFFER_DURATION (buffer);
+
+ /* update start time in queue */
+ if (queue->src_segment.format == GST_FORMAT_TIME) {
+ gint64 last_stop;
+
+ if (timestamp != GST_CLOCK_TIME_NONE)
+ last_stop = timestamp;
+ else
+ last_stop = queue->src_segment.last_stop;
+
+ gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
+ last_stop);
+
+ update_time_level (queue);
+ } else if (duration != GST_CLOCK_TIME_NONE) {
+ if (queue->cur_level.time > duration)
+ queue->cur_level.time -= duration;
+ else
+ queue->cur_level.time = 0;
+ }
gst_buffer_unref (leak);
break;
}
/* add buffer to the statistics */
queue->cur_level.buffers++;
queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
- if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
- queue->cur_level.time += GST_BUFFER_DURATION (buffer);
+ /* update start time in queue */
+ if (queue->sink_segment.format == GST_FORMAT_TIME) {
+ gint64 last_stop;
+
+ if (timestamp != GST_CLOCK_TIME_NONE)
+ last_stop = timestamp;
+ else
+ last_stop = queue->sink_segment.last_stop;
+
+ if (duration != GST_CLOCK_TIME_NONE)
+ last_stop += duration;
+
+ gst_segment_set_last_stop (&queue->sink_segment, GST_FORMAT_TIME,
+ last_stop);
+
+ update_time_level (queue);
+ } else if (duration != GST_CLOCK_TIME_NONE) {
+ queue->cur_level.time += duration;
+ }
STATUS (queue, pad, "+ level");
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
if (GST_IS_BUFFER (data)) {
GstFlowReturn result;
+ GstClockTime timestamp, duration;
+ GstBuffer *buffer = GST_BUFFER (data);
/* Update statistics */
queue->cur_level.buffers--;
- queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
- if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
- queue->cur_level.time -= GST_BUFFER_DURATION (data);
+ queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
+
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ duration = GST_BUFFER_DURATION (buffer);
+
+ /* update start time in queue */
+ if (queue->src_segment.format == GST_FORMAT_TIME) {
+ gint64 last_stop;
+
+ if (timestamp != GST_CLOCK_TIME_NONE)
+ last_stop = timestamp;
+ else
+ last_stop = queue->src_segment.last_stop;
+
+ gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
+ last_stop);
+
+ update_time_level (queue);
+ } else if (duration != GST_CLOCK_TIME_NONE) {
+ if (queue->cur_level.time > duration)
+ queue->cur_level.time -= duration;
+ else
+ queue->cur_level.time = 0;
+ }
GST_QUEUE_MUTEX_UNLOCK (queue);
- result = gst_pad_push (queue->srcpad, GST_BUFFER (data));
+ result = gst_pad_push (queue->srcpad, buffer);
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
/* else result of push indicates what happens */
gst_pad_pause_task (queue->srcpad);
}
} else if (GST_IS_EVENT (data)) {
- if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
- queue->cur_level.buffers = 0;
- queue->cur_level.bytes = 0;
- queue->cur_level.time = 0;
- /* all incomming data is now unexpected */
- queue->srcresult = GST_FLOW_UNEXPECTED;
- /* and we don't need to process anymore */
- GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
- gst_pad_pause_task (queue->srcpad);
- restart = FALSE;
+ GstEvent *event = GST_EVENT (data);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_EOS:
+ queue->cur_level.buffers = 0;
+ queue->cur_level.bytes = 0;
+ queue->cur_level.time = 0;
+ /* all incomming data is now unexpected */
+ queue->srcresult = GST_FLOW_UNEXPECTED;
+ /* and we don't need to process anymore */
+ GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
+ gst_pad_pause_task (queue->srcpad);
+ restart = FALSE;
+ break;
+ case GST_EVENT_NEWSEGMENT:
+ {
+ gboolean update;
+ GstFormat format;
+ gdouble rate, arate;
+ gint64 start, stop, time;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &arate,
+ &format, &start, &stop, &time);
+
+ /* now configure the values */
+ gst_segment_set_newsegment_full (&queue->src_segment, update,
+ rate, arate, format, start, stop, time);
+ break;
+ }
+ default:
+ break;
}
GST_QUEUE_MUTEX_UNLOCK (queue);
- gst_pad_push_event (queue->srcpad, GST_EVENT (data));
+ gst_pad_push_event (queue->srcpad, event);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
if (restart == TRUE)
return TRUE;
STATUS (queue, pad, "post-empty wait");
GST_QUEUE_MUTEX_UNLOCK (queue);
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_PUSHING], 0);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
}