gst/gstevent.c: Fix docs typo
authorJan Schmidt <thaytan@mad.scientist.com>
Thu, 19 Jan 2006 13:30:31 +0000 (13:30 +0000)
committerJan Schmidt <thaytan@mad.scientist.com>
Thu, 19 Jan 2006 13:30:31 +0000 (13:30 +0000)
Original commit message from CVS:
* gst/gstevent.c:
Fix docs typo

* plugins/elements/gstqueue.c: (gst_queue_handle_sink_event),
(gst_queue_chain), (gst_queue_push_one), (gst_queue_loop):
Do some refactoring. Doesn't actually change functionality,
but makes landing the DRAIN event easier later.

ChangeLog
gst/gstevent.c
plugins/elements/gstqueue.c

index e7e0215dc37beeec4143a4205f62880afc41e76e..92fdd11a1b1d5a8f45cba44278fe19ba9ee8e8d4 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,13 @@
+2006-01-19  Jan Schmidt  <thaytan@mad.scientist.com>
+
+       * gst/gstevent.c:
+         Fix docs typo
+
+       * plugins/elements/gstqueue.c: (gst_queue_handle_sink_event),
+       (gst_queue_chain), (gst_queue_push_one), (gst_queue_loop):
+         Do some refactoring. Doesn't actually change functionality,
+         but makes landing the DRAIN event easier later.
+
 2006-01-19  Tim-Philipp Müller  <tim at centricular dot net>
 
        * docs/pwg/advanced-scheduling.xml:
index 71af9051bf42dccd826dae9f2f2dea4476b85601..824aaf21267b5b6abd513d1be032cdd43716d357 100644 (file)
@@ -388,8 +388,9 @@ gst_event_new_flush_stop (void)
  * event on a pad can return UNEXPECTED as a GstFlowReturn when data
  * after the EOS event arrives.
  *
- * The EOS event will travel up to the sink elements in the pipeline
- * which will then post the GST_MESSAGE_EOS on the bus.
+ * The EOS event will travel down to the sink elements in the pipeline
+ * which will then post the GST_MESSAGE_EOS on the bus after they have
+ * finished playing any buffered data.
  *
  * When all sinks have posted an EOS message, the EOS message is
  * forwarded to the application.
index 8f527ebbbed0940c39b136a0b2d9d5c46009cb47..2cea209f4aea65fa73a0ad46c3da667bc51aca46 100644 (file)
@@ -58,7 +58,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_debug);
 #define GST_CAT_DEFAULT (queue_debug)
 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
 
-#define STATUS(queue, msg) \
+#define STATUS(queue, pad, msg) \
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
@@ -145,6 +145,7 @@ static void gst_queue_get_property (GObject * object,
 static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
 static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
     guint size, GstCaps * caps, GstBuffer ** buf);
+static gboolean gst_queue_push_one (GstQueue * queue);
 static void gst_queue_loop (GstPad * pad);
 
 static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
@@ -162,6 +163,8 @@ static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
 static GstStateChangeReturn gst_queue_change_state (GstElement * element,
     GstStateChange transition);
 
+static gboolean gst_queue_is_empty (GstQueue * queue);
+static gboolean gst_queue_is_filled (GstQueue * queue);
 
 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
 
@@ -495,23 +498,6 @@ gst_queue_locked_flush (GstQueue * queue)
   g_cond_signal (queue->item_del);
 }
 
-#define STATUS(queue, msg) \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
-                      "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
-                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                      "-%" G_GUINT64_FORMAT " ns, %u elements", \
-                      GST_DEBUG_PAD_NAME (pad), \
-                      queue->cur_level.buffers, \
-                      queue->min_threshold.buffers, \
-                      queue->max_size.buffers, \
-                      queue->cur_level.bytes, \
-                      queue->min_threshold.bytes, \
-                      queue->max_size.bytes, \
-                      queue->cur_level.time, \
-                      queue->min_threshold.time, \
-                      queue->max_size.time, \
-                      queue->queue->length)
-
 static gboolean
 gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
 {
@@ -522,7 +508,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      STATUS (queue, "received flush start event");
+      STATUS (queue, pad, "received flush start event");
       /* forward event */
       gst_pad_push_event (queue->srcpad, event);
 
@@ -539,7 +525,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
       goto done;
     case GST_EVENT_FLUSH_STOP:
-      STATUS (queue, "received flush stop event");
+      STATUS (queue, pad, "received flush stop event");
       /* forward event */
       gst_pad_push_event (queue->srcpad, event);
 
@@ -554,10 +540,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       }
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
-      STATUS (queue, "after flush");
+      STATUS (queue, pad, "after flush");
       goto done;
     case GST_EVENT_EOS:
-      STATUS (queue, "received EOS");
+      STATUS (queue, pad, "received EOS");
       have_eos = TRUE;
       break;
     default:
@@ -685,10 +671,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
         /* don't leak. Instead, wait for space to be available */
       case GST_QUEUE_NO_LEAK:
-        STATUS (queue, "pre-full wait");
+        STATUS (queue, pad, "pre-full wait");
 
         while (gst_queue_is_filled (queue)) {
-          STATUS (queue, "waiting for item_del signal from thread using qlock");
+          STATUS (queue, pad,
+              "waiting for item_del signal from thread using qlock");
           g_cond_wait (queue->item_del, queue->qlock);
 
           if (queue->srcresult != GST_FLOW_OK)
@@ -697,10 +684,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
           /* if there's a pending state change for this queue
            * or its manager, switch back to iterator so bottom
            * half of state change executes */
-          STATUS (queue, "received item_del signal from thread using qlock");
+          STATUS (queue, pad,
+              "received item_del signal from thread using qlock");
         }
 
-        STATUS (queue, "post-full wait");
+        STATUS (queue, pad, "post-full wait");
         GST_QUEUE_MUTEX_UNLOCK (queue);
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -717,7 +705,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
     queue->cur_level.time += GST_BUFFER_DURATION (buffer);
 
-  STATUS (queue, "+ level");
+  STATUS (queue, pad, "+ level");
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
   g_cond_signal (queue->item_add);
@@ -748,47 +736,11 @@ out_flushing:
   }
 }
 
-static void
-gst_queue_loop (GstPad * pad)
+static gboolean
+gst_queue_push_one (GstQueue * queue)
 {
-  GstQueue *queue;
-  GstMiniObject *data;
   gboolean restart = TRUE;
-
-  queue = GST_QUEUE (GST_PAD_PARENT (pad));
-
-  /* have to lock for thread-safety */
-  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-
-restart:
-  while (gst_queue_is_empty (queue)) {
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
-    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-
-    STATUS (queue, "pre-empty wait");
-    while (gst_queue_is_empty (queue)) {
-      STATUS (queue, "waiting for item_add");
-
-      GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
-          g_thread_self ());
-      g_cond_wait (queue->item_add, queue->qlock);
-
-      /* we released the lock in the g_cond above so we might be
-       * flushing now */
-      if (queue->srcresult != GST_FLOW_OK)
-        goto out_flushing;
-
-      GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
-          g_thread_self ());
-      STATUS (queue, "got item_add signal");
-    }
-
-    STATUS (queue, "post-empty wait");
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
-    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-  }
+  GstMiniObject *data;
 
   /* There's something in the list now, whatever it is */
   data = g_queue_pop_head (queue->queue);
@@ -805,7 +757,7 @@ restart:
       queue->cur_level.time -= GST_BUFFER_DURATION (data);
 
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (pad, GST_BUFFER (data));
+    result = gst_pad_push (queue->srcpad, GST_BUFFER (data));
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     /* else result of push indicates what happens */
@@ -840,16 +792,71 @@ restart:
     gst_pad_push_event (queue->srcpad, GST_EVENT (data));
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     if (restart == TRUE)
-      goto restart;
+      return TRUE;
   } else {
     g_warning ("Unexpected object in queue %s (refcounting problem?)",
         GST_OBJECT_NAME (queue));
   }
 
-  STATUS (queue, "after _get()");
+  STATUS (queue, queue->srcpad, "after _get()");
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
+
+  return FALSE;
+
+out_flushing:
+  gst_pad_pause_task (queue->srcpad);
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "exit because task paused, reason:  %s",
+      gst_flow_get_name (queue->srcresult));
+
+  return FALSE;                 /* FALSE == no restart */
+}
+
+static void
+gst_queue_loop (GstPad * pad)
+{
+  GstQueue *queue;
+
+  queue = GST_QUEUE (GST_PAD_PARENT (pad));
+
+  /* have to lock for thread-safety */
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+
+restart:
+  while (gst_queue_is_empty (queue)) {
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+
+    STATUS (queue, pad, "pre-empty wait");
+    while (gst_queue_is_empty (queue)) {
+      STATUS (queue, pad, "waiting for item_add");
+
+      GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
+          g_thread_self ());
+      g_cond_wait (queue->item_add, queue->qlock);
+
+      /* we released the lock in the g_cond above so we might be
+       * flushing now */
+      if (queue->srcresult != GST_FLOW_OK)
+        goto out_flushing;
+
+      GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
+          g_thread_self ());
+      STATUS (queue, pad, "got item_add signal");
+    }
+
+    STATUS (queue, pad, "post-empty wait");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+  }
+
+  if (gst_queue_push_one (queue))
+    goto restart;
+
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return;