gst/gstqueue.c: implement timeout for sending events. Workaround for if the pipeline...
authorRonald S. Bultje <rbultje@ronald.bitfreak.net>
Sun, 21 Dec 2003 18:59:06 +0000 (18:59 +0000)
committerRonald S. Bultje <rbultje@ronald.bitfreak.net>
Sun, 21 Dec 2003 18:59:06 +0000 (18:59 +0000)
Original commit message from CVS:
2003-12-21  Ronald Bultje  <rbultje@ronald.bitfreak.net>

* gst/gstqueue.c: (gst_queue_handle_pending_events),
(gst_queue_chain), (gst_queue_handle_src_event):
implement timeout for sending events. Workaround for if the
pipeline on this queue is not passing any data.

ChangeLog
gst/gstqueue.c
plugins/elements/gstqueue.c

index c5dc1dd..b1c9a61 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+2003-12-21  Ronald Bultje  <rbultje@ronald.bitfreak.net>
+
+       * gst/gstqueue.c: (gst_queue_handle_pending_events),
+       (gst_queue_chain), (gst_queue_handle_src_event):
+         implement timeout for sending events. Workaround for if the
+         pipeline on this queue is not passing any data.
+
 2003-12-21  Ronald Bultje <rbultje@ronald.bitfreak.net>
                                                                                 
         * ChangeLog: moved to gstreamer/docs/random/old/ChangeLog.gstreamer
index b9a6d94..5888c0b 100644 (file)
@@ -384,7 +384,7 @@ gst_queue_handle_pending_events (GstQueue *queue)
   while (!g_queue_is_empty (queue->events)){
     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
-    er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event);
+    er->ret = gst_pad_event_default (queue->srcpad, er->event);
     er->handled = TRUE;
     g_cond_signal (queue->event_done);
     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
@@ -436,6 +436,7 @@ restart:
       case GST_EVENT_FLUSH:
         STATUS (queue, "received flush event");
         gst_queue_locked_flush (queue);
+        STATUS (queue, "after flush");
        break;
       case GST_EVENT_EOS:
        STATUS (queue, "received EOS");
@@ -777,7 +778,22 @@ gst_queue_handle_src_event (GstPad   *pad,
     /* see the chain function on why this is here - it prevents a deadlock */
     g_cond_signal (queue->item_del);
     while (!er.handled) {
-      g_cond_wait (queue->event_done, queue->qlock);
+      GTimeVal timeout;
+      g_get_current_time (&timeout);
+      g_time_val_add (&timeout, 500 * 1000); /* half a second */
+      if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
+          !er.handled) {
+        GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+                               "timeout in upstream event handling");
+        /* remove ourselves from the pending list. Since we're
+         * locked, others cannot reference this anymore. */
+        queue->queue->head = g_list_remove (queue->queue->head, &er);
+        queue->queue->head = g_list_first (queue->queue->head);
+        queue->queue->tail = g_list_last (queue->queue->head);
+        queue->queue->length--;
+        res = FALSE;
+        goto handled;
+      }
     }
     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
                            "Event handled");
@@ -799,10 +815,9 @@ gst_queue_handle_src_event (GstPad   *pad,
         break;
     }
   }
-
+handled:
   g_mutex_unlock (queue->qlock);
 
-  /* we have to claim success, but we don't really know */
   return res;
 }
 
index b9a6d94..5888c0b 100644 (file)
@@ -384,7 +384,7 @@ gst_queue_handle_pending_events (GstQueue *queue)
   while (!g_queue_is_empty (queue->events)){
     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
-    er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event);
+    er->ret = gst_pad_event_default (queue->srcpad, er->event);
     er->handled = TRUE;
     g_cond_signal (queue->event_done);
     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
@@ -436,6 +436,7 @@ restart:
       case GST_EVENT_FLUSH:
         STATUS (queue, "received flush event");
         gst_queue_locked_flush (queue);
+        STATUS (queue, "after flush");
        break;
       case GST_EVENT_EOS:
        STATUS (queue, "received EOS");
@@ -777,7 +778,22 @@ gst_queue_handle_src_event (GstPad   *pad,
     /* see the chain function on why this is here - it prevents a deadlock */
     g_cond_signal (queue->item_del);
     while (!er.handled) {
-      g_cond_wait (queue->event_done, queue->qlock);
+      GTimeVal timeout;
+      g_get_current_time (&timeout);
+      g_time_val_add (&timeout, 500 * 1000); /* half a second */
+      if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
+          !er.handled) {
+        GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+                               "timeout in upstream event handling");
+        /* remove ourselves from the pending list. Since we're
+         * locked, others cannot reference this anymore. */
+        queue->queue->head = g_list_remove (queue->queue->head, &er);
+        queue->queue->head = g_list_first (queue->queue->head);
+        queue->queue->tail = g_list_last (queue->queue->head);
+        queue->queue->length--;
+        res = FALSE;
+        goto handled;
+      }
     }
     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
                            "Event handled");
@@ -799,10 +815,9 @@ gst_queue_handle_src_event (GstPad   *pad,
         break;
     }
   }
-
+handled:
   g_mutex_unlock (queue->qlock);
 
-  /* we have to claim success, but we don't really know */
   return res;
 }