gst/base/gstbasesrc.*: Add a gboolean to decide when to push out a discont.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 20 Jul 2005 10:58:10 +0000 (10:58 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 20 Jul 2005 10:58:10 +0000 (10:58 +0000)
Original commit message from CVS:
* gst/base/gstbasesrc.c: (gst_base_src_init),
(gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start):
* gst/base/gstbasesrc.h:
Add a gboolean to decide when to push out a discont.

* gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_loop), (gst_queue_handle_src_query),
(gst_queue_sink_activate_push), (gst_queue_src_activate_push),
(gst_queue_set_property), (gst_queue_get_property):
Some cleanups.

* tests/threadstate/threadstate1.c: (main):
Make a thread test compile and run... very silly..

ChangeLog
gst/base/gstbasesrc.c
gst/base/gstbasesrc.h
gst/gstqueue.c
libs/gst/base/gstbasesrc.c
libs/gst/base/gstbasesrc.h
plugins/elements/gstqueue.c
tests/threadstate/threadstate1.c

index 9a28551..c296dd1 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,20 @@
+2005-07-20  Wim Taymans  <wim@fluendo.com>
+
+       * gst/base/gstbasesrc.c: (gst_base_src_init),
+       (gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start):
+       * gst/base/gstbasesrc.h:
+       Add a gboolean to decide when to push out a discont.
+
+       * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
+       (gst_queue_loop), (gst_queue_handle_src_query),
+       (gst_queue_sink_activate_push), (gst_queue_src_activate_push),
+       (gst_queue_set_property), (gst_queue_get_property):
+       Some cleanups.
+
+       * tests/threadstate/threadstate1.c: (main):
+       Make a thread test compile and run... very silly..
+
+
 2005-07-20  Ronald S. Bultje  <rbultje@ronald.bitfreak.net>
 
        * docs/manual/appendix-porting.xml:
index 6d830d3..cdacbd0 100644 (file)
@@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
 
   basesrc->segment_start = -1;
   basesrc->segment_end = -1;
+  basesrc->need_discont = TRUE;
   basesrc->blocksize = DEFAULT_BLOCKSIZE;
   basesrc->clock_id = NULL;
 
@@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
       goto error;
   }
 
-  /* now send discont */
-  gst_base_src_send_discont (src);
+  /* now make sure the discont will be send */
+  src->need_discont = TRUE;
 
   /* and restart the task */
   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
@@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad)
 
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
-  if (src->offset == 0) {
+  if (src->need_discont) {
     /* now send discont */
     gst_base_src_send_discont (src);
+    src->need_discont = FALSE;
   }
 
   ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
@@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   /* start in the beginning */
   basesrc->offset = 0;
-  basesrc->segment_start = 0;
 
   /* figure out the size */
   if (bclass->get_size) {
@@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc)
   GST_DEBUG ("size %d %lld", result, basesrc->size);
 
   /* we always run to the end */
+  basesrc->segment_start = 0;
   basesrc->segment_end = basesrc->size;
+  basesrc->need_discont = TRUE;
 
   /* check if we can seek, updates ->seekable */
   gst_base_src_is_seekable (basesrc);
index f8713b1..0676b91 100644 (file)
@@ -92,6 +92,7 @@ struct _GstBaseSrc {
   gint64        segment_start; /* start and end positions for seeking */
   gint64        segment_end;
   gboolean      segment_loop;
+  gboolean      need_discont;
 
   guint64       offset;        /* current offset in the resource */
   guint64        size;         /* total size of the resource */
index f8d3708..971d4c4 100644 (file)
@@ -95,21 +95,27 @@ enum
       /* FILL ME */
 };
 
-#define GST_QUEUE_MUTEX_LOCK G_STMT_START {                            \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                         \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "locking qlock from thread %p",                                  \
       g_thread_self ());                                               \
-  g_mutex_lock (queue->qlock);                                         \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+  g_mutex_lock (q->qlock);                                             \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "locked qlock from thread %p",                                   \
       g_thread_self ());                                               \
 } G_STMT_END
 
-#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START {                          \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {             \
+  GST_QUEUE_MUTEX_LOCK (q);                                            \
+  if (q->srcresult != GST_FLOW_OK)                                     \
+    goto label;                                                                \
+} G_STMT_END
+
+#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                       \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "unlocking qlock from thread %p",                                        \
       g_thread_self ());                                               \
-  g_mutex_unlock (queue->qlock);                                       \
+  g_mutex_unlock (q->qlock);                                           \
 } G_STMT_END
 
 
@@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH:
       STATUS (queue, "received flush event");
-      /* forward event */
+      /* forward event, re first as we're going to use it still */
       gst_event_ref (event);
       gst_pad_push_event (queue->srcpad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
-        GST_QUEUE_MUTEX_LOCK;
+        GST_QUEUE_MUTEX_LOCK (queue);
         gst_queue_locked_flush (queue);
         queue->srcresult = GST_FLOW_OK;
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
             queue->srcpad);
-        GST_QUEUE_MUTEX_UNLOCK;
+        GST_QUEUE_MUTEX_UNLOCK (queue);
+
+        STATUS (queue, "after flush");
       } else {
         /* now unblock the chain function */
-        GST_QUEUE_MUTEX_LOCK;
+        GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_WRONG_STATE;
         /* unblock the loop function */
         g_cond_signal (queue->item_add);
-        GST_QUEUE_MUTEX_UNLOCK;
-
-        STATUS (queue, "after flush");
+        GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* make sure it pauses */
         gst_pad_pause_task (queue->srcpad);
@@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       break;
   }
 
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
   g_queue_push_tail (queue->queue, event);
   g_cond_signal (queue->item_add);
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
-  GST_QUEUE_MUTEX_UNLOCK;
 done:
 
   return TRUE;
@@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   /* we have to lock the queue since we span threads */
-  GST_QUEUE_MUTEX_LOCK;
-
-  if (queue->srcresult != GST_FLOW_OK)
-    goto out_flushing;
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
       "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
@@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
    * the user defined as "full". Note that this only applies to buffers.
    * We always handle events and they don't count in our statistics. */
   while (gst_queue_is_filled (queue)) {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
     /* how are we going to make space for this buffer? */
     switch (queue->leaky) {
@@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
         }
 
         STATUS (queue, "post-full wait");
-        GST_QUEUE_MUTEX_UNLOCK;
+        GST_QUEUE_MUTEX_UNLOCK (queue);
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
-        GST_QUEUE_MUTEX_LOCK;
-
-        if (queue->srcresult != GST_FLOW_OK)
-          goto out_flushing;
+        GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
         break;
     }
@@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
   g_cond_signal (queue->item_add);
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
 
   /* special conditions */
 out_unref:
   {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     gst_buffer_unref (buffer);
 
@@ -679,7 +676,7 @@ out_flushing:
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason:  %d", ret);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     gst_buffer_unref (buffer);
 
@@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad)
   queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
   /* have to lock for thread-safety */
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
 restart:
-  if (queue->srcresult != GST_FLOW_OK)
-    goto out_flushing;
-
   while (gst_queue_is_empty (queue)) {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
     STATUS (queue, "pre-empty wait");
     while (gst_queue_is_empty (queue)) {
@@ -730,12 +721,9 @@ restart:
     }
 
     STATUS (queue, "post-empty wait");
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
   }
 
   /* There's something in the list now, whatever it is */
@@ -752,9 +740,9 @@ restart:
     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
       queue->cur_level.time -= GST_BUFFER_DURATION (data);
 
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     result = gst_pad_push (pad, GST_BUFFER (data));
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     /* can opt to check for srcresult here but the push should
      * return an error value that is more accurate */
     if (result != GST_FLOW_OK) {
@@ -775,9 +763,9 @@ restart:
       gst_pad_pause_task (queue->srcpad);
       restart = FALSE;
     }
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_pad_push_event (queue->srcpad, GST_EVENT (data));
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     if (restart == TRUE)
       goto restart;
   }
@@ -786,7 +774,7 @@ restart:
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return;
 
@@ -794,7 +782,7 @@ out_flushing:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason:  %d", queue->srcresult);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     return;
   }
@@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
     result = TRUE;
   } else {
     /* step 1, unblock chain and loop functions */
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
     g_cond_signal (queue->item_del);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
@@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
     /* step 1, unblock chain and loop functions */
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     g_cond_signal (queue->item_add);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
@@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object,
 
   /* someone could change levels here, and since this
    * affects the get/put funcs, we need to lock for safety. */
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
 
   switch (prop_id) {
     case ARG_MAX_SIZE_BYTES:
@@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object,
       break;
   }
 
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 }
 
 static void
@@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object,
 {
   GstQueue *queue = GST_QUEUE (object);
 
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
 
   switch (prop_id) {
     case ARG_CUR_LEVEL_BYTES:
@@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object,
       break;
   }
 
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 }
index 6d830d3..cdacbd0 100644 (file)
@@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
 
   basesrc->segment_start = -1;
   basesrc->segment_end = -1;
+  basesrc->need_discont = TRUE;
   basesrc->blocksize = DEFAULT_BLOCKSIZE;
   basesrc->clock_id = NULL;
 
@@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
       goto error;
   }
 
-  /* now send discont */
-  gst_base_src_send_discont (src);
+  /* now make sure the discont will be send */
+  src->need_discont = TRUE;
 
   /* and restart the task */
   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
@@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad)
 
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
-  if (src->offset == 0) {
+  if (src->need_discont) {
     /* now send discont */
     gst_base_src_send_discont (src);
+    src->need_discont = FALSE;
   }
 
   ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
@@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   /* start in the beginning */
   basesrc->offset = 0;
-  basesrc->segment_start = 0;
 
   /* figure out the size */
   if (bclass->get_size) {
@@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc)
   GST_DEBUG ("size %d %lld", result, basesrc->size);
 
   /* we always run to the end */
+  basesrc->segment_start = 0;
   basesrc->segment_end = basesrc->size;
+  basesrc->need_discont = TRUE;
 
   /* check if we can seek, updates ->seekable */
   gst_base_src_is_seekable (basesrc);
index f8713b1..0676b91 100644 (file)
@@ -92,6 +92,7 @@ struct _GstBaseSrc {
   gint64        segment_start; /* start and end positions for seeking */
   gint64        segment_end;
   gboolean      segment_loop;
+  gboolean      need_discont;
 
   guint64       offset;        /* current offset in the resource */
   guint64        size;         /* total size of the resource */
index f8d3708..971d4c4 100644 (file)
@@ -95,21 +95,27 @@ enum
       /* FILL ME */
 };
 
-#define GST_QUEUE_MUTEX_LOCK G_STMT_START {                            \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                         \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "locking qlock from thread %p",                                  \
       g_thread_self ());                                               \
-  g_mutex_lock (queue->qlock);                                         \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+  g_mutex_lock (q->qlock);                                             \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "locked qlock from thread %p",                                   \
       g_thread_self ());                                               \
 } G_STMT_END
 
-#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START {                          \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,                           \
+#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {             \
+  GST_QUEUE_MUTEX_LOCK (q);                                            \
+  if (q->srcresult != GST_FLOW_OK)                                     \
+    goto label;                                                                \
+} G_STMT_END
+
+#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                       \
+  GST_CAT_LOG_OBJECT (queue_dataflow, q,                               \
       "unlocking qlock from thread %p",                                        \
       g_thread_self ());                                               \
-  g_mutex_unlock (queue->qlock);                                       \
+  g_mutex_unlock (q->qlock);                                           \
 } G_STMT_END
 
 
@@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH:
       STATUS (queue, "received flush event");
-      /* forward event */
+      /* forward event, re first as we're going to use it still */
       gst_event_ref (event);
       gst_pad_push_event (queue->srcpad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
-        GST_QUEUE_MUTEX_LOCK;
+        GST_QUEUE_MUTEX_LOCK (queue);
         gst_queue_locked_flush (queue);
         queue->srcresult = GST_FLOW_OK;
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
             queue->srcpad);
-        GST_QUEUE_MUTEX_UNLOCK;
+        GST_QUEUE_MUTEX_UNLOCK (queue);
+
+        STATUS (queue, "after flush");
       } else {
         /* now unblock the chain function */
-        GST_QUEUE_MUTEX_LOCK;
+        GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_WRONG_STATE;
         /* unblock the loop function */
         g_cond_signal (queue->item_add);
-        GST_QUEUE_MUTEX_UNLOCK;
-
-        STATUS (queue, "after flush");
+        GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* make sure it pauses */
         gst_pad_pause_task (queue->srcpad);
@@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       break;
   }
 
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
   g_queue_push_tail (queue->queue, event);
   g_cond_signal (queue->item_add);
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
-  GST_QUEUE_MUTEX_UNLOCK;
 done:
 
   return TRUE;
@@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   /* we have to lock the queue since we span threads */
-  GST_QUEUE_MUTEX_LOCK;
-
-  if (queue->srcresult != GST_FLOW_OK)
-    goto out_flushing;
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
       "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
@@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
    * the user defined as "full". Note that this only applies to buffers.
    * We always handle events and they don't count in our statistics. */
   while (gst_queue_is_filled (queue)) {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
     /* how are we going to make space for this buffer? */
     switch (queue->leaky) {
@@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
         }
 
         STATUS (queue, "post-full wait");
-        GST_QUEUE_MUTEX_UNLOCK;
+        GST_QUEUE_MUTEX_UNLOCK (queue);
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
-        GST_QUEUE_MUTEX_LOCK;
-
-        if (queue->srcresult != GST_FLOW_OK)
-          goto out_flushing;
+        GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
         break;
     }
@@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
   g_cond_signal (queue->item_add);
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
 
   /* special conditions */
 out_unref:
   {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     gst_buffer_unref (buffer);
 
@@ -679,7 +676,7 @@ out_flushing:
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason:  %d", ret);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     gst_buffer_unref (buffer);
 
@@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad)
   queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
   /* have to lock for thread-safety */
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
 restart:
-  if (queue->srcresult != GST_FLOW_OK)
-    goto out_flushing;
-
   while (gst_queue_is_empty (queue)) {
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
     STATUS (queue, "pre-empty wait");
     while (gst_queue_is_empty (queue)) {
@@ -730,12 +721,9 @@ restart:
     }
 
     STATUS (queue, "post-empty wait");
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
-    GST_QUEUE_MUTEX_LOCK;
-
-    if (queue->srcresult != GST_FLOW_OK)
-      goto out_flushing;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
   }
 
   /* There's something in the list now, whatever it is */
@@ -752,9 +740,9 @@ restart:
     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
       queue->cur_level.time -= GST_BUFFER_DURATION (data);
 
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     result = gst_pad_push (pad, GST_BUFFER (data));
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     /* can opt to check for srcresult here but the push should
      * return an error value that is more accurate */
     if (result != GST_FLOW_OK) {
@@ -775,9 +763,9 @@ restart:
       gst_pad_pause_task (queue->srcpad);
       restart = FALSE;
     }
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_pad_push_event (queue->srcpad, GST_EVENT (data));
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     if (restart == TRUE)
       goto restart;
   }
@@ -786,7 +774,7 @@ restart:
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return;
 
@@ -794,7 +782,7 @@ out_flushing:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason:  %d", queue->srcresult);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     return;
   }
@@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
     result = TRUE;
   } else {
     /* step 1, unblock chain and loop functions */
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
     g_cond_signal (queue->item_del);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
@@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
     /* step 1, unblock chain and loop functions */
-    GST_QUEUE_MUTEX_LOCK;
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     g_cond_signal (queue->item_add);
-    GST_QUEUE_MUTEX_UNLOCK;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
 
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
@@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object,
 
   /* someone could change levels here, and since this
    * affects the get/put funcs, we need to lock for safety. */
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
 
   switch (prop_id) {
     case ARG_MAX_SIZE_BYTES:
@@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object,
       break;
   }
 
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 }
 
 static void
@@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object,
 {
   GstQueue *queue = GST_QUEUE (object);
 
-  GST_QUEUE_MUTEX_LOCK;
+  GST_QUEUE_MUTEX_LOCK (queue);
 
   switch (prop_id) {
     case ARG_CUR_LEVEL_BYTES:
@@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object,
       break;
   }
 
-  GST_QUEUE_MUTEX_UNLOCK;
+  GST_QUEUE_MUTEX_UNLOCK (queue);
 }
index 2b1e5fa..58d8b87 100644 (file)
@@ -10,13 +10,13 @@ int
 main (int argc, char *argv[])
 {
   GstElement *fakesrc, *fakesink;
-  GstElement *thread;
+  GstElement *pipeline;
   gint x;
 
   gst_init (&argc, &argv);
 
-  thread = gst_thread_new ("thread");
-  g_assert (thread != NULL);
+  pipeline = gst_pipeline_new ("pipeline");
+  g_assert (pipeline != NULL);
 
   fakesrc = gst_element_factory_make ("fakesrc", "fake_source");
   g_assert (fakesrc != NULL);
@@ -24,16 +24,16 @@ main (int argc, char *argv[])
   fakesink = gst_element_factory_make ("fakesink", "fake_sink");
   g_assert (fakesink != NULL);
 
-  gst_bin_add_many (GST_BIN (thread), fakesrc, fakesink, NULL);
+  gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL);
   gst_element_link (fakesrc, fakesink);
 
   for (x = 0; x < 10; x++) {
     g_print ("playing %d\n", x);
-    gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PLAYING);
+    gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
     g_usleep (G_USEC_PER_SEC);
 
     g_print ("pausing %d\n", x);
-    gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PAUSED);
+    gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
     g_usleep (G_USEC_PER_SEC);
   }