From: Alessandro Decina Date: Mon, 16 Sep 2013 07:55:58 +0000 (+0200) Subject: collectpads: implement flushing seek support X-Git-Tag: 1.3.1~330 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=7aec5739eb6cc667afde0db8b69f2f069da5e15e;p=platform%2Fupstream%2Fgstreamer.git collectpads: implement flushing seek support Implement common flushing seek logic in GstCollectPads. Add new API so that elements can opt-in to using the new logic (gst_collect_pads_src_event_default) and can extend it (gst_collect_pads_set_flush_function) to flush any internal state. See https://bugzilla.gnome.org/show_bug.cgi?id=706779 and https://bugzilla.gnome.org/show_bug.cgi?id=706441 for the background discussion. API: gst_collect_pads_set_flush_function() API: gst_collect_pads_src_event_default() https://bugzilla.gnome.org/show_bug.cgi?id=708416 --- diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c index 799835a..5951cca 100644 --- a/libs/gst/base/gstcollectpads.c +++ b/libs/gst/base/gstcollectpads.c @@ -132,11 +132,17 @@ struct _GstCollectPadsPrivate gpointer query_user_data; GstCollectPadsClipFunction clip_func; gpointer clip_user_data; + GstCollectPadsFlushFunction flush_func; + gpointer flush_user_data; /* no other lock needed */ GMutex evt_lock; /* these make up sort of poor man's event signaling */ GCond evt_cond; guint32 evt_cookie; + + gboolean seeking; + gboolean pending_flush_start; + gboolean pending_flush_stop; }; static void gst_collect_pads_clear (GstCollectPads * pads, @@ -260,6 +266,10 @@ gst_collect_pads_init (GstCollectPads * pads) g_mutex_init (&pads->priv->evt_lock); g_cond_init (&pads->priv->evt_cond); pads->priv->evt_cookie = 0; + + pads->priv->seeking = FALSE; + pads->priv->pending_flush_start = FALSE; + pads->priv->pending_flush_stop = FALSE; } static void @@ -541,6 +551,17 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads, pads->priv->clip_user_data = user_data; } +void +gst_collect_pads_set_flush_function (GstCollectPads * pads, + GstCollectPadsFlushFunction func, gpointer user_data) +{ + g_return_if_fail (pads != NULL); + g_return_if_fail (GST_IS_COLLECT_PADS (pads)); + + pads->priv->flush_func = func; + pads->priv->flush_user_data = user_data; +} + /** * gst_collect_pads_add_pad: * @pads: the collectpads to use @@ -1179,7 +1200,7 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data, /* Do something only on a change and if not locked */ if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) && (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) != - ! !waiting)) { + !!waiting)) { /* Set waiting state for this pad */ if (waiting) GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING); @@ -1284,6 +1305,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads) GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s", pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func)); + if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking, + TRUE, FALSE) == TRUE)) { + GST_INFO_OBJECT (pads, "finished seeking"); + } do { flow_ret = func (pads, user_data); } while (flow_ret == GST_FLOW_OK); @@ -1298,6 +1323,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads) pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func)); + if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking, + TRUE, FALSE) == TRUE)) { + GST_INFO_OBJECT (pads, "finished seeking"); + } flow_ret = func (pads, user_data); collected = TRUE; @@ -1624,33 +1653,59 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { - /* forward event to unblock check_collected */ - GST_DEBUG_OBJECT (pad, "forwarding flush start"); - res = gst_pad_event_default (pad, parent, event); - event = NULL; - - /* now unblock the chain function. - * no cond per pad, so they all unblock, - * non-flushing block again */ - GST_COLLECT_PADS_STREAM_LOCK (pads); - GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING); - gst_collect_pads_clear (pads, data); - - /* cater for possible default muxing functionality */ - if (buffer_func) { - /* restore to initial state */ - gst_collect_pads_set_waiting (pads, data, TRUE); - /* if the current pad is affected, reset state, recalculate later */ - if (pads->priv->earliest_data == data) { - unref_data (data); - pads->priv->earliest_data = NULL; - pads->priv->earliest_time = GST_CLOCK_TIME_NONE; + if (g_atomic_int_get (&pads->priv->seeking)) { + /* drop all but the first FLUSH_STARTs when seeking */ + if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start, + TRUE, FALSE) == FALSE) + goto eat; + + /* unblock collect pads */ + gst_pad_event_default (pad, parent, event); + event = NULL; + + GST_COLLECT_PADS_STREAM_LOCK (pads); + /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we + * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to + * non-flushing (which happens in gst_collect_pads_event_default). + */ + gst_collect_pads_set_flushing (pads, TRUE); + + if (pads->priv->flush_func) + pads->priv->flush_func (pads, pads->priv->flush_user_data); + + g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE); + GST_COLLECT_PADS_STREAM_UNLOCK (pads); + + goto eat; + } else { + /* forward event to unblock check_collected */ + GST_DEBUG_OBJECT (pad, "forwarding flush start"); + res = gst_pad_event_default (pad, parent, event); + event = NULL; + + /* now unblock the chain function. + * no cond per pad, so they all unblock, + * non-flushing block again */ + GST_COLLECT_PADS_STREAM_LOCK (pads); + GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING); + gst_collect_pads_clear (pads, data); + + /* cater for possible default muxing functionality */ + if (buffer_func) { + /* restore to initial state */ + gst_collect_pads_set_waiting (pads, data, TRUE); + /* if the current pad is affected, reset state, recalculate later */ + if (pads->priv->earliest_data == data) { + unref_data (data); + pads->priv->earliest_data = NULL; + pads->priv->earliest_time = GST_CLOCK_TIME_NONE; + } } - } - GST_COLLECT_PADS_STREAM_UNLOCK (pads); + GST_COLLECT_PADS_STREAM_UNLOCK (pads); - goto eat; + goto eat; + } } case GST_EVENT_FLUSH_STOP: { @@ -1673,7 +1728,15 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, } GST_COLLECT_PADS_STREAM_UNLOCK (pads); - goto forward; + if (g_atomic_int_get (&pads->priv->seeking)) { + if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop, + TRUE, FALSE)) + goto forward; + else + goto eat; + } else { + goto forward; + } } case GST_EVENT_EOS: { @@ -1774,6 +1837,73 @@ forward: return gst_pad_event_default (pad, parent, event); } +typedef struct +{ + GstEvent *event; + gboolean result; +} EventData; + +static gboolean +event_forward_func (GstPad * pad, EventData * data) +{ + data->result &= gst_pad_push_event (pad, gst_event_ref (data->event)); + return !data->result; +} + +static gboolean +forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event) +{ + EventData data; + + data.event = event; + data.result = TRUE; + + gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data); + return data.result; +} + +gboolean +gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad, + GstEvent * event) +{ + GstObject *parent; + gboolean res = TRUE; + + parent = GST_OBJECT_PARENT (pad); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK:{ + GstSeekFlags flags; + + GST_INFO_OBJECT (pads, "starting seek"); + + gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL); + if (flags & GST_SEEK_FLAG_FLUSH) { + g_atomic_int_set (&pads->priv->seeking, TRUE); + g_atomic_int_set (&pads->priv->pending_flush_start, TRUE); + /* forward the seek upstream */ + res = forward_event_to_all_sinkpads (pad, event); + event = NULL; + if (!res) { + g_atomic_int_set (&pads->priv->seeking, FALSE); + g_atomic_int_set (&pads->priv->pending_flush_start, FALSE); + } + } + + GST_INFO_OBJECT (pads, "seek done, result: %d", res); + + break; + } + default: + break; + } + + if (event) + res = gst_pad_event_default (pad, parent, event); + + return res; +} + static gboolean gst_collect_pads_event_default_internal (GstCollectPads * pads, GstCollectData * data, GstEvent * event, gpointer user_data) diff --git a/libs/gst/base/gstcollectpads.h b/libs/gst/base/gstcollectpads.h index 4c99d58..8d8128c 100644 --- a/libs/gst/base/gstcollectpads.h +++ b/libs/gst/base/gstcollectpads.h @@ -236,6 +236,23 @@ typedef GstFlowReturn (*GstCollectPadsClipFunction) (GstCollectPads *pads, GstCo GstBuffer *inbuffer, GstBuffer **outbuffer, gpointer user_data); + +/** + * GstCollectPadsFlushFunction: + * @pads: a #GstCollectPads + * @user_data: user data + * + * A function that will be called while processing a flushing seek event. + * + * The function should flush any internal state of the element and the state of + * all the pads. It should clear only the state not directly managed by the + * @pads object. It is therefore not necessary to call + * gst_collect_pads_set_flushing nor gst_collect_pads_clear from this function. + * + * Since: FIXME + */ +typedef void (*GstCollectPadsFlushFunction) (GstCollectPads *pads, gpointer user_data); + /** * GST_COLLECT_PADS_GET_STREAM_LOCK: * @pads: a #GstCollectPads @@ -311,6 +328,9 @@ void gst_collect_pads_set_compare_function (GstCollectPads *pads, void gst_collect_pads_set_clip_function (GstCollectPads *pads, GstCollectPadsClipFunction clipfunc, gpointer user_data); +void gst_collect_pads_set_flush_function (GstCollectPads *pads, + GstCollectPadsFlushFunction func, + gpointer user_data); /* pad management */ GstCollectData* gst_collect_pads_add_pad (GstCollectPads *pads, GstPad *pad, guint size, @@ -349,6 +369,8 @@ GstFlowReturn gst_collect_pads_clip_running_time (GstCollectPads * pads, /* default handlers */ gboolean gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, GstEvent * event, gboolean discard); +gboolean gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad, + GstEvent * event); gboolean gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data, GstQuery * query, gboolean discard);