static gboolean gst_queue_is_empty (GstQueue * queue);
static gboolean gst_queue_is_filled (GstQueue * queue);
+
+typedef struct
+{
+ gboolean is_query;
+ GstMiniObject *item;
+} GstQueueItem;
+
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
static GType
g_mutex_init (&queue->qlock);
g_cond_init (&queue->item_add);
g_cond_init (&queue->item_del);
+ g_cond_init (&queue->query_handled);
queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
static void
gst_queue_finalize (GObject * object)
{
- GstMiniObject *data;
GstQueue *queue = GST_QUEUE (object);
GST_DEBUG_OBJECT (queue, "finalizing queue");
while (!gst_queue_array_is_empty (queue->queue)) {
- data = gst_queue_array_pop_head (queue->queue);
+ GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
/* FIXME: if it's a query, shouldn't we unref that too? */
- if (!GST_IS_QUERY (data))
- gst_mini_object_unref (data);
+ if (!qitem->is_query)
+ gst_mini_object_unref (qitem->item);
+ g_slice_free (GstQueueItem, qitem);
}
gst_queue_array_free (queue->queue);
g_mutex_clear (&queue->qlock);
g_cond_clear (&queue->item_add);
g_cond_clear (&queue->item_del);
+ g_cond_clear (&queue->query_handled);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_queue_locked_flush (GstQueue * queue, gboolean full)
{
- GstMiniObject *data;
-
while (!gst_queue_array_is_empty (queue->queue)) {
- data = gst_queue_array_pop_head (queue->queue);
+ GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+
/* Then lose another reference because we are supposed to destroy that
data when flushing */
- if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
- GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
- && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
- gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+ if (!full && GST_IS_EVENT (qitem->item) && GST_EVENT_IS_STICKY (qitem->item)
+ && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
+ && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
+ gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
}
- if (!GST_IS_QUERY (data))
- gst_mini_object_unref (data);
+ if (!qitem->is_query)
+ gst_mini_object_unref (qitem->item);
+ g_slice_free (GstQueueItem, qitem);
}
queue->last_query = FALSE;
+ g_cond_signal (&queue->query_handled);
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
queue->cur_level.bytes += gst_buffer_get_size (buffer);
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
- if (item)
- gst_queue_array_push_tail (queue->queue, item);
+ if (item) {
+ GstQueueItem *qitem = g_slice_new (GstQueueItem);
+ qitem->item = item;
+ qitem->is_query = FALSE;
+ gst_queue_array_push_tail (queue->queue, qitem);
+ }
GST_QUEUE_SIGNAL_ADD (queue);
}
break;
}
- if (item)
- gst_queue_array_push_tail (queue->queue, item);
+ if (item) {
+ GstQueueItem *qitem = g_slice_new (GstQueueItem);
+ qitem->item = item;
+ qitem->is_query = FALSE;
+ gst_queue_array_push_tail (queue->queue, qitem);
+ }
GST_QUEUE_SIGNAL_ADD (queue);
}
static GstMiniObject *
gst_queue_locked_dequeue (GstQueue * queue)
{
+ GstQueueItem *qitem;
GstMiniObject *item;
- item = gst_queue_array_pop_head (queue->queue);
- if (item == NULL)
+ qitem = gst_queue_array_pop_head (queue->queue);
+ if (qitem == NULL)
goto no_item;
+ item = qitem->item;
+ g_slice_free (GstQueueItem, qitem);
+
if (GST_IS_BUFFER (item)) {
GstBuffer *buffer = GST_BUFFER_CAST (item);
/* unblock the loop and chain functions */
GST_QUEUE_SIGNAL_ADD (queue);
GST_QUEUE_SIGNAL_DEL (queue);
+ queue->last_query = FALSE;
+ g_cond_signal (&queue->query_handled);
GST_QUEUE_MUTEX_UNLOCK (queue);
/* make sure it pauses, this should happen since we sent
switch (GST_QUERY_TYPE (query)) {
default:
if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
+ GstQueueItem *qitem;
+
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
GST_QUERY_TYPE_NAME (query));
- gst_queue_array_push_tail (queue->queue, query);
+ qitem = g_slice_new (GstQueueItem);
+ qitem->item = GST_MINI_OBJECT_CAST (query);
+ qitem->is_query = TRUE;
+ gst_queue_array_push_tail (queue->queue, qitem);
GST_QUEUE_SIGNAL_ADD (queue);
- while (!gst_queue_array_is_empty (queue->queue)) {
- /* for as long as the queue has items, we know the query is
- * not handled yet */
- GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
- }
+ g_cond_wait (&queue->query_handled, &queue->qlock);
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
res = queue->last_query;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* ERRORS */
out_flushing:
{
- gint index;
-
GST_DEBUG_OBJECT (queue, "we are flushing");
-
- /* Remove query from queue if still there, since we hold no ref to it */
- index = gst_queue_array_find (queue->queue, NULL, query);
-
- if (index >= 0)
- gst_queue_array_drop_element (queue->queue, index);
-
GST_QUEUE_MUTEX_UNLOCK (queue);
return FALSE;
}
static gboolean
gst_queue_is_empty (GstQueue * queue)
{
- GstMiniObject *head;
+ GstQueueItem *head;
if (gst_queue_array_is_empty (queue->queue))
return TRUE;
* we would block forever on serialized queries.
*/
head = gst_queue_array_peek_head (queue->queue);
- if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head))
+ if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
return FALSE;
/* It is possible that a max size is reached before all min thresholds are.
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping query %p because of EOS", query);
queue->last_query = FALSE;
+ g_cond_signal (&queue->query_handled);
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
GstQuery *query = GST_QUERY_CAST (data);
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
+ g_cond_signal (&queue->query_handled);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"did query %p, return %d", query, queue->last_query);
}