Another big set of changes. Connections are now also pullfunc based. gstqueue has...
authorErik Walthinsen <omega@temple-baptist.org>
Fri, 8 Dec 2000 10:33:01 +0000 (10:33 +0000)
committerErik Walthinsen <omega@temple-baptist.org>
Fri, 8 Dec 2000 10:33:01 +0000 (10:33 +0000)
Original commit message from CVS:
Another big set of changes.  Connections are now also pullfunc based.
gstqueue has been updated, I don't know of any other connections offhand.

There are still a few things that need doing, specifically the concept
of a source or connection with connections to multiple thread contexts is
not dealt with.  This may force us to move the threadstate from the
element to the pad, maybe keeping the element's copy for simple cases.
Then the Bin would create a structure to pass to the cothreaded _wrappers
of any such elements, which would detail the pads that are to be dealt with
by this particular cothread context.

That will speed things up to, since we don't have to look through the list
of all pads for every Src or Connection element for every iteration, we can
simply step through the list provided by the plan.  Special case might even
have a single pad pointer sitting there to trump the list, if there's only
one (the common case anyway).

Task 23098 is tracking these changes.  The main task 22588 depends on that
subtask, as well as 22240, which is a consistency check on PAD_DISABLED.

gst/elements/gstqueue.c
gst/gstbin.c
gst/gstconnection.c
gst/gstconnection.h
gst/gstdebug.h
gst/gstelement.c
gst/gstpad.c
plugins/elements/gstqueue.c

index ec142722f92de837d4c92db08936ff026ea3c755..7fc2d0a2423793357f8d776c5a78368e827e067a 100644 (file)
@@ -58,7 +58,7 @@ static void                   gst_queue_init          (GstQueue *queue);
 static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
 static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
 
-static void                    gst_queue_push          (GstConnection *connection);
+static void                    gst_queue_pull          (GstPad *pad);
 static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
 
 static void                    gst_queue_flush         (GstQueue *queue);
@@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass)
   gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
                            GTK_ARG_READWRITE, ARG_MAX_LEVEL);
 
-  gstconnection_class->push = gst_queue_push;
-
   gtkobject_class->set_arg = gst_queue_set_arg;  
   gtkobject_class->get_arg = gst_queue_get_arg;
 
@@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue)
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
 
   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
+  gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
 
   queue->queue = NULL;
@@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 }
 
 static void 
-gst_queue_push (GstConnection *connection
+gst_queue_pull (GstPad *pad
 {
-  GstQueue *queue = GST_QUEUE (connection);
+  GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
   GstBuffer *buf = NULL;
   GSList *front;
   gboolean tosignal = FALSE;
   const guchar *name;
-  
+
   name = gst_element_get_name (GST_ELEMENT (queue));
 
   /* have to lock for thread-safety */
index 84d912c0baf5ab563ac85e73315b68a3a25b85ad..6aa516305ae994b5ec495adbaee642516f0f527e 100644 (file)
@@ -479,14 +479,14 @@ gst_bin_iterate (GstBin *bin)
 {
   GstBinClass *oclass;
 
-  DEBUG_ENTER("('%s')",gst_element_get_name(GST_ELEMENT(bin)));
+  DEBUG_ENTER("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
 
   oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass);
   
   if (oclass->iterate)
     (oclass->iterate) (bin);
 
-  DEBUG_LEAVE("('%s')",gst_element_get_name(GST_ELEMENT(bin)));
+  DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
 }
 
 /**
@@ -531,29 +531,21 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[])
     DEBUG("element %s ended loop function\n", name);
   } else {
     DEBUG("element %s is chain-based\n", name);
-    if (GST_IS_CONNECTION (element) && argc == 1) {
-      while (1) {
-        DEBUG("calling push function of connection %s\n", name);
-        gst_connection_push (GST_CONNECTION (element));
-        DEBUG("calling push function of connection %s done\n", name);
-      }
-    } else {
-      DEBUG("stepping through pads\n");
-      do {
-        pads = element->pads;
-        while (pads) {
-          pad = GST_PAD (pads->data);
-          if (pad->direction == GST_PAD_SINK) {
-            DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
-           buf = gst_pad_pull (pad);
-            DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
-            (pad->chainfunc) (pad,buf);
-            DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
-          }
-          pads = g_list_next (pads);
+    DEBUG("stepping through pads\n");
+    do {
+      pads = element->pads;
+      while (pads) {
+        pad = GST_PAD (pads->data);
+        if (pad->direction == GST_PAD_SINK) {
+          DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
+          buf = gst_pad_pull (pad);
+          DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
+          (pad->chainfunc) (pad,buf);
+          DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
         }
-      } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
-    }
+        pads = g_list_next (pads);
+      }
+    } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
   }
   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
 
@@ -562,39 +554,57 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[])
 }
 
 static int
-gst_bin_pullsrc_wrapper (int argc,char *argv[]) 
+gst_bin_src_wrapper (int argc,char *argv[]) 
 {
   GstElement *element = GST_ELEMENT (argv);
   GList *pads;
   GstPad *pad;
   G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
 
-  DEBUG("entering gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name);
+  DEBUG_ENTER("(%d,\"%s\")",argc,name);
 
   do {
     pads = element->pads;
     while (pads) {
       pad = GST_PAD (pads->data);
       if (pad->direction == GST_PAD_SRC) {
-        region_struct *region = cothread_get_data (element->threadstate, "region");
-        if (region) {
-         //gst_src_push_region (GST_SRC (element), region->offset, region->size);
-          if (pad->pullregionfunc == NULL) 
-           fprintf(stderr,"error, no pullregionfunc in \"%s\"\n", name);
-          (pad->pullregionfunc)(pad, region->offset, region->size);
-       }
-       else {
-          if (pad->pullfunc == NULL) 
-           fprintf(stderr,"error, no pullfunc in \"%s\"\n", name);
-          (pad->pullfunc)(pad);
-       }
+        if (pad->pullfunc == NULL) DEBUG("error, no pullfunc in \"%s\"\n", name);
+        (pad->pullfunc)(pad);
       }
       pads = g_list_next(pads);
     }
   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
 
-  DEBUG("leaving gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name);
+  DEBUG_LEAVE("");
+  return 0;
+}
+
+static int
+gst_bin_connection_wrapper (int argc,char *argv[]) 
+{
+  GstElement *element = GST_ELEMENT (argv);
+  GList *pads;
+  GstPad *pad;
+  G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
+
+  DEBUG_ENTER("(%d,\"%s\")",argc,name);
+
+  do {
+    pads = element->pads;
+    while (pads) {
+      pad = GST_PAD (pads->data);
+      if (pad->direction == GST_PAD_SRC) {
+        DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pad->pullfunc);
+        if (pad->pullfunc == NULL) fprintf(stderr,"error, no pullfunc in \"%s\"\n", name);
+        (pad->pullfunc)(pad);
+      }
+      pads = g_list_next(pads);
+    }
+  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+  GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
+
+  DEBUG_LEAVE("");
   return 0;
 }
 
@@ -632,14 +642,17 @@ gst_bin_pushfunc_proxy (GstPad *pad)
 static void 
 gst_bin_create_plan_func (GstBin *bin) 
 {
+  const gchar *binname = gst_element_get_name(GST_ELEMENT(bin));
   GList *elements;
   GstElement *element;
   int sink_pads;
   GList *pads;
-  GstPad *pad, *opad, *peer;
+  GstPad *pad, *peer;
   GstElement *outside;
 
-  g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin)));
+  DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+  DEBUG_ENTER_STRING;
+//  g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin)));
 
   // first loop through all children to see if we need cothreads
   // we break immediately when we find we need to, why keep searching?
@@ -647,25 +660,25 @@ gst_bin_create_plan_func (GstBin *bin)
   while (elements) {
     element = GST_ELEMENT (elements->data);
     
-    g_print("gstbin: found element \"%s\" in bin \"%s\"\n", 
-                   gst_element_get_name (element), 
-                   gst_element_get_name (GST_ELEMENT (bin)));
+    DEBUG("found element \"%s\" in bin \"%s\"\n", 
+         gst_element_get_name (element), 
+         gst_element_get_name (GST_ELEMENT (bin)));
     
     // if it's a loop-based element, use cothreads
     if (element->loopfunc != NULL) {
-      g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", 
-                     gst_element_get_name (element), 
-                     gst_element_get_name (GST_ELEMENT (bin)));
+      DEBUG("loop based element \"%s\" in bin \"%s\"\n", 
+            gst_element_get_name (element), 
+            gst_element_get_name (GST_ELEMENT (bin)));
       
       bin->need_cothreads = TRUE;
       break;
     }
     // if it's a complex element, use cothreads
     else if (GST_ELEMENT_IS_MULTI_IN (element)) {
-      g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", 
-                     gst_element_get_name (element), 
-                     gst_element_get_name (GST_ELEMENT (bin)));
-      
+      DEBUG("complex element \"%s\" in bin \"%s\"\n", 
+            gst_element_get_name (element), 
+            gst_element_get_name (GST_ELEMENT (bin)));
+
       bin->need_cothreads = TRUE;
       break;
     }
@@ -679,9 +692,9 @@ gst_bin_create_plan_func (GstBin *bin)
       pads = g_list_next (pads);
     }
     if (sink_pads > 1) {
-      g_print("gstbin: more than 1 sinkpad for element \"%s\" in bin \"%s\"\n", 
-                     gst_element_get_name (element), 
-                     gst_element_get_name (GST_ELEMENT (bin)));
+      DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n", 
+            gst_element_get_name (element),
+            gst_element_get_name (GST_ELEMENT (bin)));
       
       bin->need_cothreads = TRUE;
       break;
@@ -700,12 +713,12 @@ gst_bin_create_plan_func (GstBin *bin)
   bin->numentries = 0;
 
   if (bin->need_cothreads) {
-    g_print("gstbin: need cothreads\n");
+    DEBUG("NEED COTHREADS\n");
 
     // first create thread context
     if (bin->threadcontext == NULL) {
       bin->threadcontext = cothread_init ();
-      g_print("gstbin: initialized cothread context\n");
+      DEBUG("initialized cothread context\n");
     }
 
     // walk through all the children
@@ -718,59 +731,68 @@ gst_bin_create_plan_func (GstBin *bin)
         element->threadstate = cothread_create (bin->threadcontext);
         cothread_setfunc (element->threadstate, gst_bin_loopfunc_wrapper,
                           0, (char **)element);
+        DEBUG("created element threadstate %p for \"%s\"\n",element->threadstate,
+              gst_element_get_name(element));
       }
       if (GST_IS_BIN (element)) {
         gst_bin_create_plan (GST_BIN (element));
       }
 
       if (GST_IS_SRC (element)) {
-        g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name (element));
+        DEBUG("adding '%s' as entry point\n",gst_element_get_name (element));
         bin->entries = g_list_prepend (bin->entries,element);
         bin->numentries++;
-        cothread_setfunc(element->threadstate,gst_bin_pullsrc_wrapper,0,(char **)element);
+        cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
       }
 
       pads = gst_element_get_pad_list (element);
       while (pads) {
         pad = GST_PAD(pads->data);
-        g_print("gstbin: setting push&pull handlers for %s:%s\n",
-                gst_element_get_name (element), gst_pad_get_name (pad));
-
-       // an internal connection will push outside this bin.
-       if (!GST_IS_CONNECTION (element)) {
-          pad->pushfunc = gst_bin_pushfunc_proxy;
-       }
-        if (!pad->pullfunc)
-          pad->pullfunc = gst_bin_pullfunc_proxy;
-        if (!pad->pullregionfunc)
-          pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
-
-        /* we only worry about sink pads */
-        if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
-          /* get the pad's peer */
+//        DEBUG("setting push&pull handlers for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+
+        if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+          DEBUG("checking/setting push proxy for srcpad %s:%s\n",
+                 GST_DEBUG_PAD_NAME(pad));
+          // set the proxy functions
+          if (!pad->pushfunc)
+            pad->pushfunc = gst_bin_pushfunc_proxy;
+
+        } else if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+          DEBUG("checking/setting pull proxies for sinkpad %s:%s\n",
+                 GST_DEBUG_PAD_NAME(pad));
+          // set the proxy functions
+          if (!pad->pullfunc)
+            pad->pullfunc = gst_bin_pullfunc_proxy;
+          if (!pad->pullfunc)
+            pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
+
+          // ***** check for possible connections outside
+          // get the pad's peer
           peer = gst_pad_get_peer (pad);
+          // FIXME this should be an error condition, if not disabled
           if (!peer) break;
-          /* get the parent of the peer of the pad */
+          // get the parent of the peer of the pad
           outside = GST_ELEMENT (gst_pad_get_parent (peer));
+          // FIXME this should *really* be an error condition
           if (!outside) break;
           /* if it's a connection and it's not ours... */
           if (GST_IS_CONNECTION (outside) &&
                (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
+
+/***** this is irrelevant now *****
            GList *connection_pads = gst_element_get_pad_list (outside);
             while (connection_pads) {
               opad = GST_PAD (connection_pads->data);
               if (gst_pad_get_direction (opad) == GST_PAD_SRC) {
                 g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n",
-                       gst_element_get_name (outside),gst_pad_get_name (opad), opad, opad->pullfunc);
+                        GST_DEBUG_PAD_NAME(pad), opad, opad->pullfunc);
 
                 opad->pushfunc = gst_bin_pushfunc_proxy;
                 opad->pullfunc = gst_bin_pullfunc_proxy;
                opad->pullregionfunc = gst_bin_pullregionfunc_proxy;
 
-                if (outside->threadstate == NULL) {
-                  outside->threadstate = cothread_create (bin->threadcontext);
-                  cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper,
-                          1, (char **)outside);
+                cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper,
+                        1, (char **)outside);
                 }
              }
               connection_pads = g_list_next (connection_pads);
@@ -779,8 +801,19 @@ gst_bin_create_plan_func (GstBin *bin)
                    "for internal element \"%s\"\n",
                          gst_element_get_name (GST_ELEMENT (outside)),
                          gst_element_get_name (GST_ELEMENT (element)));
+*****/
+            DEBUG("connection '%s' outside bin is an entry\n",gst_element_get_name(outside));
            bin->entries = g_list_prepend (bin->entries,outside);
            bin->numentries++;
+// FIXME: this won't work in the case where a connection feeds multiple contexts.
+// have to have a list of internal structures of connection entries, struct passed to
+// the cothread wrapper, struct listing pads that enter that context.
+            if (outside->threadstate == NULL) {
+              outside->threadstate = cothread_create (bin->threadcontext);
+              DEBUG("created element threadstate %p for \"%s\"\n",outside->threadstate,
+                    gst_element_get_name(outside));
+            }
+            cothread_setfunc(outside->threadstate,gst_bin_connection_wrapper,0,(char **)outside);
          }
        }
         pads = g_list_next (pads);
@@ -808,7 +841,7 @@ gst_bin_create_plan_func (GstBin *bin)
           pad = GST_PAD (pads->data);
           /* we only worry about sink pads */
           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
-           g_print("gstbin: found SINK pad %s\n", gst_pad_get_name (pad));
+           g_print("gstbin '%s': found SINK pad %s:%s\n", binname, GST_DEBUG_PAD_NAME(pad));
             /* get the pad's peer */
             peer = gst_pad_get_peer (pad);
             if (!peer) {
@@ -848,7 +881,8 @@ gst_bin_iterate_func (GstBin *bin)
   GList *pads;
   GstPad *pad;
 
-  DEBUG_ENTER("(%s)", gst_element_get_name (GST_ELEMENT (bin)));
+  DEBUG_SET_STRING("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin)));
+  DEBUG_ENTER_STRING;
 
   g_return_if_fail (bin != NULL);
   g_return_if_fail (GST_IS_BIN (bin));
@@ -887,9 +921,9 @@ gst_bin_iterate_func (GstBin *bin)
           }
           pads = g_list_next (pads);
         }
-      } else if (GST_IS_CONNECTION (entry))
-        gst_connection_push (GST_CONNECTION (entry));
-      else if (GST_IS_BIN (entry))
+//      } else if (GST_IS_CONNECTION (entry)) {
+//        gst_connection_push (GST_CONNECTION (entry));
+      else if (GST_IS_BIN (entry))
         gst_bin_iterate (GST_BIN (entry));
       else {
        fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry));
index b4037cb9d29bac597e796a3c779b6a91344f6010..540375580cca88bf376cb30652696207d444b3d3 100644 (file)
@@ -91,24 +91,3 @@ gst_connection_new (gchar *name)
   
   return connection;
 }
-
-/**
- * gst_connection_push:
- * @connection: the connection to push
- *
- * Push a buffer along a connection
- */
-void 
-gst_connection_push (GstConnection *connection) 
-{
-  GstConnectionClass *oclass;
-
-  g_return_if_fail (connection != NULL);
-  g_return_if_fail (GST_IS_CONNECTION (connection));
-
-  oclass = (GstConnectionClass *)(GTK_OBJECT (connection)->klass);
-
-  g_return_if_fail (oclass->push != NULL);
-
-  (oclass->push)(connection);
-}
index 6295fe4833cf2b04ebb1fa6bc8ed670163c4c8e0..1640ff4ee0e5f8efb0844a3772b40981895b73ce 100644 (file)
@@ -50,16 +50,11 @@ struct _GstConnection {
 
 struct _GstConnectionClass {
   GstElementClass parent_class;
-
-  /* push function */
-  void (*push) (GstConnection *connection);
 };
 
 GtkType        gst_connection_get_type         (void);
 GstElement*    gst_connection_new              (gchar *name);
 
-void           gst_connection_push             (GstConnection *connection);
-
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */
index b168e928f9de8be7c392c6e1d6dc0bf9fe6aea28..5b5894e6829b1b8f4488d63f8385d2554b361de4 100644 (file)
@@ -92,13 +92,28 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL;
   }
 
 #ifdef GST_DEBUG_ENABLED
-#define DEBUG(format,args...) fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args ))
-#define DEBUG_ENTER(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args ))
-#define DEBUG_LEAVE(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
+#define DEBUG(format,args...) \
+  (_debug_string != NULL) ? \
+    fprintf(stderr,GST_DEBUG_PREFIX("%s: "format , _debug_string , ## args )) : \
+    fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args ))
+#define DEBUG_ENTER(format, args...) \
+  fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args ))
+#define DEBUG_SET_STRING(format, args...) \
+  gchar *_debug_string = g_strdup_printf(format , ## args )
+#define DEBUG_ENTER_STRING DEBUG_ENTER("%s",_debug_string)
+#define DEBUG_LEAVE(format, args...) \
+  if (_debug_string != NULL) g_free(_debug_string),\
+fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
 #else
 #define DEBUG(format, args...)
 #define DEBUG_ENTER(format, args...)
 #define DEBUG_LEAVE(format, args...)
 #endif
 
+
+
+/********** some convenience macros for debugging **********/
+#define GST_DEBUG_PAD_NAME(pad) \
+  ((pad)->parent != NULL) ? gst_element_get_name(GST_ELEMENT((pad)->parent)) : "''", gst_pad_get_name(pad)
+
 #endif /* __GST_H__ */
index 9a565145da99e4fc78859ba7ca375bcf3c22ab22..c931af0301abd0ad0792e46c65d167e41fc73d74 100644 (file)
@@ -111,6 +111,7 @@ gst_element_init (GstElement *element)
   element->numpads = 0;
   element->pads = NULL;
   element->loopfunc = NULL;
+  element->threadstate = NULL;
 }
 
 /**
index 86be1ba1108365359e4e7d545b58c28fba836576..f9732f2ff73ba45e454f003b7f147353de1841ee 100644 (file)
@@ -240,9 +240,11 @@ gst_pad_set_pull_function (GstPad *pad,
   g_return_if_fail (pad != NULL);
   g_return_if_fail (GST_IS_PAD (pad));
 
-  g_print("gstpad: pad setting pull function\n");
+  // the if and such should optimize out when DEBUG is off
+  DEBUG("setting pull function for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
 
   pad->pullfunc = pull;
+  DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pull);
 }
 
 /**
index ec142722f92de837d4c92db08936ff026ea3c755..7fc2d0a2423793357f8d776c5a78368e827e067a 100644 (file)
@@ -58,7 +58,7 @@ static void                   gst_queue_init          (GstQueue *queue);
 static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
 static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
 
-static void                    gst_queue_push          (GstConnection *connection);
+static void                    gst_queue_pull          (GstPad *pad);
 static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
 
 static void                    gst_queue_flush         (GstQueue *queue);
@@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass)
   gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
                            GTK_ARG_READWRITE, ARG_MAX_LEVEL);
 
-  gstconnection_class->push = gst_queue_push;
-
   gtkobject_class->set_arg = gst_queue_set_arg;  
   gtkobject_class->get_arg = gst_queue_get_arg;
 
@@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue)
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
 
   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
+  gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
 
   queue->queue = NULL;
@@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 }
 
 static void 
-gst_queue_push (GstConnection *connection
+gst_queue_pull (GstPad *pad
 {
-  GstQueue *queue = GST_QUEUE (connection);
+  GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
   GstBuffer *buf = NULL;
   GSList *front;
   gboolean tosignal = FALSE;
   const guchar *name;
-  
+
   name = gst_element_get_name (GST_ELEMENT (queue));
 
   /* have to lock for thread-safety */