#define GST_CAT_DEFAULT (queue_debug)
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
-#define STATUS(queue, msg) \
+#define STATUS(queue, pad, msg) \
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
"bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
guint size, GstCaps * caps, GstBuffer ** buf);
+static gboolean gst_queue_push_one (GstQueue * queue);
static void gst_queue_loop (GstPad * pad);
static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
static GstStateChangeReturn gst_queue_change_state (GstElement * element,
GstStateChange transition);
+static gboolean gst_queue_is_empty (GstQueue * queue);
+static gboolean gst_queue_is_filled (GstQueue * queue);
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
g_cond_signal (queue->item_del);
}
-#define STATUS(queue, msg) \
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
- "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
- "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
- "-%" G_GUINT64_FORMAT " ns, %u elements", \
- GST_DEBUG_PAD_NAME (pad), \
- queue->cur_level.buffers, \
- queue->min_threshold.buffers, \
- queue->max_size.buffers, \
- queue->cur_level.bytes, \
- queue->min_threshold.bytes, \
- queue->max_size.bytes, \
- queue->cur_level.time, \
- queue->min_threshold.time, \
- queue->max_size.time, \
- queue->queue->length)
-
static gboolean
gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
{
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
- STATUS (queue, "received flush start event");
+ STATUS (queue, pad, "received flush start event");
/* forward event */
gst_pad_push_event (queue->srcpad, event);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
goto done;
case GST_EVENT_FLUSH_STOP:
- STATUS (queue, "received flush stop event");
+ STATUS (queue, pad, "received flush stop event");
/* forward event */
gst_pad_push_event (queue->srcpad, event);
}
GST_QUEUE_MUTEX_UNLOCK (queue);
- STATUS (queue, "after flush");
+ STATUS (queue, pad, "after flush");
goto done;
case GST_EVENT_EOS:
- STATUS (queue, "received EOS");
+ STATUS (queue, pad, "received EOS");
have_eos = TRUE;
break;
default:
/* don't leak. Instead, wait for space to be available */
case GST_QUEUE_NO_LEAK:
- STATUS (queue, "pre-full wait");
+ STATUS (queue, pad, "pre-full wait");
while (gst_queue_is_filled (queue)) {
- STATUS (queue, "waiting for item_del signal from thread using qlock");
+ STATUS (queue, pad,
+ "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
if (queue->srcresult != GST_FLOW_OK)
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
* half of state change executes */
- STATUS (queue, "received item_del signal from thread using qlock");
+ STATUS (queue, pad,
+ "received item_del signal from thread using qlock");
}
- STATUS (queue, "post-full wait");
+ STATUS (queue, pad, "post-full wait");
GST_QUEUE_MUTEX_UNLOCK (queue);
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
queue->cur_level.time += GST_BUFFER_DURATION (buffer);
- STATUS (queue, "+ level");
+ STATUS (queue, pad, "+ level");
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
g_cond_signal (queue->item_add);
}
}
-static void
-gst_queue_loop (GstPad * pad)
+static gboolean
+gst_queue_push_one (GstQueue * queue)
{
- GstQueue *queue;
- GstMiniObject *data;
gboolean restart = TRUE;
-
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
-
- /* have to lock for thread-safety */
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-
-restart:
- while (gst_queue_is_empty (queue)) {
- GST_QUEUE_MUTEX_UNLOCK (queue);
- g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-
- STATUS (queue, "pre-empty wait");
- while (gst_queue_is_empty (queue)) {
- STATUS (queue, "waiting for item_add");
-
- GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
- g_thread_self ());
- g_cond_wait (queue->item_add, queue->qlock);
-
- /* we released the lock in the g_cond above so we might be
- * flushing now */
- if (queue->srcresult != GST_FLOW_OK)
- goto out_flushing;
-
- GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
- g_thread_self ());
- STATUS (queue, "got item_add signal");
- }
-
- STATUS (queue, "post-empty wait");
- GST_QUEUE_MUTEX_UNLOCK (queue);
- g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
- }
+ GstMiniObject *data;
/* There's something in the list now, whatever it is */
data = g_queue_pop_head (queue->queue);
queue->cur_level.time -= GST_BUFFER_DURATION (data);
GST_QUEUE_MUTEX_UNLOCK (queue);
- result = gst_pad_push (pad, GST_BUFFER (data));
+ result = gst_pad_push (queue->srcpad, GST_BUFFER (data));
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
/* else result of push indicates what happens */
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
if (restart == TRUE)
- goto restart;
+ return TRUE;
} else {
g_warning ("Unexpected object in queue %s (refcounting problem?)",
GST_OBJECT_NAME (queue));
}
- STATUS (queue, "after _get()");
+ STATUS (queue, queue->srcpad, "after _get()");
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
+
+ return FALSE;
+
+out_flushing:
+ gst_pad_pause_task (queue->srcpad);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because task paused, reason: %s",
+ gst_flow_get_name (queue->srcresult));
+
+ return FALSE; /* FALSE == no restart */
+}
+
+static void
+gst_queue_loop (GstPad * pad)
+{
+ GstQueue *queue;
+
+ queue = GST_QUEUE (GST_PAD_PARENT (pad));
+
+ /* have to lock for thread-safety */
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+
+restart:
+ while (gst_queue_is_empty (queue)) {
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+
+ STATUS (queue, pad, "pre-empty wait");
+ while (gst_queue_is_empty (queue)) {
+ STATUS (queue, pad, "waiting for item_add");
+
+ GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
+ g_thread_self ());
+ g_cond_wait (queue->item_add, queue->qlock);
+
+ /* we released the lock in the g_cond above so we might be
+ * flushing now */
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
+ GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
+ g_thread_self ());
+ STATUS (queue, pad, "got item_add signal");
+ }
+
+ STATUS (queue, pad, "post-empty wait");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ }
+
+ if (gst_queue_push_one (queue))
+ goto restart;
+
GST_QUEUE_MUTEX_UNLOCK (queue);
return;