This simplifies the code and also makes sure that we don't forget to check all
conditions for waiting.
Also fix a potential deadlock caused by not checking if we're actually still
running before starting to wait.
} G_STMT_END
#define SRC_STREAM_BROADCAST(self) G_STMT_START { \
} G_STMT_END
#define SRC_STREAM_BROADCAST(self) G_STMT_START { \
- GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
- g_thread_self()); \
- g_cond_broadcast(&(self->priv->src_cond)); \
- } G_STMT_END
-
-#define KICK_SRC_THREAD(self) G_STMT_START { \
SRC_STREAM_LOCK (self); \
SRC_STREAM_LOCK (self); \
- GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \
- g_thread_self ()); \
+ GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
+ g_thread_self()); \
if (self->priv->aggregate_id) \
gst_clock_id_unschedule (self->priv->aggregate_id); \
if (self->priv->aggregate_id) \
gst_clock_id_unschedule (self->priv->aggregate_id); \
- self->priv->n_kicks++; \
- SRC_STREAM_BROADCAST (self); \
+ g_cond_broadcast(&(self->priv->src_cond)); \
SRC_STREAM_UNLOCK (self); \
} G_STMT_END
SRC_STREAM_UNLOCK (self); \
} G_STMT_END
/* aggregate */
GstClockID aggregate_id;
/* aggregate */
GstClockID aggregate_id;
GMutex src_lock;
GCond src_cond;
};
GMutex src_lock;
GCond src_cond;
};
}
SRC_STREAM_LOCK (self);
}
SRC_STREAM_LOCK (self);
+
+ /* Before waiting, check if we're actually still running */
+ if (!self->priv->running || !self->priv->send_eos) {
+ SRC_STREAM_UNLOCK (self);
+
+ return FALSE;
+ }
+
start = gst_aggregator_get_next_time (self);
if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
|| !GST_CLOCK_TIME_IS_VALID (start)) {
start = gst_aggregator_get_next_time (self);
if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
|| !GST_CLOCK_TIME_IS_VALID (start)) {
- while (self->priv->n_kicks <= 0)
- SRC_STREAM_WAIT (self);
- self->priv->n_kicks--;
+ /* We wake up here when something happened, and below
+ * then check if we're ready now. If we return FALSE,
+ * we will be directly called again.
+ */
+ SRC_STREAM_WAIT (self);
} else {
GstClockTime base_time, time;
GstClock *clock;
} else {
GstClockTime base_time, time;
GstClock *clock;
gst_clock_id_unref (self->priv->aggregate_id);
self->priv->aggregate_id = NULL;
}
gst_clock_id_unref (self->priv->aggregate_id);
self->priv->aggregate_id = NULL;
}
GST_DEBUG_OBJECT (self, "clock returned %d", status);
GST_DEBUG_OBJECT (self, "clock returned %d", status);
flush_start ? "Pausing" : "Stopping");
self->priv->running = FALSE;
flush_start ? "Pausing" : "Stopping");
self->priv->running = FALSE;
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
if (flush_start) {
res = gst_pad_push_event (self->srcpad, flush_start);
}
gst_pad_stop_task (self->srcpad);
if (flush_start) {
res = gst_pad_push_event (self->srcpad, flush_start);
}
gst_pad_stop_task (self->srcpad);
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
GST_INFO_OBJECT (self, "Starting srcpad task");
self->priv->running = TRUE;
GST_INFO_OBJECT (self, "Starting srcpad task");
self->priv->running = TRUE;
- self->priv->n_kicks = 0;
gst_pad_start_task (GST_PAD (self->srcpad),
(GstTaskFunction) aggregate_func, self, NULL);
}
gst_pad_start_task (GST_PAD (self->srcpad),
(GstTaskFunction) aggregate_func, self, NULL);
}
gst_pad_push_event (self->srcpad, event);
priv->send_eos = TRUE;
event = NULL;
gst_pad_push_event (self->srcpad, event);
priv->send_eos = TRUE;
event = NULL;
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad);
}
PAD_UNLOCK_EVENT (aggpad);
}
PAD_UNLOCK_EVENT (aggpad);
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
goto eat;
}
case GST_EVENT_SEGMENT:
goto eat;
}
case GST_EVENT_SEGMENT:
gst_buffer_replace (&tmpbuf, NULL);
gst_element_remove_pad (element, pad);
gst_buffer_replace (&tmpbuf, NULL);
gst_element_remove_pad (element, pad);
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
if (gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
if (gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
- KICK_SRC_THREAD (self);
+ SRC_STREAM_BROADCAST (self);
GST_DEBUG_OBJECT (aggpad, "Done chaining");
GST_DEBUG_OBJECT (aggpad, "Done chaining");