gpointer query_user_data;
GstCollectPadsClipFunction clip_func;
gpointer clip_user_data;
+ GstCollectPadsFlushFunction flush_func;
+ gpointer flush_user_data;
/* no other lock needed */
GMutex evt_lock; /* these make up sort of poor man's event signaling */
GCond evt_cond;
guint32 evt_cookie;
+
+ gboolean seeking;
+ gboolean pending_flush_start;
+ gboolean pending_flush_stop;
};
static void gst_collect_pads_clear (GstCollectPads * pads,
g_mutex_init (&pads->priv->evt_lock);
g_cond_init (&pads->priv->evt_cond);
pads->priv->evt_cookie = 0;
+
+ pads->priv->seeking = FALSE;
+ pads->priv->pending_flush_start = FALSE;
+ pads->priv->pending_flush_stop = FALSE;
}
static void
pads->priv->clip_user_data = user_data;
}
+void
+gst_collect_pads_set_flush_function (GstCollectPads * pads,
+ GstCollectPadsFlushFunction func, gpointer user_data)
+{
+ g_return_if_fail (pads != NULL);
+ g_return_if_fail (GST_IS_COLLECT_PADS (pads));
+
+ pads->priv->flush_func = func;
+ pads->priv->flush_user_data = user_data;
+}
+
/**
* gst_collect_pads_add_pad:
* @pads: the collectpads to use
/* Do something only on a change and if not locked */
if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
(GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
- ! !waiting)) {
+ !!waiting)) {
/* Set waiting state for this pad */
if (waiting)
GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
+ if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+ TRUE, FALSE) == TRUE)) {
+ GST_INFO_OBJECT (pads, "finished seeking");
+ }
do {
flow_ret = func (pads, user_data);
} while (flow_ret == GST_FLOW_OK);
pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
GST_DEBUG_FUNCPTR_NAME (func));
+ if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+ TRUE, FALSE) == TRUE)) {
+ GST_INFO_OBJECT (pads, "finished seeking");
+ }
flow_ret = func (pads, user_data);
collected = TRUE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
- /* forward event to unblock check_collected */
- GST_DEBUG_OBJECT (pad, "forwarding flush start");
- res = gst_pad_event_default (pad, parent, event);
- event = NULL;
-
- /* now unblock the chain function.
- * no cond per pad, so they all unblock,
- * non-flushing block again */
- GST_COLLECT_PADS_STREAM_LOCK (pads);
- GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
- gst_collect_pads_clear (pads, data);
-
- /* cater for possible default muxing functionality */
- if (buffer_func) {
- /* restore to initial state */
- gst_collect_pads_set_waiting (pads, data, TRUE);
- /* if the current pad is affected, reset state, recalculate later */
- if (pads->priv->earliest_data == data) {
- unref_data (data);
- pads->priv->earliest_data = NULL;
- pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+ if (g_atomic_int_get (&pads->priv->seeking)) {
+ /* drop all but the first FLUSH_STARTs when seeking */
+ if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
+ TRUE, FALSE) == FALSE)
+ goto eat;
+
+ /* unblock collect pads */
+ gst_pad_event_default (pad, parent, event);
+ event = NULL;
+
+ GST_COLLECT_PADS_STREAM_LOCK (pads);
+ /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
+ * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
+ * non-flushing (which happens in gst_collect_pads_event_default).
+ */
+ gst_collect_pads_set_flushing (pads, TRUE);
+
+ if (pads->priv->flush_func)
+ pads->priv->flush_func (pads, pads->priv->flush_user_data);
+
+ g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
+ GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+
+ goto eat;
+ } else {
+ /* forward event to unblock check_collected */
+ GST_DEBUG_OBJECT (pad, "forwarding flush start");
+ res = gst_pad_event_default (pad, parent, event);
+ event = NULL;
+
+ /* now unblock the chain function.
+ * no cond per pad, so they all unblock,
+ * non-flushing block again */
+ GST_COLLECT_PADS_STREAM_LOCK (pads);
+ GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
+ gst_collect_pads_clear (pads, data);
+
+ /* cater for possible default muxing functionality */
+ if (buffer_func) {
+ /* restore to initial state */
+ gst_collect_pads_set_waiting (pads, data, TRUE);
+ /* if the current pad is affected, reset state, recalculate later */
+ if (pads->priv->earliest_data == data) {
+ unref_data (data);
+ pads->priv->earliest_data = NULL;
+ pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+ }
}
- }
- GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+ GST_COLLECT_PADS_STREAM_UNLOCK (pads);
- goto eat;
+ goto eat;
+ }
}
case GST_EVENT_FLUSH_STOP:
{
}
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
- goto forward;
+ if (g_atomic_int_get (&pads->priv->seeking)) {
+ if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
+ TRUE, FALSE))
+ goto forward;
+ else
+ goto eat;
+ } else {
+ goto forward;
+ }
}
case GST_EVENT_EOS:
{
return gst_pad_event_default (pad, parent, event);
}
+typedef struct
+{
+ GstEvent *event;
+ gboolean result;
+} EventData;
+
+static gboolean
+event_forward_func (GstPad * pad, EventData * data)
+{
+ data->result &= gst_pad_push_event (pad, gst_event_ref (data->event));
+ return !data->result;
+}
+
+static gboolean
+forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
+{
+ EventData data;
+
+ data.event = event;
+ data.result = TRUE;
+
+ gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
+ return data.result;
+}
+
+gboolean
+gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
+ GstEvent * event)
+{
+ GstObject *parent;
+ gboolean res = TRUE;
+
+ parent = GST_OBJECT_PARENT (pad);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEEK:{
+ GstSeekFlags flags;
+
+ GST_INFO_OBJECT (pads, "starting seek");
+
+ gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
+ if (flags & GST_SEEK_FLAG_FLUSH) {
+ g_atomic_int_set (&pads->priv->seeking, TRUE);
+ g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
+ /* forward the seek upstream */
+ res = forward_event_to_all_sinkpads (pad, event);
+ event = NULL;
+ if (!res) {
+ g_atomic_int_set (&pads->priv->seeking, FALSE);
+ g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
+ }
+ }
+
+ GST_INFO_OBJECT (pads, "seek done, result: %d", res);
+
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (event)
+ res = gst_pad_event_default (pad, parent, event);
+
+ return res;
+}
+
static gboolean
gst_collect_pads_event_default_internal (GstCollectPads * pads,
GstCollectData * data, GstEvent * event, gpointer user_data)
GstBuffer *inbuffer, GstBuffer **outbuffer,
gpointer user_data);
+
+/**
+ * GstCollectPadsFlushFunction:
+ * @pads: a #GstCollectPads
+ * @user_data: user data
+ *
+ * A function that will be called while processing a flushing seek event.
+ *
+ * The function should flush any internal state of the element and the state of
+ * all the pads. It should clear only the state not directly managed by the
+ * @pads object. It is therefore not necessary to call
+ * gst_collect_pads_set_flushing nor gst_collect_pads_clear from this function.
+ *
+ * Since: FIXME
+ */
+typedef void (*GstCollectPadsFlushFunction) (GstCollectPads *pads, gpointer user_data);
+
/**
* GST_COLLECT_PADS_GET_STREAM_LOCK:
* @pads: a #GstCollectPads
void gst_collect_pads_set_clip_function (GstCollectPads *pads,
GstCollectPadsClipFunction clipfunc,
gpointer user_data);
+void gst_collect_pads_set_flush_function (GstCollectPads *pads,
+ GstCollectPadsFlushFunction func,
+ gpointer user_data);
/* pad management */
GstCollectData* gst_collect_pads_add_pad (GstCollectPads *pads, GstPad *pad, guint size,
/* default handlers */
gboolean gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
GstEvent * event, gboolean discard);
+gboolean gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
+ GstEvent * event);
gboolean gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
GstQuery * query, gboolean discard);