static void gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
-static GstFlowReturn gst_queue2_chain_list (GstPad * pad,
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
++static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list);
-static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
- guint size, GstCaps * caps, GstBuffer ** buf);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
+ typedef enum
+ {
+ GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+ GST_QUEUE2_ITEM_TYPE_BUFFER,
+ GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+ GST_QUEUE2_ITEM_TYPE_EVENT
+ } GstQueue2ItemType;
+
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
-static void
-gst_queue2_base_init (gpointer g_class)
-{
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&srctemplate));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
-
- gst_element_class_set_details_simple (gstelement_class, "Queue 2",
- "Generic",
- "Simple data queue",
- "Erik Walthinsen <omega@cse.ogi.edu>, "
- "Wim Taymans <wim.taymans@gmail.com>");
-}
-
static void
gst_queue2_class_init (GstQueue2Class * klass)
{
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_chain));
- gst_pad_set_activatepush_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
+ gst_pad_set_chain_list_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
+ gst_pad_set_activatemode_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
- gst_pad_set_getcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
- gst_pad_set_bufferalloc_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
+ gst_pad_set_query_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
+ GST_OBJECT_FLAG_SET (queue->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
guint size;
buffer = GST_BUFFER_CAST (item);
- size = GST_BUFFER_SIZE (buffer);
+ size = gst_buffer_get_size (buffer);
- *is_buffer = TRUE;
+ *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
out_unexpected:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "exit because we received UNEXPECTED");
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+ static GstFlowReturn
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
++gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+ {
+ GstQueue2 *queue;
+
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
++ queue = GST_QUEUE2 (parent);
+
-gst_queue2_chain_list (GstPad * pad, GstBufferList * buffer_list)
++ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
++ "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+ }
+
+ static GstFlowReturn
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
++gst_queue2_chain_list (GstPad * pad, GstObject * parent,
++ GstBufferList * buffer_list)
+ {
+ GstQueue2 *queue;
+
-gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
- GstQueue2ItemType * item_type)
++ queue = GST_QUEUE2 (parent);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received buffer list %p", buffer_list);
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+ }
+
+ static GstMiniObject *
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream");
++gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
+ {
+ GstMiniObject *data;
+
- * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
++ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
+
+ /* stop pushing buffers, we dequeue all items until we see an item that we
- * buffers with an UNEXPECTED return value until we receive something
++ * can push again, which is EOS or SEGMENT. If there is nothing in the
+ * queue we can push, we set a flag to make the sinkpad refuse more
- "dropping UNEXPECTED buffer %p", data);
++ * buffers with an EOS return value until we receive something
+ * pushable again or we get flushed. */
+ while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+ if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
++ "dropping EOS buffer %p", data);
+ gst_buffer_unref (GST_BUFFER_CAST (data));
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+ GstEvent *event = GST_EVENT_CAST (data);
+ GstEventType type = GST_EVENT_TYPE (event);
+
- "pushing pushable event %s after UNEXPECTED",
- GST_EVENT_TYPE_NAME (event));
++ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+ /* we found a pushable item in the queue, push it out */
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED event %p", event);
++ "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
+ return data;
+ }
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED buffer list %p", data);
++ "dropping EOS event %p", event);
+ gst_event_unref (event);
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
++ "dropping EOS buffer list %p", data);
+ gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+ }
+ }
+ /* no more items in the queue. Set the unexpected flag so that upstream
+ * make us refuse any more buffers on the sinkpad. Since we will still
++ * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
+ * task function does not shut down. */
+ queue->unexpected = TRUE;
+ return NULL;
+ }
+
/* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */
static GstFlowReturn
next:
GST_QUEUE2_MUTEX_UNLOCK (queue);
- if (is_buffer) {
+ if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
+#if 0
GstCaps *caps;
+#endif
buffer = GST_BUFFER_CAST (data);
+#if 0
caps = GST_BUFFER_CAPS (buffer);
+#endif
+#if 0
/* set caps before pushing the buffer so that core does not try to do
* something fancy to check if this is possible. */
if (caps && caps != GST_PAD_CAPS (queue->srcpad))
/* need to check for srcresult here as well */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
- if (result == GST_FLOW_UNEXPECTED) {
- data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+ if (result == GST_FLOW_EOS) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
- /* stop pushing buffers, we dequeue all items until we see an item that we
- * can push again, which is EOS or SEGMENT. If there is nothing in the
- * queue we can push, we set a flag to make the sinkpad refuse more
- * buffers with an EOS return value until we receive something
- * pushable again or we get flushed. */
- while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
- if (is_buffer) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping EOS buffer %p", data);
- gst_buffer_unref (GST_BUFFER_CAST (data));
- } else if (GST_IS_EVENT (data)) {
- GstEvent *event = GST_EVENT_CAST (data);
- GstEventType type = GST_EVENT_TYPE (event);
-
- if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
- /* we found a pushable item in the queue, push it out */
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushing pushable event %s after EOS",
- GST_EVENT_TYPE_NAME (event));
- goto next;
- }
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping EOS event %p", event);
- gst_event_unref (event);
- }
- }
- /* no more items in the queue. Set the unexpected flag so that upstream
- * make us refuse any more buffers on the sinkpad. Since we will still
- * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
- * task function does not shut down. */
- queue->unexpected = TRUE;
- result = GST_FLOW_OK;
++ data = gst_queue2_dequeue_on_eos (queue, &item_type);
+ if (data != NULL)
+ goto next;
}
- } else if (GST_IS_EVENT (data)) {
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
- if (result == GST_FLOW_UNEXPECTED) {
- data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list;
+ GstBuffer *first_buf;
+ GstCaps *caps;
+
+ buffer_list = GST_BUFFER_LIST_CAST (data);
+
+ first_buf = gst_buffer_list_get (buffer_list, 0, 0);
+ caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+ /* set caps before pushing the buffer so that core does not try to do
+ * something fancy to check if this is possible. */
+ if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+ gst_pad_set_caps (queue->srcpad, caps);
+
+ result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+ /* need to check for srcresult here as well */
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
++ if (result == GST_FLOW_EOS) {
++ data = gst_queue2_dequeue_on_eos (queue, &item_type);
+ if (data != NULL)
+ goto next;
+ }
}
return result;