queue->queue = NULL;
queue->tail = NULL;
queue->level_buffers = 0;
+ queue->max_buffers = 5;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
- queue->waiterlock = g_mutex_new();
- queue->waitercond = g_cond_new();
+ queue->emptylock = g_mutex_new();
+ queue->emptycond = g_cond_new();
+
+ queue->fulllock = g_mutex_new();
+ queue->fullcond = g_cond_new();
}
GstElement *gst_queue_new(gchar *name) {
queue = GST_QUEUE(pad->parent);
/* we have to lock the queue since we span threads */
+
GST_LOCK(queue);
+ if (queue->level_buffers >= queue->max_buffers) {
+ GST_UNLOCK(queue);
+ while (queue->level_buffers >= queue->max_buffers) {
+ g_mutex_lock(queue->fulllock);
+// g_print("0");
+ g_cond_wait(queue->fullcond,queue->fulllock);
+ g_mutex_unlock(queue->fulllock);
+ }
+ GST_LOCK(queue);
+ }
+
+
/* put the buffer on the head of the list */
/* if the queue is NULL, start a new list and make this the tail */
if (!queue->queue) {
GST_UNLOCK(queue);
if (tosignal) {
- g_mutex_lock(queue->waiterlock);
- g_cond_signal(queue->waitercond);
- g_mutex_unlock(queue->waiterlock);
+ g_mutex_lock(queue->emptylock);
+ g_cond_signal(queue->emptycond);
+ g_mutex_unlock(queue->emptylock);
// g_print(">");
}
}
GstQueue *queue = GST_QUEUE(connection);
GstBuffer *buf = NULL;
GList *front;
+ gboolean tosignal = FALSE;
/* have to lock for thread-safety */
GST_LOCK(queue);
if (!queue->level_buffers) {
GST_UNLOCK(queue);
while (!queue->level_buffers) {
- g_mutex_lock(queue->waiterlock);
+ g_mutex_lock(queue->emptylock);
// g_print("0");
- g_cond_wait(queue->waitercond,queue->waiterlock);
- g_mutex_unlock(queue->waiterlock);
+ g_cond_wait(queue->emptycond,queue->emptylock);
+ g_mutex_unlock(queue->emptylock);
}
GST_LOCK(queue);
}
queue->level_buffers--;
// g_print("-");
- /* unlock now */
+ tosignal = queue->level_buffers < queue->max_buffers;
GST_UNLOCK(queue);
+
+ if (tosignal) {
+ g_mutex_lock(queue->fulllock);
+ g_cond_signal(queue->fullcond);
+ g_mutex_unlock(queue->fulllock);
+ }
+
+
+ /* unlock now */
}
GList *tail; /* have to keep track of this myself */
gint level_buffers; /* number of buffers queued here */
+ gint max_buffers; /* maximum number of buffers queued here */
gint level_bytes; /* number of bytes queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */
- GMutex *waiterlock; /* used when the queue is empty */
- GCond *waitercond;
+ GMutex *emptylock; /* used when the queue is empty */
+ GCond *emptycond;
+ GMutex *fulllock; /* used when the queue is full */
+ GCond *fullcond;
};
struct _GstQueueClass {
queue->queue = NULL;
queue->tail = NULL;
queue->level_buffers = 0;
+ queue->max_buffers = 5;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
- queue->waiterlock = g_mutex_new();
- queue->waitercond = g_cond_new();
+ queue->emptylock = g_mutex_new();
+ queue->emptycond = g_cond_new();
+
+ queue->fulllock = g_mutex_new();
+ queue->fullcond = g_cond_new();
}
GstElement *gst_queue_new(gchar *name) {
queue = GST_QUEUE(pad->parent);
/* we have to lock the queue since we span threads */
+
GST_LOCK(queue);
+ if (queue->level_buffers >= queue->max_buffers) {
+ GST_UNLOCK(queue);
+ while (queue->level_buffers >= queue->max_buffers) {
+ g_mutex_lock(queue->fulllock);
+// g_print("0");
+ g_cond_wait(queue->fullcond,queue->fulllock);
+ g_mutex_unlock(queue->fulllock);
+ }
+ GST_LOCK(queue);
+ }
+
+
/* put the buffer on the head of the list */
/* if the queue is NULL, start a new list and make this the tail */
if (!queue->queue) {
GST_UNLOCK(queue);
if (tosignal) {
- g_mutex_lock(queue->waiterlock);
- g_cond_signal(queue->waitercond);
- g_mutex_unlock(queue->waiterlock);
+ g_mutex_lock(queue->emptylock);
+ g_cond_signal(queue->emptycond);
+ g_mutex_unlock(queue->emptylock);
// g_print(">");
}
}
GstQueue *queue = GST_QUEUE(connection);
GstBuffer *buf = NULL;
GList *front;
+ gboolean tosignal = FALSE;
/* have to lock for thread-safety */
GST_LOCK(queue);
if (!queue->level_buffers) {
GST_UNLOCK(queue);
while (!queue->level_buffers) {
- g_mutex_lock(queue->waiterlock);
+ g_mutex_lock(queue->emptylock);
// g_print("0");
- g_cond_wait(queue->waitercond,queue->waiterlock);
- g_mutex_unlock(queue->waiterlock);
+ g_cond_wait(queue->emptycond,queue->emptylock);
+ g_mutex_unlock(queue->emptylock);
}
GST_LOCK(queue);
}
queue->level_buffers--;
// g_print("-");
- /* unlock now */
+ tosignal = queue->level_buffers < queue->max_buffers;
GST_UNLOCK(queue);
+
+ if (tosignal) {
+ g_mutex_lock(queue->fulllock);
+ g_cond_signal(queue->fullcond);
+ g_mutex_unlock(queue->fulllock);
+ }
+
+
+ /* unlock now */
}
GList *tail; /* have to keep track of this myself */
gint level_buffers; /* number of buffers queued here */
+ gint max_buffers; /* maximum number of buffers queued here */
gint level_bytes; /* number of bytes queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */
- GMutex *waiterlock; /* used when the queue is empty */
- GCond *waitercond;
+ GMutex *emptylock; /* used when the queue is empty */
+ GCond *emptycond;
+ GMutex *fulllock; /* used when the queue is full */
+ GCond *fullcond;
};
struct _GstQueueClass {