Fix multiqueue leaking buffers and events when downstream or the queue are flushing...
authorTim-Philipp Müller <tim@centricular.net>
Wed, 6 Jun 2007 18:11:10 +0000 (18:11 +0000)
committerTim-Philipp Müller <tim@centricular.net>
Wed, 6 Jun 2007 18:11:10 +0000 (18:11 +0000)
Original commit message from CVS:
* libs/gst/base/gstdataqueue.c:
* libs/gst/base/gstdataqueue.h:
* plugins/elements/gstmultiqueue.c: (gst_single_queue_push_one),
(gst_multi_queue_item_new), (gst_multi_queue_chain),
(gst_multi_queue_sink_event):
* tests/check/elements/multiqueue.c: (multiqueue_suite):
Fix multiqueue leaking buffers and events when downstream or the
queue are flushing. Make refcounting assumptions explicit and
document them (shouldn't break existing code that uses it other than
maybe leak miniobjects, but that already happens anyway). Add unit
test for the most common flushing case. Fixes #423700.

ChangeLog
libs/gst/base/gstdataqueue.c
libs/gst/base/gstdataqueue.h
plugins/elements/gstmultiqueue.c
tests/check/elements/multiqueue.c

index f810346..6c66d79 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,17 @@
+2007-06-06  Tim-Philipp Müller  <tim at centricular dot net>
+
+       * libs/gst/base/gstdataqueue.c:
+       * libs/gst/base/gstdataqueue.h:
+       * plugins/elements/gstmultiqueue.c: (gst_single_queue_push_one),
+       (gst_multi_queue_item_new), (gst_multi_queue_chain),
+       (gst_multi_queue_sink_event):
+       * tests/check/elements/multiqueue.c: (multiqueue_suite):
+         Fix multiqueue leaking buffers and events when downstream or the
+         queue are flushing. Make refcounting assumptions explicit and
+         document them (shouldn't break existing code that uses it other than
+         maybe leak miniobjects, but that already happens anyway). Add unit
+         test for the most common flushing case. Fixes #423700.
+         
 2007-06-06  Sebastian Dröge  <slomo@circular-chaos.org>
 
        * libs/gst/controller/gstcontroller.c:
index 5c009d4..a3bfde2 100644 (file)
@@ -397,6 +397,11 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
  * available, OR the @queue is set to flushing state.
  * MT safe.
  *
+ * Note that this function has slightly different semantics than gst_pad_push()
+ * and gst_pad_push_event(): this function only takes ownership of @item and
+ * the #GstMiniObject contained in @item if the push was successful. If FALSE
+ * is returned, the caller is responsible for freeing @item and its contents.
+ *
  * Returns: #TRUE if the @item was successfully pushed on the @queue.
  */
 gboolean
index b3cd87b..503e44c 100644 (file)
@@ -48,7 +48,9 @@ typedef struct _GstDataQueueItem GstDataQueueItem;
  * @duration: the duration in #GstClockTime of the miniobject. Can not be
  * #GST_CLOCK_TIME_NONE.
  * @visible: #TRUE if @object should be considered as a visible object.
- * @destroy: The #GDestroyNotify to use to free the #GstDataQueueItem.
+ * @destroy: The #GDestroyNotify function to use to free the #GstDataQueueItem.
+ * This function should also drop the reference to @object the owner of the
+ * #GstDataQueueItem is assumed to hold.
  *
  * Structure used by #GstDataQueue. You can supply a different structure, as
  * long as the top of the structure is identical to this structure.
index 3aca642..d4208af 100644 (file)
@@ -427,7 +427,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     GstMiniObject * object)
 {
   if (GST_IS_BUFFER (object)) {
-    sq->srcresult = gst_pad_push (sq->srcpad, GST_BUFFER (object));
+    GstBuffer *buf;
+
+    buf = gst_buffer_ref (GST_BUFFER_CAST (object));
+    sq->srcresult = gst_pad_push (sq->srcpad, buf);
 
     if ((sq->srcresult != GST_FLOW_OK)
         && (sq->srcresult != GST_FLOW_NOT_LINKED)) {
@@ -437,7 +440,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
       gst_pad_pause_task (sq->srcpad);
     }
   } else if (GST_IS_EVENT (object)) {
-    if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
+    GstEvent *event;
+
+    event = gst_event_ref (GST_EVENT_CAST (object));
+    if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
       sq->srcresult = GST_FLOW_UNEXPECTED;
 
       GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, got EOS",
@@ -445,7 +451,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
       gst_data_queue_set_flushing (sq->queue, TRUE);
       gst_pad_pause_task (sq->srcpad);
     }
-    gst_pad_push_event (sq->srcpad, GST_EVENT (object));
+    gst_pad_push_event (sq->srcpad, event);
   } else {
     g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
         sq->id);
@@ -461,13 +467,14 @@ gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   g_free (item);
 }
 
+/* takes ownership of passed mini object! */
 static GstMultiQueueItem *
 gst_multi_queue_item_new (GstMiniObject * object)
 {
   GstMultiQueueItem *item;
 
   item = g_new0 (GstMultiQueueItem, 1);
-  item->object = gst_mini_object_ref (object);
+  item->object = object;
   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
 
   if (GST_IS_BUFFER (object)) {
@@ -621,7 +628,6 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
         sq->id, gst_flow_get_name (sq->srcresult));
     gst_multi_queue_item_destroy (item);
-    gst_buffer_unref (buffer);
     ret = sq->srcresult;
   }
 
@@ -718,7 +724,6 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
         sq->id, gst_flow_get_name (sq->srcresult));
     gst_multi_queue_item_destroy (item);
-    gst_event_unref (event);
   }
 
 done:
index ddf01ae..e6826e2 100644 (file)
@@ -259,11 +259,7 @@ multiqueue_suite (void)
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_simple_create_destroy);
   tcase_add_test (tc_chain, test_simple_pipeline);
-
-  if (0) {
-    /* FIXME: this leaks buffers, disabled for now */
-    tcase_add_test (tc_chain, test_simple_shutdown_while_running);
-  }
+  tcase_add_test (tc_chain, test_simple_shutdown_while_running);
 
   /* FIXME: test_request_pads() needs some more fixes, see comments there */
   tcase_add_test (tc_chain, test_request_pads);