A first attempt to fix the queues in a cothreaded pipeline.
authorWim Taymans <wim.taymans@gmail.com>
Fri, 22 Sep 2000 23:35:14 +0000 (23:35 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Fri, 22 Sep 2000 23:35:14 +0000 (23:35 +0000)
Original commit message from CVS:
A first attempt to fix the queues in a cothreaded pipeline.
Some fixes to the thread handling.
Fix a bug in gstreamer-config : gthread was not included.
gst_bin_create_plan() is now done in the READY state.
a bin with only another bin in it will now work with gst_bin_iterate.
Added some examples for the queues.

35 files changed:
examples/helloworld/helloworld.c
examples/helloworld2/helloworld2.c
examples/queue/.gitignore [new file with mode: 0644]
examples/queue/queue.c [new file with mode: 0644]
examples/queue2/.gitignore [new file with mode: 0644]
examples/queue2/queue2.c [new file with mode: 0644]
examples/queue3/.gitignore [new file with mode: 0644]
examples/queue3/queue3.c [new file with mode: 0644]
examples/thread/thread.c
gst/cothreads.c
gst/cothreads.h
gst/elements/Makefile.am
gst/elements/gstqueue.c
gst/gstbin.c
gst/gstpad.c
gst/gstpipeline.c
gst/gstthread.c
gstplay/Makefile.am
gstplay/avi.c
gstplay/gstplay.c
gstplay/mpeg1.c
gstplay/mpeg2.c
gstreamer-config.in
plugins/elements/Makefile.am
plugins/elements/gstqueue.c
test/mp2tomp1.c
tests/old/examples/helloworld/helloworld.c
tests/old/examples/helloworld2/helloworld2.c
tests/old/examples/queue/.gitignore [new file with mode: 0644]
tests/old/examples/queue/queue.c [new file with mode: 0644]
tests/old/examples/queue2/.gitignore [new file with mode: 0644]
tests/old/examples/queue2/queue2.c [new file with mode: 0644]
tests/old/examples/queue3/.gitignore [new file with mode: 0644]
tests/old/examples/queue3/queue3.c [new file with mode: 0644]
tests/old/examples/thread/thread.c

index 2a3a486..3527cc0 100644 (file)
@@ -50,9 +50,6 @@ int main(int argc,char *argv[])
   gst_pad_connect(gst_element_get_pad(decoder,"src"),
                   gst_element_get_pad(audiosink,"sink"));
 
-  /* find out how to handle this bin */
-  gst_bin_create_plan(GST_BIN(bin));
-
   /* make it ready */
   gst_element_set_state(bin, GST_STATE_READY);
   /* start playing */
index f687ef6..fab7b49 100644 (file)
@@ -46,9 +46,6 @@ int main(int argc,char *argv[])
     exit(-1);
   }
 
-  /* find out how to handle this bin */
-  gst_bin_create_plan(GST_BIN(pipeline));
-
   /* make it ready */
   gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
   /* start playing */
diff --git a/examples/queue/.gitignore b/examples/queue/.gitignore
new file mode 100644 (file)
index 0000000..b7dd527
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue
diff --git a/examples/queue/queue.c b/examples/queue/queue.c
new file mode 100644 (file)
index 0000000..9dc42d1
--- /dev/null
@@ -0,0 +1,84 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *parse, *decode, *queue;
+  GstElement *bin;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  bin = gst_bin_new("bin");
+  g_assert(bin != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  parse = gst_elementfactory_make("mp3parse", "parse");
+  decode = gst_elementfactory_make("mpg123", "decode");
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  /* add objects to the main pipeline */
+  gst_bin_add(GST_BIN(bin), disksrc);
+  gst_bin_add(GST_BIN(bin), parse);
+  gst_bin_add(GST_BIN(bin), decode);
+  gst_bin_add(GST_BIN(bin), queue);
+
+  gst_bin_add(GST_BIN(thread), audiosink);
+
+  gst_bin_add(GST_BIN(bin), thread);
+  
+  gst_pad_connect(gst_element_get_pad(disksrc,"src"),
+                  gst_element_get_pad(parse,"sink"));
+  gst_pad_connect(gst_element_get_pad(parse,"src"),
+                  gst_element_get_pad(decode,"sink"));
+  gst_pad_connect(gst_element_get_pad(decode,"src"),
+                  gst_element_get_pad(queue,"sink"));
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(bin));
+  }
+
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
+
+  exit(0);
+}
+
diff --git a/examples/queue2/.gitignore b/examples/queue2/.gitignore
new file mode 100644 (file)
index 0000000..ca0136a
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue2
diff --git a/examples/queue2/queue2.c b/examples/queue2/queue2.c
new file mode 100644 (file)
index 0000000..845f491
--- /dev/null
@@ -0,0 +1,81 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *queue;
+  GstElement *pipeline;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  pipeline = gst_pipeline_new("pipeline");
+  g_assert(pipeline != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  /* add objects to the main pipeline */
+  gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc);
+  gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
+
+  gst_bin_add(GST_BIN(thread), audiosink);
+  
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  gst_pad_set_type_id(gst_element_get_pad(queue, "sink"),
+       gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink")));
+
+  if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
+    g_print("cannot autoplug pipeline\n");
+    exit(-1);
+  }
+
+  gst_bin_add(GST_BIN(pipeline), thread);
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(pipeline));
+  }
+
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
+
+  exit(0);
+}
+
diff --git a/examples/queue3/.gitignore b/examples/queue3/.gitignore
new file mode 100644 (file)
index 0000000..b89de20
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue3
diff --git a/examples/queue3/queue3.c b/examples/queue3/queue3.c
new file mode 100644 (file)
index 0000000..fa79a78
--- /dev/null
@@ -0,0 +1,85 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *queue, *parse, *decode;
+  GstElement *bin;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  bin = gst_bin_new("bin");
+  g_assert(bin != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  parse = gst_elementfactory_make("mp3parse", "parse");
+  decode = gst_elementfactory_make("mpg123", "decode");
+
+  /* add objects to the main bin */
+  gst_bin_add(GST_BIN(bin), disksrc);
+  gst_bin_add(GST_BIN(bin), queue);
+
+  gst_bin_add(GST_BIN(thread), parse);
+  gst_bin_add(GST_BIN(thread), decode);
+  gst_bin_add(GST_BIN(thread), audiosink);
+  
+  gst_pad_connect(gst_element_get_pad(disksrc,"src"),
+                  gst_element_get_pad(queue,"sink"));
+
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(parse,"sink"));
+  gst_pad_connect(gst_element_get_pad(parse,"src"),
+                  gst_element_get_pad(decode,"sink"));
+  gst_pad_connect(gst_element_get_pad(decode,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  gst_bin_add(GST_BIN(bin), thread);
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(bin));
+  }
+
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
+
+  exit(0);
+}
+
index 4d8073d..4cd7721 100644 (file)
@@ -53,9 +53,9 @@ int main(int argc,char *argv[])
     exit(-1);
   }
 
-  gst_bin_remove(GST_BIN(pipeline), disksrc);
+  //gst_bin_remove(GST_BIN(pipeline), disksrc);
 
-  gst_bin_add(GST_BIN(thread), disksrc);
+  //gst_bin_add(GST_BIN(thread), disksrc);
   gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline));
 
   /* make it ready */
index 32062fe..9ca1242 100644 (file)
@@ -41,6 +41,7 @@ cothread_state *cothread_create(cothread_context *ctx) {
   s->threadnum = ctx->nthreads;
   s->flags = 0;
   s->sp = ((int *)s + COTHREAD_STACKSIZE);
+  s->top_sp = s->sp;
 
   ctx->threads[ctx->nthreads++] = s;
 
@@ -103,6 +104,7 @@ void cothread_stub() {
     thread->func(thread->argc,thread->argv);
   thread->flags &= ~COTHREAD_STARTED;
   thread->pc = 0;
+  thread->sp = thread->top_sp;
   DEBUG("cothread: cothread_stub() exit\n");
   //printf("uh, yeah, we shouldn't be here, but we should deal anyway\n");
 }
@@ -113,8 +115,10 @@ void cothread_switch(cothread_state *thread) {
   int enter;
 //  int i;
 
-  if (thread == NULL)
+  if (thread == NULL) {
+    g_print("cothread: there's no thread, strange...\n");
     return;
+  }
 
   ctx = thread->ctx;
 
@@ -124,12 +128,10 @@ void cothread_switch(cothread_state *thread) {
     exit(2);
   }
 
-  /*
   if (current == thread) {
     g_print("cothread: trying to switch to same thread, legal but not necessary\n");
-    //return;
+    return;
   }
-  */
 
   // find the number of the thread to switch to
   ctx->current = thread->threadnum;
@@ -137,11 +139,14 @@ void cothread_switch(cothread_state *thread) {
 
   /* save the current stack pointer, frame pointer, and pc */
   GET_SP(current->sp);
-  enter = setjmp(current->jmp);
-  DEBUG("cothread: after thread #%d %d\n",ctx->current, enter);
+  enter = sigsetjmp(current->jmp, 1);
   if (enter != 0) {
+    DEBUG("cothread: enter thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, 
+                   current->sp, current->top_sp, current->top_sp-current->sp);
     return;
   }
+  DEBUG("cothread: exit thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, 
+                   current->sp, current->top_sp, current->top_sp-current->sp);
   enter = 1;
 
   DEBUG("cothread: set stack to %p\n", thread->sp);
@@ -150,7 +155,7 @@ void cothread_switch(cothread_state *thread) {
     DEBUG("cothread: in thread \n");
     SET_SP(thread->sp);
     // switch to it
-    longjmp(thread->jmp,1);
+    siglongjmp(thread->jmp,1);
   } else {
     SETUP_STACK(thread->sp);
     SET_SP(thread->sp);
@@ -158,5 +163,6 @@ void cothread_switch(cothread_state *thread) {
     //JUMP(cothread_stub);
     cothread_stub();
     DEBUG("cothread: exit thread \n");
+    ctx->current = 0;
   }
 }
index 63fef42..3a40dd6 100644 (file)
@@ -29,8 +29,9 @@ struct _cothread_state {
 
   int flags;
   int *sp;
+  int *top_sp;
   int *pc;
-  jmp_buf jmp;
+  sigjmp_buf jmp;
 };
 
 struct _cothread_context {
index 420b551..bf57b27 100644 (file)
@@ -42,7 +42,7 @@ noinst_HEADERS =              \
        gsttypefind.h           \
        gstsinesrc.h
 
-CFLAGS += -O2 -Wall
+CFLAGS += -O2 -Wall 
 
 libgstelements_la_LIBADD = $(GLIB_LIBS) $(GTK_LIBS) $(GHTTP_LIBS)
 libgstelements_la_LDFLAGS = -version-info $(STREAMER_CURRENT):$(STREAMER_REVISION):$(STREAMER_AGE)
index 5352ded..b9af36d 100644 (file)
@@ -27,6 +27,7 @@
 
 #include <gstqueue.h>
 
+#include <gst/gstarch.h>
 
 GstElementDetails gst_queue_details = {
   "Queue",
@@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) {
 static void gst_queue_init(GstQueue *queue) {
   queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
   gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
+
   gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
   queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
   gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
@@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
 
   /* we have to lock the queue since we span threads */
 
-  DEBUG("queue: %s adding buffer %p\n", name, buf);
+  DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
   
   GST_LOCK(queue);
+  DEBUG("queue: have queue lock\n");
 
   if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
     g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); 
@@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
 
   DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
 
-  if (queue->level_buffers >= queue->max_buffers) {
+  g_mutex_lock(queue->fulllock);
+  while (queue->level_buffers >= queue->max_buffers) {
     DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
-    while (queue->level_buffers >= queue->max_buffers) {
-      GST_UNLOCK(queue);
-      g_mutex_lock(queue->fulllock);
-      STATUS("%s: O\n");
-      g_cond_wait(queue->fullcond,queue->fulllock);
-      g_mutex_unlock(queue->fulllock);
-      GST_LOCK(queue);
-    }
+    STATUS("%s: O\n");
+    GST_UNLOCK(queue);
+    g_cond_wait(queue->fullcond,queue->fulllock);
+    GST_LOCK(queue);
+    STATUS("%s: O+\n");
     DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
   }
+  g_mutex_unlock(queue->fulllock);
   
 
   /* put the buffer on the head of the list */
@@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
   STATUS("%s: +\n");
 
   /* if we were empty, but aren't any more, signal a condition */
-  tosignal = (queue->level_buffers <= 0);
+  tosignal = (queue->level_buffers >= 0);
   queue->level_buffers++;
 
   /* we can unlock now */
-  DEBUG("queue: %s chain %d end\n", name, queue->level_buffers);
+  DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
   GST_UNLOCK(queue);
 
   if (tosignal) {
     g_mutex_lock(queue->emptylock);
+    STATUS("%s: >\n");
     g_cond_signal(queue->emptycond);
+    STATUS("%s: >>\n");
     g_mutex_unlock(queue->emptylock);
-    //g_print(">");
   }
 }
 
@@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) {
   
   name = gst_element_get_name(GST_ELEMENT(queue));
 
-  DEBUG("queue: %s push %d\n", name, queue->level_buffers);
+  DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
   /* have to lock for thread-safety */
+  DEBUG("queue: try have queue lock\n");
   GST_LOCK(queue);
+  DEBUG("queue: have queue lock\n");
 
   while (!queue->level_buffers) {
+    STATUS("%s: U released lock\n");
     GST_UNLOCK(queue);
     g_mutex_lock(queue->emptylock);
-    STATUS("%s: U\n");
     g_cond_wait(queue->emptycond,queue->emptylock);
     g_mutex_unlock(queue->emptylock);
     GST_LOCK(queue);
+    STATUS("%s: U- getting lock\n");
   }
 
   front = queue->queue;
@@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) {
 
   if (tosignal) {
     g_mutex_lock(queue->fulllock);
+    STATUS("%s: < \n");
     g_cond_signal(queue->fullcond);
+    STATUS("%s: << \n");
     g_mutex_unlock(queue->fulllock);
   }
 
-  DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf);
+  //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
   gst_pad_push(queue->srcpad,buf);
-  DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers);
+  //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
 
   /* unlock now */
 }
index b2748c0..d10051c 100644 (file)
@@ -202,6 +202,9 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) {
           _gst_print_statename(GST_STATE(element)),GST_STATE_PENDING(element),
           _gst_print_statename(GST_STATE_PENDING(element)));
 
+  if (GST_STATE_PENDING(element) == GST_STATE_READY) {
+    gst_bin_create_plan(bin);
+  }
 //  g_return_val_if_fail(bin->numchildren != 0, GST_STATE_FAILURE);
 
 //  g_print("-->\n");
@@ -226,18 +229,12 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) {
   }
 //  g_print("<-- \"%s\"\n",gst_object_get_name(GST_OBJECT(bin)));
 
-//  if (GST_STATE_PENDING(element),
 
   return gst_bin_change_state_norecurse(bin);
 }
 
 
 static GstElementStateReturn gst_bin_change_state_norecurse(GstBin *bin) {
-/*
-  if ((state == GST_STATE_READY) && (GST_STATE(bin) < GST_STATE_READY)) {
-//    gst_bin_create_plan(
-  }
-*/
 
   if (GST_ELEMENT_CLASS(parent_class)->change_state)
     return GST_ELEMENT_CLASS(parent_class)->change_state(GST_ELEMENT(bin));
@@ -292,8 +289,8 @@ gboolean gst_bin_set_state_type(GstBin *bin,
                                 GtkType type) {
   GstBinClass *oclass;
 
-//  g_print("gst_bin_set_state_type(\"%s\",%d,%d)\n",
-//          gst_object_get_name(GST_OBJECT(bin)),state,type);
+  DEBUG("gst_bin_set_state_type(\"%s\",%d,%d)\n",
+          gst_element_get_name(GST_ELEMENT(bin)),state,type);
 
   g_return_val_if_fail(bin != NULL, FALSE);
   g_return_val_if_fail(GST_IS_BIN(bin), FALSE);
@@ -310,7 +307,7 @@ void gst_bin_real_destroy(GtkObject *object) {
   GList *children;
   GstElement *child;
 
-//  g_print("in gst_bin_real_destroy()\n");
+  DEBUG("in gst_bin_real_destroy()\n");
 
   children = bin->children;
   while (children) {
@@ -427,32 +424,38 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) {
   GList *pads;
   GstPad *pad;
   GstBuffer *buf;
+  gchar *name = gst_element_get_name(element);
 
-//  g_print("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
-//          argc,gst_element_get_name(element));
+  DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
+          argc,gst_element_get_name(element));
 
   if (element->loopfunc != NULL) {
     while (1) {
-      DEBUG("** gst_bin_loopfunc_wrapper(): element has loop function, calling it\n");
+      DEBUG("** gst_bin_loopfunc_wrapper(): element %s has loop function, calling it\n", name);
       (element->loopfunc)(element);
-      DEBUG("** gst_bin_loopfunc_wrapper(): element ended loop function\n");
+      DEBUG("** gst_bin_loopfunc_wrapper(): element %s ended loop function\n", name);
     }
   } else {
-    DEBUG("** gst_bin_loopfunc_wrapper(): element is chain-based, calling in infinite loop\n");
+    DEBUG("** gst_bin_loopfunc_wrapper(): element %s is chain-based, calling in infinite loop\n", name);
     if (GST_IS_SRC(element)) {
-      //while (1) {
-        DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source\n");
-        gst_src_push(GST_SRC(element));
-      //}
+      DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s\n", name);
+      gst_src_push(GST_SRC(element));
+      DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s done\n", name);
+    } else if (GST_IS_CONNECTION(element) && argc == 1) {
+      while (1) {
+        DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s\n", name);
+        gst_connection_push(GST_CONNECTION(element));
+        DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s done\n", name);
+      }
     } else {
       while (1) {
         pads = element->pads;
         while (pads) {
           pad = GST_PAD(pads->data);
           if (pad->direction == GST_PAD_SINK) {
-            DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer\n");
-            buf = gst_pad_pull(pad);
-            DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function\n");
+            DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer from %s:%s\n", name, gst_pad_get_name(pad));
+           buf = gst_pad_pull(pad);
+            DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function of %s:%s\n", name, gst_pad_get_name(pad));
             (pad->chainfunc)(pad,buf);
           }
           pads = g_list_next(pads);
@@ -484,7 +487,7 @@ static void gst_bin_create_plan_func(GstBin *bin) {
   GstElement *element;
   int sink_pads;
   GList *pads;
-  GstPad *pad, *peer;
+  GstPad *pad, *opad, *peer;
   GstElement *outside;
 
   g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin)));
@@ -499,13 +502,11 @@ static void gst_bin_create_plan_func(GstBin *bin) {
     if (element->loopfunc != NULL) {
       g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
       bin->need_cothreads = TRUE;
-      break;
     }
     // if it's a complex element, use cothreads
-    if (GST_ELEMENT_IS_MULTI_IN(element)) {
+    else if (GST_ELEMENT_IS_MULTI_IN(element)) {
       g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
       bin->need_cothreads = TRUE;
-      break;
     }
     // if it has more than one input pad, use cothreads
     sink_pads = 0;
@@ -527,6 +528,11 @@ static void gst_bin_create_plan_func(GstBin *bin) {
   // FIXME
   bin->need_cothreads &= bin->use_cothreads;
 
+  // clear previous plan state
+  g_list_free(bin->entries);
+  bin->entries = NULL;
+  bin->numentries = 0;
+
   if (bin->need_cothreads) {
     g_print("gstbin: need cothreads\n");
 
@@ -551,22 +557,56 @@ static void gst_bin_create_plan_func(GstBin *bin) {
       pads = gst_element_get_pad_list(element);
       while (pads) {
         pad = GST_PAD(pads->data);
-g_print("gstbin: setting push&pull handlers for %s:%s\n",
-gst_element_get_name(element),gst_pad_get_name(pad));
-//        if (pad->direction == GST_PAD_SRC)
+        g_print("gstbin: setting push&pull handlers for %s:%s\n",
+                gst_element_get_name(element),gst_pad_get_name(pad));
+
+       // an internal connection will push outside this bin.
+       if (!GST_IS_CONNECTION(element)) {
           pad->pushfunc = gst_bin_pushfunc_wrapper;
-//        else
-          pad->pullfunc = gst_bin_pullfunc_wrapper;
+       }
+        pad->pullfunc = gst_bin_pullfunc_wrapper;
+
+        /* we only worry about sink pads */
+        if (gst_pad_get_direction(pad) == GST_PAD_SINK) {
+          /* get the pad's peer */
+          peer = gst_pad_get_peer(pad);
+          if (!peer) break;
+          /* get the parent of the peer of the pad */
+          outside = GST_ELEMENT(gst_pad_get_parent(peer));
+          if (!outside) break;
+          /* if it's a connection and it's not ours... */
+          if (GST_IS_CONNECTION(outside) &&
+               (gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) {
+           GList *connection_pads = gst_element_get_pad_list(outside);
+            while (connection_pads) {
+              opad = GST_PAD(connection_pads->data);
+              if (gst_pad_get_direction(opad) == GST_PAD_SRC) {
+                g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n",
+                       gst_element_get_name(outside),gst_pad_get_name(opad));
+                opad->pushfunc = gst_bin_pushfunc_wrapper;
+                opad->pullfunc = gst_bin_pullfunc_wrapper;
+                if (outside->threadstate == NULL) {
+                  outside->threadstate = cothread_create(bin->threadcontext);
+                  cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper,
+                         1,(char **)outside);
+                }
+             }
+              connection_pads = g_list_next(connection_pads);
+           }
+            gst_info("gstbin: element \"%s\" is the external source Connection "
+                   "for internal element \"%s\"\n",
+                         gst_element_get_name(GST_ELEMENT(outside)),
+                         gst_element_get_name(GST_ELEMENT(element)));
+           bin->entries = g_list_prepend(bin->entries,outside);
+           bin->numentries++;
+         }
+       }
         pads = g_list_next(pads);
       }
       elements = g_list_next(elements);
    }
   } else {
     g_print("gstbin: don't need cothreads, looking for entry points\n");
-    // clear previous plan state
-    g_list_free(bin->entries);
-    bin->entries = NULL;
-    bin->numentries = 0;
     // we have to find which elements will drive an iteration
     elements = bin->children;
     while (elements) {
@@ -592,8 +632,8 @@ gst_element_get_name(element),gst_pad_get_name(pad));
             /* if it's a connection and it's not ours... */
             if (GST_IS_CONNECTION(outside) &&
                  (gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) {
-              gst_info("gstbin: element \"%s\" is the external source Connection \
-                                   for internal element \"%s\"\n",
+              gst_info("gstbin: element \"%s\" is the external source Connection "
+                                   "for internal element \"%s\"\n",
                            gst_element_get_name(GST_ELEMENT(outside)),
                            gst_element_get_name(GST_ELEMENT(element)));
              bin->entries = g_list_prepend(bin->entries,outside);
@@ -617,18 +657,23 @@ void gst_bin_iterate_func(GstBin *bin) {
   g_return_if_fail(bin != NULL);
   g_return_if_fail(GST_IS_BIN(bin));
   g_return_if_fail(GST_STATE(bin) == GST_STATE_PLAYING);
-  g_return_if_fail(bin->numentries > 0);
 
   DEBUG("GstBin: iterating\n");
 
   if (bin->need_cothreads) {
     // all we really have to do is switch to the first child
     // FIXME this should be lots more intelligent about where to start
-  DEBUG("** in gst_bin_iterate_func()==================================%s\n",
+    DEBUG("** in gst_bin_iterate_func()==================================%s\n",
           gst_element_get_name(GST_ELEMENT(bin->children->data)));
     cothread_switch(GST_ELEMENT(bin->children->data)->threadstate);
   } else {
-    entries = bin->entries;
+    if (bin->numentries <= 0) {
+      printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin)));
+      entries = bin->children;
+    }
+    else {
+      entries = bin->entries;
+    }
 
     while (entries) {
       entry = GST_ELEMENT(entries->data);
@@ -636,6 +681,8 @@ void gst_bin_iterate_func(GstBin *bin) {
         gst_src_push(GST_SRC(entry));
       else if (GST_IS_CONNECTION(entry))
         gst_connection_push(GST_CONNECTION(entry));
+      else if (GST_IS_BIN(entry))
+        gst_bin_iterate(GST_BIN(entry));
       else
         g_assert_not_reached();
       entries = g_list_next(entries);
index cf3bf9d..ad3f5dd 100644 (file)
@@ -183,16 +183,11 @@ void gst_pad_set_qos_function(GstPad *pad,GstPadQoSFunction qos) {
 void gst_pad_push(GstPad *pad,GstBuffer *buffer) {
   g_return_if_fail(pad != NULL);
   g_return_if_fail(GST_IS_PAD(pad));
-//  g_return_if_fail(GST_PAD_CONNECTED(pad));
+  g_return_if_fail(GST_PAD_CONNECTED(pad));
   g_return_if_fail(buffer != NULL);
 
   gst_trace_add_entry(NULL,0,buffer,"push buffer");
 
-  // FIXME we should probably make some noise here...
-  if (!GST_PAD_CONNECTED(pad)) return;
-
-//  g_return_if_fail(pad->pushfunc != NULL);
-
   // first check to see if there's a push handler
   if (pad->pushfunc != NULL) {
     //g_print("-- gst_pad_push(): putting buffer in pen and calling push handler\n");
@@ -230,7 +225,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) {
 //      g_print("-- gst_pad_pull(): calling pull handler\n");
       (pad->pullfunc)(pad->peer);
     } else {
-//      g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n");
+      g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n");
     }
   }
 
@@ -242,7 +237,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) {
     return buf;
   // else we have a big problem...
   } else {
-//    g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n");
+    g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n");
     return NULL;
   }
 
index adcc010..3b52084 100644 (file)
@@ -347,10 +347,27 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) {
 
   elements = pipeline->sinks;
 
-  // fase 2, find all the sinks.. 
+  // fase 2, loop over all the sinks.. 
   while (elements) {
+    GList *pads;
+    GstPad *pad;
+
     element = GST_ELEMENT(elements->data);
 
+    pads = gst_element_get_pad_list(element);
+
+    while (pads) {
+      pad = (GstPad *)pads->data;
+
+      if (pad->direction == GST_PAD_SINK) {
+       sink_type = gst_pad_get_type_id(pad);
+        sinkelement = element;
+       break;
+      }
+
+      pads = g_list_next(pads);
+    }
+    /*
     if (GST_IS_SINK(element)) {
       g_print("GstPipeline: found sink \"%s\"\n", gst_element_get_name(element));
 
@@ -369,6 +386,7 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) {
                        gst_element_get_name(element), sink_type);
       }
     }
+    */
 
     elements = g_list_next(elements);
   }
index 41adceb..b84533d 100644 (file)
@@ -105,9 +105,6 @@ gst_thread_class_init(GstThreadClass *klass) {
 static void gst_thread_init(GstThread *thread) {
   GST_FLAG_SET(thread,GST_THREAD_CREATE);
 
-//  thread->entries = NULL;
-//  thread->numentries = 0;
-
   thread->lock = g_mutex_new();
   thread->cond = g_cond_new();
 }
@@ -178,10 +175,13 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
 
   pending = GST_STATE_PENDING(element);
 
+  if (pending == GST_STATE(element)) return GST_STATE_SUCCESS;
+
   if (GST_ELEMENT_CLASS(parent_class)->change_state)
     stateset = GST_ELEMENT_CLASS(parent_class)->change_state(element);
   
-  gst_info("gstthread: stateset %d %d\n", stateset, GST_STATE_PENDING(element));
+  gst_info("gstthread: stateset %d %d %d\n", GST_STATE(element), stateset, GST_STATE_PENDING(element));
+
 
   switch (pending) {
     case GST_STATE_READY:
@@ -189,13 +189,10 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
       // we want to prepare our internal state for doing the iterations
       gst_info("gstthread: preparing thread \"%s\" for iterations:\n",
                gst_element_get_name(GST_ELEMENT(element)));
-      //gst_thread_prepare(thread);
       
-      gst_bin_create_plan(GST_BIN(thread));
-//      if (thread->numentries == 0)
-//        return FALSE;
       // set the state to idle
       GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING);
+      GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
       // create the thread if that's what we're supposed to do
       gst_info("gstthread: flags are 0x%08x\n",GST_FLAGS(thread));
       if (GST_FLAG_IS_SET(thread,GST_THREAD_CREATE)) {
@@ -214,12 +211,14 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
       gst_info("gstthread: starting thread \"%s\"\n",
               gst_element_get_name(GST_ELEMENT(element)));
       GST_FLAG_SET(thread,GST_THREAD_STATE_SPINNING);
+      GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
       gst_thread_signal_thread(thread);
       break;  
     case GST_STATE_PAUSED:
       gst_info("gstthread: pausing thread \"%s\"\n",
               gst_element_get_name(GST_ELEMENT(element)));
       GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING);
+      GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
       gst_thread_signal_thread(thread);
       break;
     case GST_STATE_NULL:
index 63981b3..5147c0f 100644 (file)
@@ -1,7 +1,7 @@
 ## Process this file with automake to produce Makefile.in
 
 INCLUDES = $(GLIB_CFLAGS) $(GTK_CFLAGS) -I$(top_srcdir) \
-                  $(shell gnome-config --cflags gnomeui)
+                  $(shell gnome-config --cflags gnomeui) $(shell gstreamer-config --cflags)
 
 
 bin_PROGRAMS = gstplay
@@ -19,8 +19,10 @@ noinst_HEADERS = codecs.h
 
 CFLAGS += -O2 -Wall -DDATADIR=\""$(gladedir)/"\"
 
-gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) 
-gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome)
+gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) \
+                $(shell gstreamer-config --cflags ) 
+gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome) \
+                $(shell gstreamer-config --libs ) 
 
 if HAVE_LIBXV
 xvlibs=-lXv
index 034e9af..2b22bbf 100644 (file)
@@ -17,12 +17,14 @@ void avi_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline)
   //if (0) {
   if (strncmp(gst_pad_get_name(pad), "audio_", 6) == 0) {
 
+    gst_bin_add(GST_BIN(pipeline), audio_render_queue);
     gst_pad_connect(pad,
                     gst_element_get_pad(audio_render_queue,"sink"));
 
   } else if (strncmp(gst_pad_get_name(pad), "video_", 6) == 0) {
   //} else if (0) {
 
+    gst_bin_add(GST_BIN(pipeline), video_render_queue);
     gst_pad_connect(pad,
                     gst_element_get_pad(video_render_queue,"sink"));
   }
index 3d3ba93..8ffd8ff 100644 (file)
@@ -27,7 +27,7 @@ gboolean idle_func(gpointer data);
 GstElement *show, *video_render_queue;
 GstElement *audio_play, *audio_render_queue;
 GstElement *src;
-GstPipeline *pipeline;
+GstElement *pipeline;
 GstElement *parse = NULL;
 GstElement *typefind;
 GstElement *video_render_thread;
@@ -254,6 +254,9 @@ static void have_type(GstSink *sink) {
   }
   gtk_object_set(GTK_OBJECT(src),"offset",0,NULL);
 
+  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread));
+  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread));
+
   g_print("setting to READY state\n");
   gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY);
   g_print("setting to PLAYING state\n");
@@ -286,7 +289,6 @@ gint start_from_file(guchar *filename)
 
   g_print("setting to READY state\n");
 
-  gst_bin_create_plan(GST_BIN(pipeline));
   gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY);
 
   state = GSTPLAY_STOPPED;
@@ -363,17 +365,13 @@ main (int argc, char *argv[])
   gnome_dock_set_client_area(GNOME_DOCK(glade_xml_get_widget(xml, "dock1")),
                  gst_util_get_widget_arg(GTK_OBJECT(show),"widget"));
   gst_bin_add(GST_BIN(video_render_thread),GST_ELEMENT(show));
-  gst_element_add_ghost_pad(GST_ELEMENT(video_render_thread),
-                               gst_element_get_pad(show,"sink"));
 
   glade_xml_signal_autoconnect(xml);
 
   video_render_queue = gst_elementfactory_make("queue","video_render_queue");
   gtk_object_set(GTK_OBJECT(video_render_queue),"max_level",BUFFER,NULL);
-  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_queue));
-  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread));
   gst_pad_connect(gst_element_get_pad(video_render_queue,"src"),
-                  gst_element_get_pad(video_render_thread,"sink"));
+                  gst_element_get_pad(show,"sink"));
   gtk_object_set(GTK_OBJECT(video_render_thread),"create_thread",TRUE,NULL);
 
 
@@ -381,15 +379,11 @@ main (int argc, char *argv[])
   g_return_val_if_fail(audio_render_thread != NULL, -1);
   audio_play = gst_elementfactory_make("audiosink","play_audio");
   gst_bin_add(GST_BIN(audio_render_thread),GST_ELEMENT(audio_play));
-  gst_element_add_ghost_pad(GST_ELEMENT(audio_render_thread),
-                               gst_element_get_pad(audio_play,"sink"));
 
   audio_render_queue = gst_elementfactory_make("queue","audio_render_queue");
   gtk_object_set(GTK_OBJECT(audio_render_queue),"max_level",BUFFER,NULL);
-  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_queue));
-  gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread));
   gst_pad_connect(gst_element_get_pad(audio_render_queue,"src"),
-                  gst_element_get_pad(audio_render_thread,"sink"));
+                  gst_element_get_pad(audio_play,"sink"));
   gtk_object_set(GTK_OBJECT(audio_render_thread),"create_thread",TRUE,NULL);
 
   if (argc > 1) {
index 4febea6..4c04b5c 100644 (file)
@@ -56,6 +56,7 @@ void mpeg1_setup_audio_thread(GstPad *pad, GstElement *audio_render_queue, GstEl
   g_return_if_fail(audio_thread != NULL);
   gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio));
   gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode));
+  gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue));
 
   // set up pad connections
   gst_element_add_ghost_pad(GST_ELEMENT(audio_thread),
@@ -100,6 +101,7 @@ void mpeg1_setup_video_thread(GstPad *pad, GstElement *video_render_queue, GstEl
   g_return_if_fail(video_thread != NULL);
   gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video));
   gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video));
+  gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue));
 
   // set up pad connections
   gst_element_add_ghost_pad(GST_ELEMENT(video_thread),
index 990f318..aefdd4a 100644 (file)
@@ -66,6 +66,7 @@ void mpeg2_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline)
   g_return_if_fail(audio_thread != NULL);
   gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio));
   gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode));
+  gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue));
 
   // set up pad connections
   gst_element_add_ghost_pad(GST_ELEMENT(audio_thread),
@@ -116,6 +117,7 @@ void mpeg2_setup_video_thread(GstPad *pad, GstElement *show, GstElement *pipelin
   gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video));
   gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video));
   gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(merge_subtitles));
+  gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue));
   gst_bin_use_cothreads(GST_BIN(video_thread), FALSE);
 
   // set up pad connections
index ad9c048..6811a7f 100644 (file)
@@ -47,14 +47,14 @@ while test $# -gt 0; do
       elif test @includedir@ != /usr/include ; then
         includes=-I@includedir@ 
       fi
-      echo $includes `gtk-config --cflags`
+      echo $includes `gtk-config --cflags gtk gthread`
       ;;
     --libs)
       if test $prefix -ef @builddir@ ; then
-        echo @builddir@/libgst.la `gtk-config --libs`
+        echo @builddir@/libgst.la `gtk-config --libs gtk gthread`
       else
         libdirs=-L@libdir@
-        echo $libdirs -lgst `gtk-config --libs`
+        echo $libdirs -lgst `gtk-config --libs gtk gthread` 
       fi
       ;;
     *)
index 420b551..bf57b27 100644 (file)
@@ -42,7 +42,7 @@ noinst_HEADERS =              \
        gsttypefind.h           \
        gstsinesrc.h
 
-CFLAGS += -O2 -Wall
+CFLAGS += -O2 -Wall 
 
 libgstelements_la_LIBADD = $(GLIB_LIBS) $(GTK_LIBS) $(GHTTP_LIBS)
 libgstelements_la_LDFLAGS = -version-info $(STREAMER_CURRENT):$(STREAMER_REVISION):$(STREAMER_AGE)
index 5352ded..b9af36d 100644 (file)
@@ -27,6 +27,7 @@
 
 #include <gstqueue.h>
 
+#include <gst/gstarch.h>
 
 GstElementDetails gst_queue_details = {
   "Queue",
@@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) {
 static void gst_queue_init(GstQueue *queue) {
   queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
   gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
+
   gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
   queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
   gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
@@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
 
   /* we have to lock the queue since we span threads */
 
-  DEBUG("queue: %s adding buffer %p\n", name, buf);
+  DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
   
   GST_LOCK(queue);
+  DEBUG("queue: have queue lock\n");
 
   if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
     g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); 
@@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
 
   DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
 
-  if (queue->level_buffers >= queue->max_buffers) {
+  g_mutex_lock(queue->fulllock);
+  while (queue->level_buffers >= queue->max_buffers) {
     DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
-    while (queue->level_buffers >= queue->max_buffers) {
-      GST_UNLOCK(queue);
-      g_mutex_lock(queue->fulllock);
-      STATUS("%s: O\n");
-      g_cond_wait(queue->fullcond,queue->fulllock);
-      g_mutex_unlock(queue->fulllock);
-      GST_LOCK(queue);
-    }
+    STATUS("%s: O\n");
+    GST_UNLOCK(queue);
+    g_cond_wait(queue->fullcond,queue->fulllock);
+    GST_LOCK(queue);
+    STATUS("%s: O+\n");
     DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
   }
+  g_mutex_unlock(queue->fulllock);
   
 
   /* put the buffer on the head of the list */
@@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
   STATUS("%s: +\n");
 
   /* if we were empty, but aren't any more, signal a condition */
-  tosignal = (queue->level_buffers <= 0);
+  tosignal = (queue->level_buffers >= 0);
   queue->level_buffers++;
 
   /* we can unlock now */
-  DEBUG("queue: %s chain %d end\n", name, queue->level_buffers);
+  DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
   GST_UNLOCK(queue);
 
   if (tosignal) {
     g_mutex_lock(queue->emptylock);
+    STATUS("%s: >\n");
     g_cond_signal(queue->emptycond);
+    STATUS("%s: >>\n");
     g_mutex_unlock(queue->emptylock);
-    //g_print(">");
   }
 }
 
@@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) {
   
   name = gst_element_get_name(GST_ELEMENT(queue));
 
-  DEBUG("queue: %s push %d\n", name, queue->level_buffers);
+  DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
   /* have to lock for thread-safety */
+  DEBUG("queue: try have queue lock\n");
   GST_LOCK(queue);
+  DEBUG("queue: have queue lock\n");
 
   while (!queue->level_buffers) {
+    STATUS("%s: U released lock\n");
     GST_UNLOCK(queue);
     g_mutex_lock(queue->emptylock);
-    STATUS("%s: U\n");
     g_cond_wait(queue->emptycond,queue->emptylock);
     g_mutex_unlock(queue->emptylock);
     GST_LOCK(queue);
+    STATUS("%s: U- getting lock\n");
   }
 
   front = queue->queue;
@@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) {
 
   if (tosignal) {
     g_mutex_lock(queue->fulllock);
+    STATUS("%s: < \n");
     g_cond_signal(queue->fullcond);
+    STATUS("%s: << \n");
     g_mutex_unlock(queue->fulllock);
   }
 
-  DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf);
+  //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
   gst_pad_push(queue->srcpad,buf);
-  DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers);
+  //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
 
   /* unlock now */
 }
index 0e27bdf..144027a 100644 (file)
@@ -158,7 +158,7 @@ void mp2tomp1(GstElement *parser,GstPad *pad, GstElement *pipeline) {
     gtk_object_set(GTK_OBJECT(smooth),"active",FALSE,NULL);
     encode = gst_elementfactory_make("mpeg2enc","encode");
     g_return_if_fail(encode != NULL);
-    gtk_object_set(GTK_OBJECT(encode),"frames_per_second",25.0,NULL);
+    gtk_object_set(GTK_OBJECT(encode),"frames_per_second",29.97,NULL);
     //encode = gst_elementfactory_make("mpeg1encoder","encode");
     //gtk_object_set(GTK_OBJECT(show),"width",640, "height", 480,NULL);
 
index 2a3a486..3527cc0 100644 (file)
@@ -50,9 +50,6 @@ int main(int argc,char *argv[])
   gst_pad_connect(gst_element_get_pad(decoder,"src"),
                   gst_element_get_pad(audiosink,"sink"));
 
-  /* find out how to handle this bin */
-  gst_bin_create_plan(GST_BIN(bin));
-
   /* make it ready */
   gst_element_set_state(bin, GST_STATE_READY);
   /* start playing */
index f687ef6..fab7b49 100644 (file)
@@ -46,9 +46,6 @@ int main(int argc,char *argv[])
     exit(-1);
   }
 
-  /* find out how to handle this bin */
-  gst_bin_create_plan(GST_BIN(pipeline));
-
   /* make it ready */
   gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
   /* start playing */
diff --git a/tests/old/examples/queue/.gitignore b/tests/old/examples/queue/.gitignore
new file mode 100644 (file)
index 0000000..b7dd527
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue
diff --git a/tests/old/examples/queue/queue.c b/tests/old/examples/queue/queue.c
new file mode 100644 (file)
index 0000000..9dc42d1
--- /dev/null
@@ -0,0 +1,84 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *parse, *decode, *queue;
+  GstElement *bin;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  bin = gst_bin_new("bin");
+  g_assert(bin != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  parse = gst_elementfactory_make("mp3parse", "parse");
+  decode = gst_elementfactory_make("mpg123", "decode");
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  /* add objects to the main pipeline */
+  gst_bin_add(GST_BIN(bin), disksrc);
+  gst_bin_add(GST_BIN(bin), parse);
+  gst_bin_add(GST_BIN(bin), decode);
+  gst_bin_add(GST_BIN(bin), queue);
+
+  gst_bin_add(GST_BIN(thread), audiosink);
+
+  gst_bin_add(GST_BIN(bin), thread);
+  
+  gst_pad_connect(gst_element_get_pad(disksrc,"src"),
+                  gst_element_get_pad(parse,"sink"));
+  gst_pad_connect(gst_element_get_pad(parse,"src"),
+                  gst_element_get_pad(decode,"sink"));
+  gst_pad_connect(gst_element_get_pad(decode,"src"),
+                  gst_element_get_pad(queue,"sink"));
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(bin));
+  }
+
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
+
+  exit(0);
+}
+
diff --git a/tests/old/examples/queue2/.gitignore b/tests/old/examples/queue2/.gitignore
new file mode 100644 (file)
index 0000000..ca0136a
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue2
diff --git a/tests/old/examples/queue2/queue2.c b/tests/old/examples/queue2/queue2.c
new file mode 100644 (file)
index 0000000..845f491
--- /dev/null
@@ -0,0 +1,81 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *queue;
+  GstElement *pipeline;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  pipeline = gst_pipeline_new("pipeline");
+  g_assert(pipeline != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  /* add objects to the main pipeline */
+  gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc);
+  gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
+
+  gst_bin_add(GST_BIN(thread), audiosink);
+  
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  gst_pad_set_type_id(gst_element_get_pad(queue, "sink"),
+       gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink")));
+
+  if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
+    g_print("cannot autoplug pipeline\n");
+    exit(-1);
+  }
+
+  gst_bin_add(GST_BIN(pipeline), thread);
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(pipeline));
+  }
+
+  gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
+
+  exit(0);
+}
+
diff --git a/tests/old/examples/queue3/.gitignore b/tests/old/examples/queue3/.gitignore
new file mode 100644 (file)
index 0000000..b89de20
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+queue3
diff --git a/tests/old/examples/queue3/queue3.c b/tests/old/examples/queue3/queue3.c
new file mode 100644 (file)
index 0000000..fa79a78
--- /dev/null
@@ -0,0 +1,85 @@
+#include <gst/gst.h>
+
+gboolean playing;
+
+/* eos will be called when the src element has an end of stream */
+void eos(GstSrc *src, gpointer data) 
+{
+  g_print("have eos, quitting\n");
+
+  playing = FALSE;
+}
+
+int main(int argc,char *argv[]) 
+{
+  GstElement *disksrc, *audiosink, *queue, *parse, *decode;
+  GstElement *bin;
+  GstElement *thread;
+
+  if (argc != 2) {
+    g_print("usage: %s <filename>\n", argv[0]);
+    exit(-1);
+  }
+
+  gst_init(&argc,&argv);
+
+  /* create a new thread to hold the elements */
+  thread = gst_thread_new("thread");
+  g_assert(thread != NULL);
+
+  /* create a new bin to hold the elements */
+  bin = gst_bin_new("bin");
+  g_assert(bin != NULL);
+
+  /* create a disk reader */
+  disksrc = gst_elementfactory_make("disksrc", "disk_source");
+  g_assert(disksrc != NULL);
+  gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
+  gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
+                     GTK_SIGNAL_FUNC(eos), thread);
+
+  queue = gst_elementfactory_make("queue", "queue");
+
+  /* and an audio sink */
+  audiosink = gst_elementfactory_make("audiosink", "play_audio");
+  g_assert(audiosink != NULL);
+
+  parse = gst_elementfactory_make("mp3parse", "parse");
+  decode = gst_elementfactory_make("mpg123", "decode");
+
+  /* add objects to the main bin */
+  gst_bin_add(GST_BIN(bin), disksrc);
+  gst_bin_add(GST_BIN(bin), queue);
+
+  gst_bin_add(GST_BIN(thread), parse);
+  gst_bin_add(GST_BIN(thread), decode);
+  gst_bin_add(GST_BIN(thread), audiosink);
+  
+  gst_pad_connect(gst_element_get_pad(disksrc,"src"),
+                  gst_element_get_pad(queue,"sink"));
+
+  gst_pad_connect(gst_element_get_pad(queue,"src"),
+                  gst_element_get_pad(parse,"sink"));
+  gst_pad_connect(gst_element_get_pad(parse,"src"),
+                  gst_element_get_pad(decode,"sink"));
+  gst_pad_connect(gst_element_get_pad(decode,"src"),
+                  gst_element_get_pad(audiosink,"sink"));
+
+  gst_bin_add(GST_BIN(bin), thread);
+
+  /* make it ready */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
+  /* start playing */
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
+
+  playing = TRUE;
+
+  while (playing) {
+    gst_bin_iterate(GST_BIN(bin));
+  }
+
+  gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
+
+  exit(0);
+}
+
index 4d8073d..4cd7721 100644 (file)
@@ -53,9 +53,9 @@ int main(int argc,char *argv[])
     exit(-1);
   }
 
-  gst_bin_remove(GST_BIN(pipeline), disksrc);
+  //gst_bin_remove(GST_BIN(pipeline), disksrc);
 
-  gst_bin_add(GST_BIN(thread), disksrc);
+  //gst_bin_add(GST_BIN(thread), disksrc);
   gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline));
 
   /* make it ready */