g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
return buf;
}
/* Statistics signals and args */
enum {
SIGNAL_UPDATE,
- /* FILL ME */
LAST_SIGNAL
};
static void gst_statistics_print (GstStatistics *statistics);
static GstElementClass *parent_class = NULL;
-static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
+static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
static stats zero_stats = { 0, };
gst_statistics_signals[SIGNAL_UPDATE] =
g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
- g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT (buf);
- gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
statistics->stats.events += 1;
- if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
- update = TRUE;
+ if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
+ gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
+ if (statistics->update_on_eos) {
+ update = TRUE;
+ }
}
if (statistics->update_freq.events) {
statistics->update_count.events += 1;
if (update) {
if (statistics->update) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
}
if (!statistics->silent) {
gst_statistics_print(statistics);
#define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad)
/* Generic */
-#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
#define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad))
static GstBuffer * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+static void gst_queue_locked_flush (GstQueue *queue);
static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
- queue->emptycond = g_cond_new ();
- queue->fullcond = g_cond_new ();
- GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+ queue->qlock = g_mutex_new ();
+ queue->reader = FALSE;
+ queue->writer = FALSE;
+ queue->not_empty = g_cond_new ();
+ queue->not_full = g_cond_new ();
+ GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static GstBufferPool*
return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
}
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
- GstQueue *queue;
-
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
- GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
- queue->level_buffers);
-
- GST_FLAG_SET (pad, GST_PAD_EOS);
-
- g_cond_signal (queue->emptycond);
-
- GST_UNLOCK (queue);
-
- return TRUE;
-}
-
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
gst_buffer_unref (GST_BUFFER (data));
}
static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
{
g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
- (char *) GST_ELEMENT_NAME (queue));
+ (gpointer) queue);
g_slist_free (queue->queue);
queue->queue = NULL;
queue->timeval = NULL;
}
+static void
+gst_queue_flush (GstQueue *queue)
+{
+ g_mutex_lock (queue->qlock);
+ gst_queue_locked_flush (queue);
+ g_mutex_unlock (queue->qlock);
+}
+
static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
- const guchar *name;
+ gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
- /* we have to lock the queue since we span threads */
+ reader = FALSE;
-// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
- GST_LOCK (queue);
+ /* we have to lock the queue since we span threads */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
- gst_queue_flush (queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
+
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_FLUSH:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+ gst_queue_locked_flush (queue);
+ break;
+ default:
+ break;
+ }
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
- if (queue->level_buffers >= queue->size_buffers) {
+ if (queue->level_buffers == queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
+ // FIXME don't want to leak events!
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+ if (GST_IS_EVENT (buf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(buf)));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
- GST_UNLOCK (queue);
+ g_mutex_unlock (queue->qlock);
return;
}
// otherwise we have to push a buffer off the other end
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
+ if (GST_IS_EVENT (leakbuf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(leakbuf)));
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
}
}
- while (queue->level_buffers >= queue->size_buffers) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
- g_cond_signal (queue->emptycond);
- g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->writer)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+ queue->writer = TRUE;
+ g_cond_wait (queue->not_full, queue->qlock);
+ queue->writer = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
-// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
+
+ /* reader waiting on an empty queue */
+ reader = queue->reader;
- /* if we were empty, but aren't any more, signal a condition */
- if (queue->level_buffers == 1)
+ g_mutex_unlock (queue->qlock);
+
+ if (reader)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
- g_cond_signal (queue->emptycond);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+ g_cond_signal (queue->not_empty);
}
-
- GST_UNLOCK (queue);
}
static GstBuffer *
GstQueue *queue;
GstBuffer *buf = NULL;
GSList *front;
- const guchar *name;
+ gboolean writer;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
+
+ writer = FALSE;
/* have to lock for thread-safety */
- GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
-
- while (!queue->level_buffers) {
- if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
- GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
- GST_UNLOCK(queue);
- // this return NULL shouldn't hurt anything...
- return NULL;
- }
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == 0) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- g_cond_signal (queue->fullcond);
- g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->reader)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+ queue->reader = TRUE;
+ g_cond_wait (queue->not_empty, queue->qlock);
+ queue->reader = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
- GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
-// if (queue->level_buffers < queue->size_buffers)
- if (queue->level_buffers == queue->size_buffers)
- {
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
- g_cond_signal (queue->fullcond);
- }
-
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
- GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
+
+ /* writer waiting on a full queue */
+ writer = queue->writer;
+
+ g_mutex_unlock (queue->qlock);
- GST_UNLOCK(queue);
+ if (writer)
+ {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+ g_cond_signal (queue->not_full);
+ }
+
+ // FIXME where should this be? locked?
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_EOS:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+ gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+ break;
+ default:
+ break;
+ }
+ }
return buf;
}
gint leaky; /* whether the queue is leaky, and if so at which end */
-// GMutex *lock; (optimization?)
- GCond *emptycond;
- GCond *fullcond;
+ GMutex *qlock; /* lock for queue (vs object lock) */
+ /* we are single reader and single writer queue */
+ gboolean reader; /* reader waiting on empty queue */
+ gboolean writer; /* writer waiting on full queue */
+ GCond *not_empty; /* signals buffers now available for reading */
+ GCond *not_full; /* signals space now available for writing */
GTimeVal *timeval; /* the timeout for the queue locking */
};
} else {
GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
+gst_schedule_show(sched);
//eos = TRUE;
}
} else {
//
//FIXME also make this more efficient by keeping list of managed queues
THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
- GST_LOCK(e);
- g_cond_signal((GST_QUEUE(e)->emptycond));
- g_cond_signal((GST_QUEUE(e)->fullcond));
- GST_UNLOCK(e);
+ //GST_LOCK(e);
+ g_cond_signal((GST_QUEUE(e)->not_empty));
+ g_cond_signal((GST_QUEUE(e)->not_full));
+ //GST_UNLOCK(e);
}
else
{
if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
{
THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
- GST_LOCK(peerelement);
- g_cond_signal(GST_QUEUE(peerelement)->emptycond);
- g_cond_signal(GST_QUEUE(peerelement)->fullcond);
- GST_UNLOCK(peerelement);
+ //GST_LOCK(peerelement);
+ g_cond_signal(GST_QUEUE(peerelement)->not_empty);
+ g_cond_signal(GST_QUEUE(peerelement)->not_full);
+ //GST_UNLOCK(peerelement);
}
}
}
g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
return buf;
}
static GstBuffer * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+static void gst_queue_locked_flush (GstQueue *queue);
static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
- queue->emptycond = g_cond_new ();
- queue->fullcond = g_cond_new ();
- GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+ queue->qlock = g_mutex_new ();
+ queue->reader = FALSE;
+ queue->writer = FALSE;
+ queue->not_empty = g_cond_new ();
+ queue->not_full = g_cond_new ();
+ GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static GstBufferPool*
return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
}
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
- GstQueue *queue;
-
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
- GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
- queue->level_buffers);
-
- GST_FLAG_SET (pad, GST_PAD_EOS);
-
- g_cond_signal (queue->emptycond);
-
- GST_UNLOCK (queue);
-
- return TRUE;
-}
-
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
gst_buffer_unref (GST_BUFFER (data));
}
static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
{
g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
- (char *) GST_ELEMENT_NAME (queue));
+ (gpointer) queue);
g_slist_free (queue->queue);
queue->queue = NULL;
queue->timeval = NULL;
}
+static void
+gst_queue_flush (GstQueue *queue)
+{
+ g_mutex_lock (queue->qlock);
+ gst_queue_locked_flush (queue);
+ g_mutex_unlock (queue->qlock);
+}
+
static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
- const guchar *name;
+ gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
- /* we have to lock the queue since we span threads */
+ reader = FALSE;
-// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
- GST_LOCK (queue);
+ /* we have to lock the queue since we span threads */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
- gst_queue_flush (queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
+
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_FLUSH:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+ gst_queue_locked_flush (queue);
+ break;
+ default:
+ break;
+ }
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
- if (queue->level_buffers >= queue->size_buffers) {
+ if (queue->level_buffers == queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
+ // FIXME don't want to leak events!
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+ if (GST_IS_EVENT (buf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(buf)));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
- GST_UNLOCK (queue);
+ g_mutex_unlock (queue->qlock);
return;
}
// otherwise we have to push a buffer off the other end
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
+ if (GST_IS_EVENT (leakbuf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(leakbuf)));
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
}
}
- while (queue->level_buffers >= queue->size_buffers) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
- g_cond_signal (queue->emptycond);
- g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->writer)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+ queue->writer = TRUE;
+ g_cond_wait (queue->not_full, queue->qlock);
+ queue->writer = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
-// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
+
+ /* reader waiting on an empty queue */
+ reader = queue->reader;
- /* if we were empty, but aren't any more, signal a condition */
- if (queue->level_buffers == 1)
+ g_mutex_unlock (queue->qlock);
+
+ if (reader)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
- g_cond_signal (queue->emptycond);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+ g_cond_signal (queue->not_empty);
}
-
- GST_UNLOCK (queue);
}
static GstBuffer *
GstQueue *queue;
GstBuffer *buf = NULL;
GSList *front;
- const guchar *name;
+ gboolean writer;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
+
+ writer = FALSE;
/* have to lock for thread-safety */
- GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
-
- while (!queue->level_buffers) {
- if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
- GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
- GST_UNLOCK(queue);
- // this return NULL shouldn't hurt anything...
- return NULL;
- }
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == 0) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- g_cond_signal (queue->fullcond);
- g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->reader)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+ queue->reader = TRUE;
+ g_cond_wait (queue->not_empty, queue->qlock);
+ queue->reader = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
- GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
-// if (queue->level_buffers < queue->size_buffers)
- if (queue->level_buffers == queue->size_buffers)
- {
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
- g_cond_signal (queue->fullcond);
- }
-
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
- GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
+
+ /* writer waiting on a full queue */
+ writer = queue->writer;
+
+ g_mutex_unlock (queue->qlock);
- GST_UNLOCK(queue);
+ if (writer)
+ {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+ g_cond_signal (queue->not_full);
+ }
+
+ // FIXME where should this be? locked?
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_EOS:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+ gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+ break;
+ default:
+ break;
+ }
+ }
return buf;
}
gint leaky; /* whether the queue is leaky, and if so at which end */
-// GMutex *lock; (optimization?)
- GCond *emptycond;
- GCond *fullcond;
+ GMutex *qlock; /* lock for queue (vs object lock) */
+ /* we are single reader and single writer queue */
+ gboolean reader; /* reader waiting on empty queue */
+ gboolean writer; /* writer waiting on full queue */
+ GCond *not_empty; /* signals buffers now available for reading */
+ GCond *not_full; /* signals space now available for writing */
GTimeVal *timeval; /* the timeout for the queue locking */
};
/* Statistics signals and args */
enum {
SIGNAL_UPDATE,
- /* FILL ME */
LAST_SIGNAL
};
static void gst_statistics_print (GstStatistics *statistics);
static GstElementClass *parent_class = NULL;
-static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
+static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
static stats zero_stats = { 0, };
gst_statistics_signals[SIGNAL_UPDATE] =
g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
- g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT (buf);
- gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
statistics->stats.events += 1;
- if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
- update = TRUE;
+ if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
+ gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
+ if (statistics->update_on_eos) {
+ update = TRUE;
+ }
}
if (statistics->update_freq.events) {
statistics->update_count.events += 1;
if (update) {
if (statistics->update) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
}
if (!statistics->silent) {
gst_statistics_print(statistics);