+2004-06-11 Thomas Vander Stichele <thomas at apestaart dot org>
+
+ * gst/gstqueue.c:
+ * gst/gstqueue.h:
+ fix removing from the wrong queue on event timeout
+ fix disposing of the event queue by casting correctly
+ add mutexes for handling the event queue
+ someone was sleeping when fixing queue last time around :)
+
2004-06-10 Johan Dahlin <johan@gnome.org>
* gst/gst.c (gst_init_check_with_popt_table): Do not fail on
queue->item_del = g_cond_new ();
queue->event_done = g_cond_new ();
queue->events = g_queue_new ();
+ queue->event_lock = g_mutex_new ();
queue->queue = g_queue_new ();
GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
g_cond_free (queue->event_done);
+ g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) {
- GstEvent *event = g_queue_pop_head (queue->events);
+ GstQueueEventResponse *er = g_queue_pop_head (queue->events);
- gst_event_unref (event);
+ gst_event_unref (er->event);
}
+ g_mutex_unlock (queue->event_lock);
+ g_mutex_free (queue->event_lock);
g_queue_free (queue->events);
if (G_OBJECT_CLASS (parent_class)->dispose)
gst_queue_handle_pending_events (GstQueue * queue)
{
/* check for events to send upstream */
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "handling pending events, events queue of size %d",
+ g_queue_get_length (queue->events));
+ g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) {
- GstQueueEventResponse *er = g_queue_pop_head (queue->events);
-
- GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream");
+ GstQueueEventResponse *er;
+
+ er = g_queue_pop_head (queue->events);
+
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "sending event %p (%d) from event response %p upstream",
+ er->event, GST_EVENT_TYPE (er->event), er);
+ if (er->handled) {
+ /* change this to an assert when this file gets reviewed properly. */
+ GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
+ ("already handled event %p (%d) from event response %p upstream",
+ er->event, GST_EVENT_TYPE (er->event), er));
+ break;
+ }
+ g_mutex_unlock (queue->event_lock);
er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE;
g_cond_signal (queue->event_done);
+ g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
}
+ g_mutex_unlock (queue->event_lock);
}
#define STATUS(queue, msg) \
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res;
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
+ event, GST_EVENT_TYPE (event));
g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
/* push the event to the queue and wait for upstream consumption */
er.event = event;
er.handled = FALSE;
+ g_mutex_lock (queue->event_lock);
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "putting event %p (%d) on internal queue", event,
+ GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er);
+ g_mutex_unlock (queue->event_lock);
GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
- "timeout in upstream event handling");
- /* remove ourselves from the pending list. Since we're
- * locked, others cannot reference this anymore. */
- queue->queue->head = g_list_remove (queue->queue->head, &er);
- queue->queue->head = g_list_first (queue->queue->head);
- queue->queue->tail = g_list_last (queue->queue->head);
- queue->queue->length--;
+ "timeout in upstream event handling, dropping event %p (%s)",
+ er.event, GST_EVENT_TYPE (er.event));
+ g_mutex_lock (queue->event_lock);
+ /* since this queue is for src events (ie upstream), this thread is
+ * the only one that is pushing stuff on it, so we're sure that
+ * it's still the tail element. FIXME: But in practice, we should use
+ * GList instead of GQueue for this so we can remove any element in
+ * the list. */
+ g_queue_pop_tail (queue->events);
+ g_mutex_unlock (queue->event_lock);
+ gst_event_unref (er.event);
res = FALSE;
goto handled;
}
GTimeVal *timeval; /* the timeout for the queue locking */
GQueue *events; /* upstream events get decoupled here */
+ GMutex *event_lock; /* lock when handling the events queue */
GstCaps *negotiated_caps;
queue->item_del = g_cond_new ();
queue->event_done = g_cond_new ();
queue->events = g_queue_new ();
+ queue->event_lock = g_mutex_new ();
queue->queue = g_queue_new ();
GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
g_cond_free (queue->event_done);
+ g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) {
- GstEvent *event = g_queue_pop_head (queue->events);
+ GstQueueEventResponse *er = g_queue_pop_head (queue->events);
- gst_event_unref (event);
+ gst_event_unref (er->event);
}
+ g_mutex_unlock (queue->event_lock);
+ g_mutex_free (queue->event_lock);
g_queue_free (queue->events);
if (G_OBJECT_CLASS (parent_class)->dispose)
gst_queue_handle_pending_events (GstQueue * queue)
{
/* check for events to send upstream */
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "handling pending events, events queue of size %d",
+ g_queue_get_length (queue->events));
+ g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) {
- GstQueueEventResponse *er = g_queue_pop_head (queue->events);
-
- GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream");
+ GstQueueEventResponse *er;
+
+ er = g_queue_pop_head (queue->events);
+
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "sending event %p (%d) from event response %p upstream",
+ er->event, GST_EVENT_TYPE (er->event), er);
+ if (er->handled) {
+ /* change this to an assert when this file gets reviewed properly. */
+ GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
+ ("already handled event %p (%d) from event response %p upstream",
+ er->event, GST_EVENT_TYPE (er->event), er));
+ break;
+ }
+ g_mutex_unlock (queue->event_lock);
er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE;
g_cond_signal (queue->event_done);
+ g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
}
+ g_mutex_unlock (queue->event_lock);
}
#define STATUS(queue, msg) \
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res;
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
+ event, GST_EVENT_TYPE (event));
g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
/* push the event to the queue and wait for upstream consumption */
er.event = event;
er.handled = FALSE;
+ g_mutex_lock (queue->event_lock);
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+ "putting event %p (%d) on internal queue", event,
+ GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er);
+ g_mutex_unlock (queue->event_lock);
GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
- "timeout in upstream event handling");
- /* remove ourselves from the pending list. Since we're
- * locked, others cannot reference this anymore. */
- queue->queue->head = g_list_remove (queue->queue->head, &er);
- queue->queue->head = g_list_first (queue->queue->head);
- queue->queue->tail = g_list_last (queue->queue->head);
- queue->queue->length--;
+ "timeout in upstream event handling, dropping event %p (%s)",
+ er.event, GST_EVENT_TYPE (er.event));
+ g_mutex_lock (queue->event_lock);
+ /* since this queue is for src events (ie upstream), this thread is
+ * the only one that is pushing stuff on it, so we're sure that
+ * it's still the tail element. FIXME: But in practice, we should use
+ * GList instead of GQueue for this so we can remove any element in
+ * the list. */
+ g_queue_pop_tail (queue->events);
+ g_mutex_unlock (queue->event_lock);
+ gst_event_unref (er.event);
res = FALSE;
goto handled;
}
GTimeVal *timeval; /* the timeout for the queue locking */
GQueue *events; /* upstream events get decoupled here */
+ GMutex *event_lock; /* lock when handling the events queue */
GstCaps *negotiated_caps;