From 42ec9085a27df682584e6c2eee111aef1a6c1e55 Mon Sep 17 00:00:00 2001 From: Erik Walthinsen Date: Sat, 27 Oct 2001 20:28:31 +0000 Subject: [PATCH] added taaz's threading patch, including queue events Original commit message from CVS: added taaz's threading patch, including queue events --- gst/elements/gstfakesrc.c | 2 + gst/elements/gststatistics.c | 15 +-- gst/gstpad.h | 2 +- gst/gstqueue.c | 206 +++++++++++++++++++++++---------------- gst/gstqueue.h | 9 +- gst/gstscheduler.c | 1 + gst/gstthread.c | 16 +-- plugins/elements/gstfakesrc.c | 2 + plugins/elements/gstqueue.c | 206 +++++++++++++++++++++++---------------- plugins/elements/gstqueue.h | 9 +- plugins/elements/gststatistics.c | 15 +-- 11 files changed, 286 insertions(+), 197 deletions(-) diff --git a/gst/elements/gstfakesrc.c b/gst/elements/gstfakesrc.c index bed56e4..8dece3e 100644 --- a/gst/elements/gstfakesrc.c +++ b/gst/elements/gstfakesrc.c @@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad) g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n", GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n"); g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, buf, pad); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n"); return buf; } diff --git a/gst/elements/gststatistics.c b/gst/elements/gststatistics.c index 398c119..fbfdbd9 100644 --- a/gst/elements/gststatistics.c +++ b/gst/elements/gststatistics.c @@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = { /* Statistics signals and args */ enum { SIGNAL_UPDATE, - /* FILL ME */ LAST_SIGNAL }; @@ -66,7 +65,7 @@ static void gst_statistics_reset (GstStatistics *statistics); static void gst_statistics_print (GstStatistics *statistics); static GstElementClass *parent_class = NULL; -static guint gst_statistics_signals[LAST_SIGNAL] = { 0 }; +static guint gst_statistics_signals[LAST_SIGNAL] = { 0, }; static stats zero_stats = { 0, }; @@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass) gst_statistics_signals[SIGNAL_UPDATE] = g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL, - g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0); + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property); @@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf) if (GST_IS_EVENT(buf)) { GstEvent *event = GST_EVENT (buf); - gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED); statistics->stats.events += 1; - if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) { - update = TRUE; + if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) { + gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED); + if (statistics->update_on_eos) { + update = TRUE; + } } if (statistics->update_freq.events) { statistics->update_count.events += 1; @@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf) if (update) { if (statistics->update) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n"); g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n"); } if (!statistics->silent) { gst_statistics_print(statistics); diff --git a/gst/gstpad.h b/gst/gstpad.h index bfeddb6..be75ca9 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -257,7 +257,7 @@ struct _GstGhostPadClass { #define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad) /* Generic */ -#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad)) +#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad)) #define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad)) #define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad)) #define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad)) diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 08cc5f8..6a25133 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -81,6 +81,7 @@ static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static GstBuffer * gst_queue_get (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static void gst_queue_locked_flush (GstQueue *queue); static void gst_queue_flush (GstQueue *queue); static GstElementStateReturn gst_queue_change_state (GstElement *element); @@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue) queue->size_bytes = 100 * 1024; // 100KB queue->size_time = 1000000000LL; // 1sec - queue->emptycond = g_cond_new (); - queue->fullcond = g_cond_new (); - GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n"); + queue->qlock = g_mutex_new (); + queue->reader = FALSE; + queue->writer = FALSE; + queue->not_empty = g_cond_new (); + queue->not_full = g_cond_new (); + GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); } static GstBufferPool* @@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data) return gst_pad_negotiate_proxy (pad, queue->srcpad, caps); } -static gboolean -gst_queue_handle_event (GstPad *pad) -{ - GstQueue *queue; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue)); - - GST_LOCK (queue); - GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue), - queue->level_buffers); - - GST_FLAG_SET (pad, GST_PAD_EOS); - - g_cond_signal (queue->emptycond); - - GST_UNLOCK (queue); - - return TRUE; -} - static void gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) { - GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data); - + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data); gst_buffer_unref (GST_BUFFER (data)); } static void -gst_queue_flush (GstQueue *queue) +gst_queue_locked_flush (GstQueue *queue) { g_slist_foreach (queue->queue, gst_queue_cleanup_buffers, - (char *) GST_ELEMENT_NAME (queue)); + (gpointer) queue); g_slist_free (queue->queue); queue->queue = NULL; @@ -258,39 +239,62 @@ gst_queue_flush (GstQueue *queue) } static void +gst_queue_flush (GstQueue *queue) +{ + g_mutex_lock (queue->qlock); + gst_queue_locked_flush (queue); + g_mutex_unlock (queue->qlock); +} + +static void gst_queue_chain (GstPad *pad, GstBuffer *buf) { GstQueue *queue; - const guchar *name; + gboolean reader; g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); g_return_if_fail (buf != NULL); queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - name = GST_ELEMENT_NAME (queue); - /* we have to lock the queue since we span threads */ + reader = FALSE; -// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name); - GST_LOCK (queue); + /* we have to lock the queue since we span threads */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n"); - gst_queue_flush (queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + g_mutex_lock (queue->qlock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ()); + + if (GST_IS_EVENT(buf)) { + GstEvent *event = GST_EVENT(buf); + switch (GST_EVENT_TYPE(event)) { + case GST_EVENT_FLUSH: + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n"); + gst_queue_locked_flush (queue); + break; + default: + break; + } } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf)); - if (queue->level_buffers >= queue->size_buffers) { + if (queue->level_buffers == queue->size_buffers) { // if this is a leaky queue... if (queue->leaky) { + // FIXME don't want to leak events! // if we leak on the upstream side, drop the current buffer if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); + if (GST_IS_EVENT (buf)) + fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", + GST_ELEMENT_NAME(GST_ELEMENT(queue)), + GST_EVENT_TYPE(GST_EVENT(buf))); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); gst_buffer_unref(buf); // now we have to clean up and exit right away - GST_UNLOCK (queue); + g_mutex_unlock (queue->qlock); return; } // otherwise we have to push a buffer off the other end @@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); front = queue->queue; leakbuf = (GstBuffer *)(front->data); + if (GST_IS_EVENT (leakbuf)) + fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", + GST_ELEMENT_NAME(GST_ELEMENT(queue)), + GST_EVENT_TYPE(GST_EVENT(leakbuf))); queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); gst_buffer_unref(leakbuf); @@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) } } - while (queue->level_buffers >= queue->size_buffers) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", + queue->level_buffers, queue->size_buffers); + while (queue->level_buffers == queue->size_buffers) { // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING || @@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != GST_STATE_VOID_PENDING) { - GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); - GST_UNLOCK(queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); + g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers); - g_cond_signal (queue->emptycond); - g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + if (queue->writer) + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n"); + queue->writer = TRUE; + g_cond_wait (queue->not_full, queue->qlock); + queue->writer = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", + queue->level_buffers, queue->size_buffers); } /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); queue->level_buffers++; queue->level_bytes += GST_BUFFER_SIZE(buf); -// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n", + GST_DEBUG_PAD_NAME(pad), + queue->level_buffers, queue->size_buffers); + + /* reader waiting on an empty queue */ + reader = queue->reader; - /* if we were empty, but aren't any more, signal a condition */ - if (queue->level_buffers == 1) + g_mutex_unlock (queue->qlock); + + if (reader) { - GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name); - g_cond_signal (queue->emptycond); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n"); + g_cond_signal (queue->not_empty); } - - GST_UNLOCK (queue); } static GstBuffer * @@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad) GstQueue *queue; GstBuffer *buf = NULL; GSList *front; - const guchar *name; + gboolean writer; g_assert(pad != NULL); g_assert(GST_IS_PAD(pad)); @@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad) g_return_val_if_fail (GST_IS_PAD (pad), NULL); queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - name = GST_ELEMENT_NAME (queue); + + writer = FALSE; /* have to lock for thread-safety */ - GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name); - GST_LOCK (queue); - GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); - GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name); - - while (!queue->level_buffers) { - if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) { - GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name); - GST_UNLOCK(queue); - // this return NULL shouldn't hurt anything... - return NULL; - } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + g_mutex_lock (queue->qlock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + while (queue->level_buffers == 0) { // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING || @@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad) GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != GST_STATE_VOID_PENDING) { - GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); - GST_UNLOCK(queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); + g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); } - g_cond_signal (queue->fullcond); - g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + if (queue->reader) + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n"); + queue->reader = TRUE; + g_cond_wait (queue->not_empty, queue->qlock); + queue->reader = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n"); } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); front = queue->queue; buf = (GstBuffer *)(front->data); - GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf); queue->queue = g_slist_remove_link (queue->queue, front); g_slist_free (front); -// if (queue->level_buffers < queue->size_buffers) - if (queue->level_buffers == queue->size_buffers) - { - GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name); - g_cond_signal (queue->fullcond); - } - queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(buf); - GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n", + GST_DEBUG_PAD_NAME(pad), + queue->level_buffers, queue->size_buffers); + + /* writer waiting on a full queue */ + writer = queue->writer; + + g_mutex_unlock (queue->qlock); - GST_UNLOCK(queue); + if (writer) + { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n"); + g_cond_signal (queue->not_full); + } + + // FIXME where should this be? locked? + if (GST_IS_EVENT(buf)) { + GstEvent *event = GST_EVENT(buf); + switch (GST_EVENT_TYPE(event)) { + case GST_EVENT_EOS: + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n"); + gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED); + break; + default: + break; + } + } return buf; } diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 02b73f2..fb091e1 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -75,9 +75,12 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ -// GMutex *lock; (optimization?) - GCond *emptycond; - GCond *fullcond; + GMutex *qlock; /* lock for queue (vs object lock) */ + /* we are single reader and single writer queue */ + gboolean reader; /* reader waiting on empty queue */ + gboolean writer; /* writer waiting on full queue */ + GCond *not_empty; /* signals buffers now available for reading */ + GCond *not_full; /* signals space now available for writing */ GTimeVal *timeval; /* the timeout for the queue locking */ }; diff --git a/gst/gstscheduler.c b/gst/gstscheduler.c index 1c10f13..4b69e11 100644 --- a/gst/gstscheduler.c +++ b/gst/gstscheduler.c @@ -1490,6 +1490,7 @@ GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_ } else { GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!"); +gst_schedule_show(sched); //eos = TRUE; } } else { diff --git a/gst/gstthread.c b/gst/gstthread.c index 608e1f2..65a3793 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -382,10 +382,10 @@ gst_thread_change_state (GstElement *element) // //FIXME also make this more efficient by keeping list of managed queues THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e)); - GST_LOCK(e); - g_cond_signal((GST_QUEUE(e)->emptycond)); - g_cond_signal((GST_QUEUE(e)->fullcond)); - GST_UNLOCK(e); + //GST_LOCK(e); + g_cond_signal((GST_QUEUE(e)->not_empty)); + g_cond_signal((GST_QUEUE(e)->not_full)); + //GST_UNLOCK(e); } else { @@ -417,10 +417,10 @@ gst_thread_change_state (GstElement *element) if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread)) { THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e)); - GST_LOCK(peerelement); - g_cond_signal(GST_QUEUE(peerelement)->emptycond); - g_cond_signal(GST_QUEUE(peerelement)->fullcond); - GST_UNLOCK(peerelement); + //GST_LOCK(peerelement); + g_cond_signal(GST_QUEUE(peerelement)->not_empty); + g_cond_signal(GST_QUEUE(peerelement)->not_full); + //GST_UNLOCK(peerelement); } } } diff --git a/plugins/elements/gstfakesrc.c b/plugins/elements/gstfakesrc.c index bed56e4..8dece3e 100644 --- a/plugins/elements/gstfakesrc.c +++ b/plugins/elements/gstfakesrc.c @@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad) g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n", GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n"); g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, buf, pad); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n"); return buf; } diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 08cc5f8..6a25133 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -81,6 +81,7 @@ static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static GstBuffer * gst_queue_get (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static void gst_queue_locked_flush (GstQueue *queue); static void gst_queue_flush (GstQueue *queue); static GstElementStateReturn gst_queue_change_state (GstElement *element); @@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue) queue->size_bytes = 100 * 1024; // 100KB queue->size_time = 1000000000LL; // 1sec - queue->emptycond = g_cond_new (); - queue->fullcond = g_cond_new (); - GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n"); + queue->qlock = g_mutex_new (); + queue->reader = FALSE; + queue->writer = FALSE; + queue->not_empty = g_cond_new (); + queue->not_full = g_cond_new (); + GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); } static GstBufferPool* @@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data) return gst_pad_negotiate_proxy (pad, queue->srcpad, caps); } -static gboolean -gst_queue_handle_event (GstPad *pad) -{ - GstQueue *queue; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue)); - - GST_LOCK (queue); - GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue), - queue->level_buffers); - - GST_FLAG_SET (pad, GST_PAD_EOS); - - g_cond_signal (queue->emptycond); - - GST_UNLOCK (queue); - - return TRUE; -} - static void gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) { - GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data); - + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data); gst_buffer_unref (GST_BUFFER (data)); } static void -gst_queue_flush (GstQueue *queue) +gst_queue_locked_flush (GstQueue *queue) { g_slist_foreach (queue->queue, gst_queue_cleanup_buffers, - (char *) GST_ELEMENT_NAME (queue)); + (gpointer) queue); g_slist_free (queue->queue); queue->queue = NULL; @@ -258,39 +239,62 @@ gst_queue_flush (GstQueue *queue) } static void +gst_queue_flush (GstQueue *queue) +{ + g_mutex_lock (queue->qlock); + gst_queue_locked_flush (queue); + g_mutex_unlock (queue->qlock); +} + +static void gst_queue_chain (GstPad *pad, GstBuffer *buf) { GstQueue *queue; - const guchar *name; + gboolean reader; g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); g_return_if_fail (buf != NULL); queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - name = GST_ELEMENT_NAME (queue); - /* we have to lock the queue since we span threads */ + reader = FALSE; -// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name); - GST_LOCK (queue); + /* we have to lock the queue since we span threads */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n"); - gst_queue_flush (queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + g_mutex_lock (queue->qlock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ()); + + if (GST_IS_EVENT(buf)) { + GstEvent *event = GST_EVENT(buf); + switch (GST_EVENT_TYPE(event)) { + case GST_EVENT_FLUSH: + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n"); + gst_queue_locked_flush (queue); + break; + default: + break; + } } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf)); - if (queue->level_buffers >= queue->size_buffers) { + if (queue->level_buffers == queue->size_buffers) { // if this is a leaky queue... if (queue->leaky) { + // FIXME don't want to leak events! // if we leak on the upstream side, drop the current buffer if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); + if (GST_IS_EVENT (buf)) + fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", + GST_ELEMENT_NAME(GST_ELEMENT(queue)), + GST_EVENT_TYPE(GST_EVENT(buf))); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); gst_buffer_unref(buf); // now we have to clean up and exit right away - GST_UNLOCK (queue); + g_mutex_unlock (queue->qlock); return; } // otherwise we have to push a buffer off the other end @@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); front = queue->queue; leakbuf = (GstBuffer *)(front->data); + if (GST_IS_EVENT (leakbuf)) + fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", + GST_ELEMENT_NAME(GST_ELEMENT(queue)), + GST_EVENT_TYPE(GST_EVENT(leakbuf))); queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); gst_buffer_unref(leakbuf); @@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) } } - while (queue->level_buffers >= queue->size_buffers) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", + queue->level_buffers, queue->size_buffers); + while (queue->level_buffers == queue->size_buffers) { // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING || @@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != GST_STATE_VOID_PENDING) { - GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); - GST_UNLOCK(queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); + g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers); - g_cond_signal (queue->emptycond); - g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + if (queue->writer) + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n"); + queue->writer = TRUE; + g_cond_wait (queue->not_full, queue->qlock); + queue->writer = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", + queue->level_buffers, queue->size_buffers); } /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); queue->level_buffers++; queue->level_bytes += GST_BUFFER_SIZE(buf); -// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n", + GST_DEBUG_PAD_NAME(pad), + queue->level_buffers, queue->size_buffers); + + /* reader waiting on an empty queue */ + reader = queue->reader; - /* if we were empty, but aren't any more, signal a condition */ - if (queue->level_buffers == 1) + g_mutex_unlock (queue->qlock); + + if (reader) { - GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name); - g_cond_signal (queue->emptycond); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n"); + g_cond_signal (queue->not_empty); } - - GST_UNLOCK (queue); } static GstBuffer * @@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad) GstQueue *queue; GstBuffer *buf = NULL; GSList *front; - const guchar *name; + gboolean writer; g_assert(pad != NULL); g_assert(GST_IS_PAD(pad)); @@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad) g_return_val_if_fail (GST_IS_PAD (pad), NULL); queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - name = GST_ELEMENT_NAME (queue); + + writer = FALSE; /* have to lock for thread-safety */ - GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name); - GST_LOCK (queue); - GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); - GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name); - - while (!queue->level_buffers) { - if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) { - GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name); - GST_UNLOCK(queue); - // this return NULL shouldn't hurt anything... - return NULL; - } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + g_mutex_lock (queue->qlock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + while (queue->level_buffers == 0) { // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING || @@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad) GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != GST_STATE_VOID_PENDING) { - GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n"); if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING) - GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); - GST_UNLOCK(queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n"); + g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); } - g_cond_signal (queue->fullcond); - g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); + if (queue->reader) + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n"); + queue->reader = TRUE; + g_cond_wait (queue->not_empty, queue->qlock); + queue->reader = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n"); } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); front = queue->queue; buf = (GstBuffer *)(front->data); - GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf); queue->queue = g_slist_remove_link (queue->queue, front); g_slist_free (front); -// if (queue->level_buffers < queue->size_buffers) - if (queue->level_buffers == queue->size_buffers) - { - GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name); - g_cond_signal (queue->fullcond); - } - queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(buf); - GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n", + GST_DEBUG_PAD_NAME(pad), + queue->level_buffers, queue->size_buffers); + + /* writer waiting on a full queue */ + writer = queue->writer; + + g_mutex_unlock (queue->qlock); - GST_UNLOCK(queue); + if (writer) + { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n"); + g_cond_signal (queue->not_full); + } + + // FIXME where should this be? locked? + if (GST_IS_EVENT(buf)) { + GstEvent *event = GST_EVENT(buf); + switch (GST_EVENT_TYPE(event)) { + case GST_EVENT_EOS: + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n"); + gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED); + break; + default: + break; + } + } return buf; } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 02b73f2..fb091e1 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -75,9 +75,12 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ -// GMutex *lock; (optimization?) - GCond *emptycond; - GCond *fullcond; + GMutex *qlock; /* lock for queue (vs object lock) */ + /* we are single reader and single writer queue */ + gboolean reader; /* reader waiting on empty queue */ + gboolean writer; /* writer waiting on full queue */ + GCond *not_empty; /* signals buffers now available for reading */ + GCond *not_full; /* signals space now available for writing */ GTimeVal *timeval; /* the timeout for the queue locking */ }; diff --git a/plugins/elements/gststatistics.c b/plugins/elements/gststatistics.c index 398c119..fbfdbd9 100644 --- a/plugins/elements/gststatistics.c +++ b/plugins/elements/gststatistics.c @@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = { /* Statistics signals and args */ enum { SIGNAL_UPDATE, - /* FILL ME */ LAST_SIGNAL }; @@ -66,7 +65,7 @@ static void gst_statistics_reset (GstStatistics *statistics); static void gst_statistics_print (GstStatistics *statistics); static GstElementClass *parent_class = NULL; -static guint gst_statistics_signals[LAST_SIGNAL] = { 0 }; +static guint gst_statistics_signals[LAST_SIGNAL] = { 0, }; static stats zero_stats = { 0, }; @@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass) gst_statistics_signals[SIGNAL_UPDATE] = g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL, - g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0); + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property); @@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf) if (GST_IS_EVENT(buf)) { GstEvent *event = GST_EVENT (buf); - gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED); statistics->stats.events += 1; - if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) { - update = TRUE; + if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) { + gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED); + if (statistics->update_on_eos) { + update = TRUE; + } } if (statistics->update_freq.events) { statistics->update_count.events += 1; @@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf) if (update) { if (statistics->update) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n"); g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n"); } if (!statistics->silent) { gst_statistics_print(statistics); -- 2.7.4