Provide intrastructure to not have to pass NULL buffers on errors and interrupts...
authorWim Taymans <wim.taymans@gmail.com>
Wed, 8 Jan 2003 21:33:20 +0000 (21:33 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 8 Jan 2003 21:33:20 +0000 (21:33 +0000)
Original commit message from CVS:
Provide intrastructure to not have to pass NULL buffers on errors and
interrupts, this should fix some issues with the optimal scheduler.

gst/elements/gstidentity.c
gst/gstevent.h
gst/gstpad.c
gst/gstqueue.c
gst/schedulers/gstoptimalscheduler.c
plugins/elements/gstidentity.c
plugins/elements/gstqueue.c

index 1308f5a..0589362 100644 (file)
@@ -265,10 +265,18 @@ gst_identity_loop (GstElement *element)
   
   buf = gst_pad_pull (identity->sinkpad);
   if (GST_IS_EVENT (buf)) {
-    gst_pad_event_default (identity->sinkpad, GST_EVENT (buf));
-  }
+    GstEvent *event = GST_EVENT (buf);
 
-  gst_identity_chain (identity->sinkpad, buf);
+    if (GST_EVENT_IS_INTERRUPT (event)) {
+      gst_event_unref (event);
+    }
+    else {
+      gst_pad_event_default (identity->sinkpad, event);
+    }
+  }
+  else {
+    gst_identity_chain (identity->sinkpad, buf);
+  }
 }
 
 static void 
index 36aa287..122af2d 100644 (file)
@@ -45,7 +45,8 @@ typedef enum {
   GST_EVENT_SIZE,
   GST_EVENT_RATE,
   GST_EVENT_FILLER,
-  GST_EVENT_TS_OFFSET
+  GST_EVENT_TS_OFFSET,
+  GST_EVENT_INTERRUPT
 } GstEventType;
 
 extern GType _gst_event_type;
@@ -58,6 +59,8 @@ extern GType _gst_event_type;
 #define GST_EVENT_TIMESTAMP(event)     (GST_EVENT(event)->timestamp)
 #define GST_EVENT_SRC(event)           (GST_EVENT(event)->src)
 
+#define GST_EVENT_IS_INTERRUPT(event) (GST_EVENT_TYPE (event) == GST_EVENT_INTERRUPT)
+
 #define GST_SEEK_FORMAT_SHIFT  0
 #define GST_SEEK_METHOD_SHIFT  16
 #define GST_SEEK_FLAGS_SHIFT   20
index 62296f5..d6583ee 100644 (file)
@@ -2193,7 +2193,8 @@ gst_pad_pull (GstPad *pad)
   
   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
 
-  g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL);
+  g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, 
+                       GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)));
 
   peer = GST_RPAD_PEER (pad);
 
@@ -2204,6 +2205,7 @@ gst_pad_pull (GstPad *pad)
                       GST_PAD_NAME (pad), NULL);
   }
   else {
+restart:
     if (peer->gethandler) {
       GstBuffer *buf;
       gboolean active = GST_PAD_IS_ACTIVE (peer);
@@ -2216,12 +2218,12 @@ gst_pad_pull (GstPad *pad)
 
       if (buf) {
         if (!gst_probe_dispatcher_dispatch (&peer->probedisp, GST_DATA (buf)))
-          return NULL;
+          goto restart;
 
         if (!GST_IS_EVENT (buf) && !active) {
           g_warning ("pull on pad %s:%s but it is not active", 
                 GST_DEBUG_PAD_NAME (peer));
-          return NULL;
+          return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
         }
         return buf;
       }
@@ -2239,7 +2241,7 @@ gst_pad_pull (GstPad *pad)
                         NULL);
     }
   }
-  return NULL;
+  return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
 }
 
 /**
index c9fff22..7b23eca 100644 (file)
@@ -485,7 +485,7 @@ restart:
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
       g_mutex_unlock (queue->qlock);
       if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
-        return NULL;
+        return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
       goto restart;
     }
     if (GST_STATE (queue) != GST_STATE_PLAYING) {
index 6f0658b..6f8394d 100644 (file)
@@ -60,6 +60,7 @@ typedef enum {
   GST_OPT_SCHEDULER_STATE_STOPPED,
   GST_OPT_SCHEDULER_STATE_ERROR,
   GST_OPT_SCHEDULER_STATE_RUNNING,
+  GST_OPT_SCHEDULER_STATE_INTERRUPTED
 } GstOptSchedulerState;
 
 struct _GstOptScheduler {
@@ -768,8 +769,13 @@ get_group_schedule_function (int argc, char *argv[])
              GST_DEBUG_PAD_NAME (pad), group);
 
     buffer = GST_RPAD_GETFUNC (pad) (pad);
-    if (buffer)
+    if (buffer) {
+      if (GST_EVENT_IS_INTERRUPT (buffer)) {
+       gst_event_unref (GST_EVENT (buffer));
+       break;
+      }
       gst_pad_push (pad, buffer);
+    }
   }
 
   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
@@ -879,9 +885,14 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
     else {
       g_warning ("deadlock detected, disabling group %p", group);
       group_error_handler (group);
-      return NULL;
+      return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
     }
 #endif
+    /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
+     * >        * loop based element */
+    if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
+      return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
+    }
     
     if (GST_PAD_BUFLIST (srcpad)) {
       buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
@@ -1219,10 +1230,18 @@ gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
 static gboolean
 gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
 {
+  GST_INFO (GST_CAT_SCHEDULING, "interrupt from \"%s\"", 
+            GST_ELEMENT_NAME (element));
+
 #ifdef USE_COTHREADS
   do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
   return FALSE;
 #else
+  {
+    GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+    osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
+  }
   return TRUE;
 #endif
 }
@@ -1564,6 +1583,10 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
         schedule_chain (chain);
         scheduled = TRUE;
       }
+
+      /* don't schedule any more chains when interrupted or in error */
+      if (osched->state != GST_OPT_SCHEDULER_STATE_RUNNING)
+        break;
     }
 
     /* at this point it's possible that the scheduler state is
index 1308f5a..0589362 100644 (file)
@@ -265,10 +265,18 @@ gst_identity_loop (GstElement *element)
   
   buf = gst_pad_pull (identity->sinkpad);
   if (GST_IS_EVENT (buf)) {
-    gst_pad_event_default (identity->sinkpad, GST_EVENT (buf));
-  }
+    GstEvent *event = GST_EVENT (buf);
 
-  gst_identity_chain (identity->sinkpad, buf);
+    if (GST_EVENT_IS_INTERRUPT (event)) {
+      gst_event_unref (event);
+    }
+    else {
+      gst_pad_event_default (identity->sinkpad, event);
+    }
+  }
+  else {
+    gst_identity_chain (identity->sinkpad, buf);
+  }
 }
 
 static void 
index c9fff22..7b23eca 100644 (file)
@@ -485,7 +485,7 @@ restart:
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
       g_mutex_unlock (queue->qlock);
       if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
-        return NULL;
+        return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
       goto restart;
     }
     if (GST_STATE (queue) != GST_STATE_PLAYING) {