fix queue event code
authorThomas Vander Stichele <thomas@apestaart.org>
Fri, 11 Jun 2004 15:18:58 +0000 (15:18 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Fri, 11 Jun 2004 15:18:58 +0000 (15:18 +0000)
Original commit message from CVS:
fix queue event code

ChangeLog
gst/gstqueue.c
gst/gstqueue.h
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index 7f624d2..089c346 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+2004-06-11  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+       * gst/gstqueue.c:
+       * gst/gstqueue.h:
+         fix removing from the wrong queue on event timeout
+         fix disposing of the event queue by casting correctly
+         add mutexes for handling the event queue
+         someone was sleeping when fixing queue last time around :)
+
 2004-06-10  Johan Dahlin  <johan@gnome.org>
 
        * gst/gst.c (gst_init_check_with_popt_table): Do not fail on
index 0f6c076..094464f 100644 (file)
@@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue)
   queue->item_del = g_cond_new ();
   queue->event_done = g_cond_new ();
   queue->events = g_queue_new ();
+  queue->event_lock = g_mutex_new ();
   queue->queue = g_queue_new ();
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
@@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object)
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
   g_cond_free (queue->event_done);
+  g_mutex_lock (queue->event_lock);
   while (!g_queue_is_empty (queue->events)) {
-    GstEvent *event = g_queue_pop_head (queue->events);
+    GstQueueEventResponse *er = g_queue_pop_head (queue->events);
 
-    gst_event_unref (event);
+    gst_event_unref (er->event);
   }
+  g_mutex_unlock (queue->event_lock);
+  g_mutex_free (queue->event_lock);
   g_queue_free (queue->events);
 
   if (G_OBJECT_CLASS (parent_class)->dispose)
@@ -390,15 +394,33 @@ static void
 gst_queue_handle_pending_events (GstQueue * queue)
 {
   /* check for events to send upstream */
+  GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+      "handling pending events, events queue of size %d",
+      g_queue_get_length (queue->events));
+  g_mutex_lock (queue->event_lock);
   while (!g_queue_is_empty (queue->events)) {
-    GstQueueEventResponse *er = g_queue_pop_head (queue->events);
-
-    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream");
+    GstQueueEventResponse *er;
+
+    er = g_queue_pop_head (queue->events);
+
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "sending event %p (%d) from event response %p upstream",
+        er->event, GST_EVENT_TYPE (er->event), er);
+    if (er->handled) {
+      /* change this to an assert when this file gets reviewed properly. */
+      GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
+          ("already handled event %p (%d) from event response %p upstream",
+              er->event, GST_EVENT_TYPE (er->event), er));
+      break;
+    }
+    g_mutex_unlock (queue->event_lock);
     er->ret = gst_pad_event_default (queue->srcpad, er->event);
     er->handled = TRUE;
     g_cond_signal (queue->event_done);
+    g_mutex_lock (queue->event_lock);
     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
   }
+  g_mutex_unlock (queue->event_lock);
 }
 
 #define STATUS(queue, msg) \
@@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
   gboolean res;
 
+  GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
+      event, GST_EVENT_TYPE (event));
   g_mutex_lock (queue->qlock);
 
   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
@@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
     /* push the event to the queue and wait for upstream consumption */
     er.event = event;
     er.handled = FALSE;
+    g_mutex_lock (queue->event_lock);
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "putting event %p (%d) on internal queue", event,
+        GST_EVENT_TYPE (event));
     g_queue_push_tail (queue->events, &er);
+    g_mutex_unlock (queue->event_lock);
     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
         "Preparing for loop for event handler");
     /* see the chain function on why this is here - it prevents a deadlock */
@@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
           !er.handled) {
         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
-            "timeout in upstream event handling");
-        /* remove ourselves from the pending list. Since we're
-         * locked, others cannot reference this anymore. */
-        queue->queue->head = g_list_remove (queue->queue->head, &er);
-        queue->queue->head = g_list_first (queue->queue->head);
-        queue->queue->tail = g_list_last (queue->queue->head);
-        queue->queue->length--;
+            "timeout in upstream event handling, dropping event %p (%s)",
+            er.event, GST_EVENT_TYPE (er.event));
+        g_mutex_lock (queue->event_lock);
+        /* since this queue is for src events (ie upstream), this thread is
+         * the only one that is pushing stuff on it, so we're sure that
+         * it's still the tail element.  FIXME: But in practice, we should use
+         * GList instead of GQueue for this so we can remove any element in
+         * the list. */
+        g_queue_pop_tail (queue->events);
+        g_mutex_unlock (queue->event_lock);
+        gst_event_unref (er.event);
         res = FALSE;
         goto handled;
       }
index 8177566..aab0c3b 100644 (file)
@@ -91,6 +91,7 @@ struct _GstQueue {
 
   GTimeVal *timeval;   /* the timeout for the queue locking */
   GQueue *events;      /* upstream events get decoupled here */
+  GMutex *event_lock;  /* lock when handling the events queue */
 
   GstCaps *negotiated_caps;
 
index 0f6c076..094464f 100644 (file)
@@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue)
   queue->item_del = g_cond_new ();
   queue->event_done = g_cond_new ();
   queue->events = g_queue_new ();
+  queue->event_lock = g_mutex_new ();
   queue->queue = g_queue_new ();
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
@@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object)
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
   g_cond_free (queue->event_done);
+  g_mutex_lock (queue->event_lock);
   while (!g_queue_is_empty (queue->events)) {
-    GstEvent *event = g_queue_pop_head (queue->events);
+    GstQueueEventResponse *er = g_queue_pop_head (queue->events);
 
-    gst_event_unref (event);
+    gst_event_unref (er->event);
   }
+  g_mutex_unlock (queue->event_lock);
+  g_mutex_free (queue->event_lock);
   g_queue_free (queue->events);
 
   if (G_OBJECT_CLASS (parent_class)->dispose)
@@ -390,15 +394,33 @@ static void
 gst_queue_handle_pending_events (GstQueue * queue)
 {
   /* check for events to send upstream */
+  GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+      "handling pending events, events queue of size %d",
+      g_queue_get_length (queue->events));
+  g_mutex_lock (queue->event_lock);
   while (!g_queue_is_empty (queue->events)) {
-    GstQueueEventResponse *er = g_queue_pop_head (queue->events);
-
-    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream");
+    GstQueueEventResponse *er;
+
+    er = g_queue_pop_head (queue->events);
+
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "sending event %p (%d) from event response %p upstream",
+        er->event, GST_EVENT_TYPE (er->event), er);
+    if (er->handled) {
+      /* change this to an assert when this file gets reviewed properly. */
+      GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
+          ("already handled event %p (%d) from event response %p upstream",
+              er->event, GST_EVENT_TYPE (er->event), er));
+      break;
+    }
+    g_mutex_unlock (queue->event_lock);
     er->ret = gst_pad_event_default (queue->srcpad, er->event);
     er->handled = TRUE;
     g_cond_signal (queue->event_done);
+    g_mutex_lock (queue->event_lock);
     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
   }
+  g_mutex_unlock (queue->event_lock);
 }
 
 #define STATUS(queue, msg) \
@@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
   gboolean res;
 
+  GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
+      event, GST_EVENT_TYPE (event));
   g_mutex_lock (queue->qlock);
 
   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
@@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
     /* push the event to the queue and wait for upstream consumption */
     er.event = event;
     er.handled = FALSE;
+    g_mutex_lock (queue->event_lock);
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "putting event %p (%d) on internal queue", event,
+        GST_EVENT_TYPE (event));
     g_queue_push_tail (queue->events, &er);
+    g_mutex_unlock (queue->event_lock);
     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
         "Preparing for loop for event handler");
     /* see the chain function on why this is here - it prevents a deadlock */
@@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
           !er.handled) {
         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
-            "timeout in upstream event handling");
-        /* remove ourselves from the pending list. Since we're
-         * locked, others cannot reference this anymore. */
-        queue->queue->head = g_list_remove (queue->queue->head, &er);
-        queue->queue->head = g_list_first (queue->queue->head);
-        queue->queue->tail = g_list_last (queue->queue->head);
-        queue->queue->length--;
+            "timeout in upstream event handling, dropping event %p (%s)",
+            er.event, GST_EVENT_TYPE (er.event));
+        g_mutex_lock (queue->event_lock);
+        /* since this queue is for src events (ie upstream), this thread is
+         * the only one that is pushing stuff on it, so we're sure that
+         * it's still the tail element.  FIXME: But in practice, we should use
+         * GList instead of GQueue for this so we can remove any element in
+         * the list. */
+        g_queue_pop_tail (queue->events);
+        g_mutex_unlock (queue->event_lock);
+        gst_event_unref (er.event);
         res = FALSE;
         goto handled;
       }
index 8177566..aab0c3b 100644 (file)
@@ -91,6 +91,7 @@ struct _GstQueue {
 
   GTimeVal *timeval;   /* the timeout for the queue locking */
   GQueue *events;      /* upstream events get decoupled here */
+  GMutex *event_lock;  /* lock when handling the events queue */
 
   GstCaps *negotiated_caps;