X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstcollectpads.c;h=16a239b5d34b1dedf6978ba6bcdf3e206342af4f;hb=a87b4551a6090663a1714f263d4e20fe75eb46ca;hp=d24e03b4859ac33133da3f889298e0b5f9fa2edb;hpb=d1d99af22973dd379d95fa477097576275ec1394;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c index d24e03b..16a239b 100644 --- a/libs/gst/base/gstcollectpads.c +++ b/libs/gst/base/gstcollectpads.c @@ -22,62 +22,47 @@ */ /** * SECTION:gstcollectpads + * @title: GstCollectPads * @short_description: manages a set of pads that operate in collect mode * @see_also: * * Manages a set of pads that operate in collect mode. This means that control * is given to the manager of this object when all pads have data. - * - * - * Collectpads are created with gst_collect_pads_new(). A callback should then + * + * * Collectpads are created with gst_collect_pads_new(). A callback should then * be installed with gst_collect_pads_set_function (). - * - * - * Pads are added to the collection with gst_collect_pads_add_pad()/ + * + * * Pads are added to the collection with gst_collect_pads_add_pad()/ * gst_collect_pads_remove_pad(). The pad * has to be a sinkpad. The chain and event functions of the pad are * overridden. The element_private of the pad is used to store * private information for the collectpads. - * - * - * For each pad, data is queued in the _chain function or by + * + * * For each pad, data is queued in the _chain function or by * performing a pull_range. - * - * - * When data is queued on all pads in waiting mode, the callback function is called. - * - * - * Data can be dequeued from the pad with the gst_collect_pads_pop() method. + * + * * When data is queued on all pads in waiting mode, the callback function is called. + * + * * Data can be dequeued from the pad with the gst_collect_pads_pop() method. * One can peek at the data with the gst_collect_pads_peek() function. - * These functions will return NULL if the pad received an EOS event. When all - * pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS + * These functions will return %NULL if the pad received an EOS event. When all + * pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS * event itself. - * - * - * Data can also be dequeued in byte units using the gst_collect_pads_available(), + * + * * Data can also be dequeued in byte units using the gst_collect_pads_available(), * gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls. - * - * - * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in + * + * * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in * their state change functions to start and stop the processing of the collectpads. * The gst_collect_pads_stop() call should be called before calling the parent * element state change function in the PAUSED_TO_READY state change to ensure * no pad is blocked and the element can finish streaming. - * - * - * gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by - * elements that start a #GstTask to drive the collect_pads. This feature is however - * not yet implemented. - * - * - * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode. + * + * * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode. * CollectPads element is not waiting for data to be collected on non-waiting pads. * Thus these pads may but need not have data when the callback is called. * All pads are in waiting mode by default. - * - * * - * Last reviewed on 2011-10-28 (0.10.36) */ #ifdef HAVE_CONFIG_H @@ -132,11 +117,17 @@ struct _GstCollectPadsPrivate gpointer query_user_data; GstCollectPadsClipFunction clip_func; gpointer clip_user_data; + GstCollectPadsFlushFunction flush_func; + gpointer flush_user_data; /* no other lock needed */ GMutex evt_lock; /* these make up sort of poor man's event signaling */ GCond evt_cond; guint32 evt_cookie; + + gboolean seeking; + gboolean pending_flush_start; + gboolean pending_flush_stop; }; static void gst_collect_pads_clear (GstCollectPads * pads, @@ -260,6 +251,13 @@ gst_collect_pads_init (GstCollectPads * pads) g_mutex_init (&pads->priv->evt_lock); g_cond_init (&pads->priv->evt_cond); pads->priv->evt_cookie = 0; + + pads->priv->seeking = FALSE; + pads->priv->pending_flush_start = FALSE; + pads->priv->pending_flush_stop = FALSE; + + /* clear floating flag */ + gst_object_ref_sink (pads); } static void @@ -290,7 +288,7 @@ gst_collect_pads_finalize (GObject * object) * * MT safe. * - * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error. + * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error. */ GstCollectPads * gst_collect_pads_new (void) @@ -314,11 +312,11 @@ gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads, /** * gst_collect_pads_set_buffer_function: * @pads: the collectpads to use - * @func: the function to set + * @func: (scope call): the function to set * @user_data: (closure): user data passed to the function * * Set the callback function and user data that will be called with - * the oldest buffer when all pads have been collected, or NULL on EOS. + * the oldest buffer when all pads have been collected, or %NULL on EOS. * If a buffer is passed, the callback owns a reference and must unref * it. * @@ -339,7 +337,7 @@ gst_collect_pads_set_buffer_function (GstCollectPads * pads, /** * gst_collect_pads_set_compare_function: * @pads: the pads to use - * @func: the function to set + * @func: (scope call): the function to set * @user_data: (closure): user data passed to the function * * Set the timestamp comparison function. @@ -366,7 +364,7 @@ gst_collect_pads_set_compare_function (GstCollectPads * pads, /** * gst_collect_pads_set_function: * @pads: the collectpads to use - * @func: the function to set + * @func: (scope call): the function to set * @user_data: user data passed to the function * * CollectPads provides a default collection algorithm that will determine @@ -426,7 +424,7 @@ unref_data (GstCollectData * data) /** * gst_collect_pads_set_event_function: * @pads: the collectpads to use - * @func: the function to set + * @func: (scope call): the function to set * @user_data: user data passed to the function * * Set the event callback function and user data that will be called when @@ -454,7 +452,7 @@ gst_collect_pads_set_event_function (GstCollectPads * pads, /** * gst_collect_pads_set_query_function: * @pads: the collectpads to use - * @func: the function to set + * @func: (scope call): the function to set * @user_data: user data passed to the function * * Set the query callback function and user data that will be called after @@ -489,42 +487,70 @@ gst_collect_pads_set_query_function (GstCollectPads * pads, * * Convenience clipping function that converts incoming buffer's timestamp * to running time, or clips the buffer if outside configured segment. +* +* Since 1.6, this clipping function also sets the DTS parameter of the +* GstCollectData structure. This version of the running time DTS can be +* negative. G_MININT64 is used to indicate invalid value. */ GstFlowReturn gst_collect_pads_clip_running_time (GstCollectPads * pads, GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf, gpointer user_data) { - GstClockTime time; - *outbuf = buf; - time = GST_BUFFER_PTS (buf); /* invalid left alone and passed */ - if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) { - time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time); - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) { - GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment"); - gst_buffer_unref (buf); - *outbuf = NULL; + if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) { + GstClockTime time; + GstClockTime buf_dts, abs_dts; + gint dts_sign; + + time = GST_BUFFER_PTS (buf); + + if (GST_CLOCK_TIME_IS_VALID (time)) { + time = + gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time); + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) { + GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %" + GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf))); + gst_buffer_unref (buf); + *outbuf = NULL; + return GST_FLOW_OK; + } + } + + GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %" + GST_TIME_FORMAT " running time", + GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time)); + *outbuf = gst_buffer_make_writable (buf); + GST_BUFFER_PTS (*outbuf) = time; + + dts_sign = gst_segment_to_running_time_full (&cdata->segment, + GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts); + buf_dts = GST_BUFFER_DTS (*outbuf); + if (dts_sign > 0) { + GST_BUFFER_DTS (*outbuf) = abs_dts; + GST_COLLECT_PADS_DTS (cdata) = abs_dts; + } else if (dts_sign < 0) { + GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE; + GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts); } else { - GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %" - GST_TIME_FORMAT " running time", - GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time)); - *outbuf = gst_buffer_make_writable (buf); - GST_BUFFER_PTS (*outbuf) = time; - GST_BUFFER_DTS (*outbuf) = gst_segment_to_running_time (&cdata->segment, - GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf)); + GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE; + GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE; } + + GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %" + GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts), + GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata))); } return GST_FLOW_OK; } - /** +/** * gst_collect_pads_set_clip_function: * @pads: the collectpads to use - * @clipfunc: clip function to install + * @clipfunc: (scope call): clip function to install * @user_data: user data to pass to @clip_func * * Install a clipping function that is called right after a buffer is received @@ -542,12 +568,35 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads, } /** + * gst_collect_pads_set_flush_function: + * @pads: the collectpads to use + * @func: (scope call): flush function to install + * @user_data: user data to pass to @func + * + * Install a flush function that is called when the internal + * state of all pads should be flushed as part of flushing seek + * handling. See #GstCollectPadsFlushFunction for more info. + * + * Since: 1.4 + */ +void +gst_collect_pads_set_flush_function (GstCollectPads * pads, + GstCollectPadsFlushFunction func, gpointer user_data) +{ + g_return_if_fail (pads != NULL); + g_return_if_fail (GST_IS_COLLECT_PADS (pads)); + + pads->priv->flush_func = func; + pads->priv->flush_user_data = user_data; +} + +/** * gst_collect_pads_add_pad: * @pads: the collectpads to use * @pad: (transfer none): the pad to add * @size: the size of the returned #GstCollectData structure - * @destroy_notify: function to be called before the returned #GstCollectData - * structure is freed + * @destroy_notify: (scope async): function to be called before the returned + * #GstCollectData structure is freed * @lock: whether to lock this pad in usual waiting state * * Add a pad to the collection of collect pads. The pad has to be @@ -576,8 +625,8 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads, * * MT safe. * - * Returns: a new #GstCollectData to identify the new pad. Or NULL - * if wrong parameters are supplied. + * Returns: (nullable) (transfer none): a new #GstCollectData to identify the + * new pad. Or %NULL if wrong parameters are supplied. */ GstCollectData * gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size, @@ -604,6 +653,7 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size, data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0; data->priv->refcount = 1; data->priv->destroy_notify = destroy_notify; + data->ABI.abi.dts = G_MININT64; GST_OBJECT_LOCK (pads); GST_OBJECT_LOCK (pad); @@ -726,7 +776,7 @@ unknown_pad: } /* - * Must be called with STREAM_LOCK. + * Must be called with STREAM_LOCK and OBJECT_LOCK. */ static void gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads, @@ -735,7 +785,7 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads, GSList *walk = NULL; /* Update the pads flushing flag */ - for (walk = pads->data; walk; walk = g_slist_next (walk)) { + for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) { GstCollectData *cdata = walk->data; if (GST_IS_PAD (cdata->pad)) { @@ -778,7 +828,9 @@ gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing) /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */ GST_COLLECT_PADS_STREAM_LOCK (pads); + GST_OBJECT_LOCK (pads); gst_collect_pads_set_flushing_unlocked (pads, flushing); + GST_OBJECT_UNLOCK (pads); GST_COLLECT_PADS_STREAM_UNLOCK (pads); } @@ -892,8 +944,8 @@ gst_collect_pads_stop (GstCollectPads * pads) * * MT safe. * - * Returns: The buffer in @data or NULL if no buffer is queued. - * should unref the buffer after usage. + * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no + * buffer is queued. should unref the buffer after usage. */ GstBuffer * gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data) @@ -907,7 +959,7 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data) if ((result = data->buffer)) gst_buffer_ref (result); - GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p", + GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT, GST_DEBUG_PAD_NAME (data->pad), result); return result; @@ -924,8 +976,8 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data) * * MT safe. * - * Returns: (transfer full): The buffer in @data or NULL if no buffer was - * queued. You should unref the buffer after usage. + * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no + * buffer was queued. You should unref the buffer after usage. */ GstBuffer * gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data) @@ -946,7 +998,7 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data) GST_COLLECT_PADS_EVT_BROADCAST (pads); - GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p", + GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT, GST_DEBUG_PAD_NAME (data->pad), result); return result; @@ -1091,15 +1143,15 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data, * * MT safe. * - * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested. - * A return of NULL signals that the pad is end-of-stream. - * Unref the buffer after use. + * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can + * be less that requested. A return of %NULL signals that the pad is + * end-of-stream. Unref the buffer after use. */ GstBuffer * gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data, guint size) { - guint readsize; + guint readsize, buf_size; GstBuffer *buffer; g_return_val_if_fail (pads != NULL, NULL); @@ -1110,7 +1162,8 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data, if ((buffer = data->buffer) == NULL) return NULL; - readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos); + buf_size = gst_buffer_get_size (buffer); + readsize = MIN (size, buf_size - data->pos); return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos, readsize); @@ -1130,9 +1183,9 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data, * * MT safe. * - * Returns: A sub buffer. The size of the buffer can be less that requested. - * A return of NULL signals that the pad is end-of-stream. - * Unref the buffer after use. + * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can + * be less that requested. A return of %NULL signals that the pad is + * end-of-stream. Unref the buffer after use. */ GstBuffer * gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data, @@ -1282,7 +1335,13 @@ gst_collect_pads_check_collected (GstCollectPads * pads) GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s", pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func)); - flow_ret = func (pads, user_data); + if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking, + TRUE, FALSE))) { + GST_INFO_OBJECT (pads, "finished seeking"); + } + do { + flow_ret = func (pads, user_data); + } while (flow_ret == GST_FLOW_OK); } else { gboolean collected = FALSE; @@ -1294,6 +1353,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads) pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func)); + if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking, + TRUE, FALSE))) { + GST_INFO_OBJECT (pads, "finished seeking"); + } flow_ret = func (pads, user_data); collected = TRUE; @@ -1324,7 +1387,7 @@ gst_collect_pads_check_collected (GstCollectPads * pads) * * Must be called with STREAM_LOCK. * - * Returns TRUE if a pad was set to waiting + * Returns %TRUE if a pad was set to waiting * (from non-waiting state). */ static gboolean @@ -1407,10 +1470,7 @@ gst_collect_pads_find_best_pad (GstCollectPads * pads, buffer = gst_collect_pads_peek (pads, data); /* if we have a buffer check if it is better then the current best one */ if (buffer != NULL) { - timestamp = GST_BUFFER_DTS (buffer); - if (!GST_CLOCK_TIME_IS_VALID (timestamp)) { - timestamp = GST_BUFFER_PTS (buffer); - } + timestamp = GST_BUFFER_DTS_OR_PTS (buffer); gst_buffer_unref (buffer); if (best == NULL || pads->priv->compare_func (pads, data, timestamp, best, best_time, pads->priv->compare_user_data) < 0) { @@ -1575,7 +1635,7 @@ gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data, if (pads->priv->clip_func) { in = gst_buffer_new (); GST_BUFFER_PTS (in) = time; - GST_BUFFER_DTS (in) = time; + GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE; pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data); if (out) { otime = GST_BUFFER_PTS (out); @@ -1597,7 +1657,7 @@ gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data, * @event: event being processed * @discard: process but do not send event downstream * - * Default GstCollectPads event handling that elements should always + * Default #GstCollectPads event handling that elements should always * chain up to to ensure proper operation. Element might however indicate * event should not be forwarded downstream. */ @@ -1620,33 +1680,59 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { - /* forward event to unblock check_collected */ - GST_DEBUG_OBJECT (pad, "forwarding flush start"); - res = gst_pad_event_default (pad, parent, event); - event = NULL; - - /* now unblock the chain function. - * no cond per pad, so they all unblock, - * non-flushing block again */ - GST_COLLECT_PADS_STREAM_LOCK (pads); - GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING); - gst_collect_pads_clear (pads, data); - - /* cater for possible default muxing functionality */ - if (buffer_func) { - /* restore to initial state */ - gst_collect_pads_set_waiting (pads, data, TRUE); - /* if the current pad is affected, reset state, recalculate later */ - if (pads->priv->earliest_data == data) { - unref_data (data); - pads->priv->earliest_data = NULL; - pads->priv->earliest_time = GST_CLOCK_TIME_NONE; + if (g_atomic_int_get (&pads->priv->seeking)) { + /* drop all but the first FLUSH_STARTs when seeking */ + if (!g_atomic_int_compare_and_exchange (&pads-> + priv->pending_flush_start, TRUE, FALSE)) + goto eat; + + /* unblock collect pads */ + gst_pad_event_default (pad, parent, event); + event = NULL; + + GST_COLLECT_PADS_STREAM_LOCK (pads); + /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we + * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to + * non-flushing (which happens in gst_collect_pads_event_default). + */ + gst_collect_pads_set_flushing (pads, TRUE); + + if (pads->priv->flush_func) + pads->priv->flush_func (pads, pads->priv->flush_user_data); + + g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE); + GST_COLLECT_PADS_STREAM_UNLOCK (pads); + + goto eat; + } else { + /* forward event to unblock check_collected */ + GST_DEBUG_OBJECT (pad, "forwarding flush start"); + res = gst_pad_event_default (pad, parent, event); + event = NULL; + + /* now unblock the chain function. + * no cond per pad, so they all unblock, + * non-flushing block again */ + GST_COLLECT_PADS_STREAM_LOCK (pads); + GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING); + gst_collect_pads_clear (pads, data); + + /* cater for possible default muxing functionality */ + if (buffer_func) { + /* restore to initial state */ + gst_collect_pads_set_waiting (pads, data, TRUE); + /* if the current pad is affected, reset state, recalculate later */ + if (pads->priv->earliest_data == data) { + unref_data (data); + pads->priv->earliest_data = NULL; + pads->priv->earliest_time = GST_CLOCK_TIME_NONE; + } } - } - GST_COLLECT_PADS_STREAM_UNLOCK (pads); + GST_COLLECT_PADS_STREAM_UNLOCK (pads); - goto eat; + goto eat; + } } case GST_EVENT_FLUSH_STOP: { @@ -1664,12 +1750,22 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) pads->priv->queuedpads++; - pads->priv->eospads--; + if (!g_atomic_int_get (&pads->priv->seeking)) { + pads->priv->eospads--; + } GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS); } GST_COLLECT_PADS_STREAM_UNLOCK (pads); - goto forward; + if (g_atomic_int_get (&pads->priv->seeking)) { + if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop, + TRUE, FALSE)) + goto forward; + else + goto eat; + } else { + goto forward; + } } case GST_EVENT_EOS: { @@ -1712,7 +1808,8 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT); /* now we can use for e.g. running time */ - seg.position = gst_collect_pads_clip_time (pads, data, seg.start); + seg.position = + gst_collect_pads_clip_time (pads, data, seg.start + seg.offset); /* update again */ data->segment = seg; @@ -1735,6 +1832,7 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, GST_COLLECT_PADS_STREAM_LOCK (pads); gst_event_parse_gap (event, &start, &duration); + /* FIXME, handle reverse playback case */ if (GST_CLOCK_TIME_IS_VALID (duration)) start += duration; /* we do not expect another buffer until after gap, @@ -1770,6 +1868,99 @@ forward: return gst_pad_event_default (pad, parent, event); } +typedef struct +{ + GstEvent *event; + gboolean result; +} EventData; + +static gboolean +event_forward_func (GstPad * pad, EventData * data) +{ + gboolean ret = TRUE; + GstPad *peer = gst_pad_get_peer (pad); + + if (peer) { + ret = gst_pad_send_event (peer, gst_event_ref (data->event)); + gst_object_unref (peer); + } + + data->result &= ret; + /* Always send to all pads */ + return FALSE; +} + +static gboolean +forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event) +{ + EventData data; + + data.event = event; + data.result = TRUE; + + gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data); + + gst_event_unref (event); + + return data.result; +} + +/** + * gst_collect_pads_src_event_default: + * @pads: the #GstCollectPads to use + * @pad: src #GstPad that received the event + * @event: event being processed + * + * Default #GstCollectPads event handling for the src pad of elements. + * Elements can chain up to this to let flushing seek event handling + * be done by #GstCollectPads. + * + * Since: 1.4 + */ +gboolean +gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad, + GstEvent * event) +{ + GstObject *parent; + gboolean res = TRUE; + + parent = GST_OBJECT_PARENT (pad); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK:{ + GstSeekFlags flags; + + pads->priv->eospads = 0; + + GST_INFO_OBJECT (pads, "starting seek"); + + gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL); + if (flags & GST_SEEK_FLAG_FLUSH) { + g_atomic_int_set (&pads->priv->seeking, TRUE); + g_atomic_int_set (&pads->priv->pending_flush_start, TRUE); + /* forward the seek upstream */ + res = forward_event_to_all_sinkpads (pad, event); + event = NULL; + if (!res) { + g_atomic_int_set (&pads->priv->seeking, FALSE); + g_atomic_int_set (&pads->priv->pending_flush_start, FALSE); + } + } + + GST_INFO_OBJECT (pads, "seek done, result: %d", res); + + break; + } + default: + break; + } + + if (event) + res = gst_pad_event_default (pad, parent, event); + + return res; +} + static gboolean gst_collect_pads_event_default_internal (GstCollectPads * pads, GstCollectData * data, GstEvent * event, gpointer user_data) @@ -1837,7 +2028,7 @@ pad_removed: * @query: query being processed * @discard: process but do not send event downstream * - * Default GstCollectPads query handling that elements should always + * Default #GstCollectPads query handling that elements should always * chain up to to ensure proper operation. Element might however indicate * query should not be forwarded downstream. */ @@ -1938,7 +2129,7 @@ pad_removed: * and if so we call the collected function. When this is done we check if * data has been unqueued. If data is still queued we wait holding the stream * lock to make sure no EOS event can happen while we are ready to be - * collected + * collected */ static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) @@ -2004,9 +2195,7 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) { GstClockTime timestamp; - timestamp = GST_BUFFER_DTS (buffer); - if (!GST_CLOCK_TIME_IS_VALID (timestamp)) - timestamp = GST_BUFFER_PTS (buffer); + timestamp = GST_BUFFER_DTS_OR_PTS (buffer); if (GST_CLOCK_TIME_IS_VALID (timestamp)) data->segment.position = timestamp;