X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstbasesrc.c;h=e7e381536b72846ce74956c2739ded58407bf77b;hb=5bf13cdd5314bc3c6c81bd620e712acdcab14eb2;hp=bedb88e211697b01984c109c3a8dfefc42377362;hpb=d3ffa8263901b7e3a9936235f48b1203838edd0c;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index bedb88e..e7e3815 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -16,54 +16,44 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** * SECTION:gstbasesrc + * @title: GstBaseSrc * @short_description: Base class for getrange based source elements * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink * - * This is a generice base class for source elements. The following + * This is a generic base class for source elements. The following * types of sources are supported: - * - * random access sources like files - * seekable sources - * live sources - * + * + * * random access sources like files + * * seekable sources + * * live sources * * The source can be configured to operate in any #GstFormat with the * gst_base_src_set_format() method. The currently set format determines - * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT - * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES. + * the format of the internal #GstSegment and any %GST_EVENT_SEGMENT + * events. The default format for #GstBaseSrc is %GST_FORMAT_BYTES. * * #GstBaseSrc always supports push mode scheduling. If the following * conditions are met, it also supports pull mode scheduling: - * - * The format is set to #GST_FORMAT_BYTES (default). - * - * #GstBaseSrcClass.is_seekable() returns %TRUE. - * - * + * + * * The format is set to %GST_FORMAT_BYTES (default). + * * #GstBaseSrcClass.is_seekable() returns %TRUE. * * If all the conditions are met for operating in pull mode, #GstBaseSrc is * automatically seekable in push mode as well. The following conditions must * be met to make the element seekable in push mode when the format is not - * #GST_FORMAT_BYTES: - * - * - * #GstBaseSrcClass.is_seekable() returns %TRUE. - * - * - * #GstBaseSrcClass.query() can convert all supported seek formats to the - * internal format as set with gst_base_src_set_format(). - * - * - * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns - * %TRUE. - * - * + * %GST_FORMAT_BYTES: + * + * * #GstBaseSrcClass.is_seekable() returns %TRUE. + * * #GstBaseSrcClass.query() can convert all supported seek formats to the + * internal format as set with gst_base_src_set_format(). + * * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns + * %TRUE. * * When the element does not meet the requirements to operate in pull mode, the * offset and length in the #GstBaseSrcClass.create() method should be ignored. @@ -82,7 +72,7 @@ * #GstBaseSrcClass.create() method will not be called in PAUSED but only in * PLAYING. To signal the pipeline that the element will not produce data, the * return value from the READY to PAUSED state will be - * #GST_STATE_CHANGE_NO_PREROLL. + * %GST_STATE_CHANGE_NO_PREROLL. * * A typical live source will timestamp the buffers it creates with the * current running time of the pipeline. This is one reason why a live source @@ -112,23 +102,25 @@ * There is only support in #GstBaseSrc for exactly one source pad, which * should be named "src". A source implementation (subclass of #GstBaseSrc) * should install a pad template in its class_init function, like so: - * |[ + * |[ * static void * my_element_class_init (GstMyElementClass *klass) * { * GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); * // srctemplate should be a #GstStaticPadTemplate with direction - * // #GST_PAD_SRC and name "src" - * gst_element_class_add_pad_template (gstelement_class, - * gst_static_pad_template_get (&srctemplate)); - * // see #GstElementDetails - * gst_element_class_set_details (gstelement_class, &details); + * // %GST_PAD_SRC and name "src" + * gst_element_class_add_static_pad_template (gstelement_class, &srctemplate); + * + * gst_element_class_set_static_metadata (gstelement_class, + * "Source name", + * "Source", + * "My Source element", + * "The author "); * } * ]| * - * - * Controlled shutdown of live sources in applications - * + * ## Controlled shutdown of live sources in applications + * * Applications that record from a live source may want to stop recording * in a controlled way, so that the recording is stopped, but the data * already in the pipeline is processed to the end (remember that many live @@ -140,16 +132,13 @@ * * An application may send an EOS event to a source element to make it * perform the EOS logic (send EOS event downstream or post a - * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done + * %GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done * with the gst_element_send_event() function on the element or its parent bin. * * After the EOS has been sent to the element, the application should wait for * an EOS message to be posted on the pipeline's bus. Once this EOS message is * received, it may safely shut down the entire pipeline. * - * Last reviewed on 2007-12-19 (0.10.16) - * - * */ #ifdef HAVE_CONFIG_H @@ -163,7 +152,6 @@ #include #include "gstbasesrc.h" -#include "gsttypefindhelper.h" #include GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug); @@ -179,6 +167,18 @@ GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug); #define GST_LIVE_SIGNAL(elem) g_cond_signal (GST_LIVE_GET_COND (elem)); #define GST_LIVE_BROADCAST(elem) g_cond_broadcast (GST_LIVE_GET_COND (elem)); + +#define GST_ASYNC_GET_COND(elem) (&GST_BASE_SRC_CAST(elem)->priv->async_cond) +#define GST_ASYNC_WAIT(elem) g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem)) +#define GST_ASYNC_SIGNAL(elem) g_cond_signal (GST_ASYNC_GET_COND (elem)); + +#define CLEAR_PENDING_EOS(bsrc) \ + G_STMT_START { \ + g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \ + gst_event_replace (&bsrc->priv->pending_eos, NULL); \ + } G_STMT_END + + /* BaseSrc signals and args */ enum { @@ -188,7 +188,6 @@ enum #define DEFAULT_BLOCKSIZE 4096 #define DEFAULT_NUM_BUFFERS -1 -#define DEFAULT_TYPEFIND FALSE #define DEFAULT_DO_TIMESTAMP FALSE enum @@ -196,60 +195,79 @@ enum PROP_0, PROP_BLOCKSIZE, PROP_NUM_BUFFERS, +#ifndef GST_REMOVE_DEPRECATED PROP_TYPEFIND, +#endif PROP_DO_TIMESTAMP }; -#define GST_BASE_SRC_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate)) - +/* The basesrc implementation need to respect the following locking order: + * 1. STREAM_LOCK + * 2. LIVE_LOCK + * 3. OBJECT_LOCK + */ struct _GstBaseSrcPrivate { - gboolean discont; - gboolean flushing; + gboolean discont; /* STREAM_LOCK */ + gboolean flushing; /* LIVE_LOCK */ - GstFlowReturn start_result; - gboolean async; + GstFlowReturn start_result; /* OBJECT_LOCK */ + gboolean async; /* OBJECT_LOCK */ /* if a stream-start event should be sent */ - gboolean stream_start_pending; + gboolean stream_start_pending; /* STREAM_LOCK */ - /* if segment should be sent */ - gboolean segment_pending; + /* if segment should be sent and a + * seqnum if it was originated by a seek */ + gboolean segment_pending; /* OBJECT_LOCK */ + guint32 segment_seqnum; /* OBJECT_LOCK */ /* if EOS is pending (atomic) */ - gint pending_eos; + GstEvent *pending_eos; /* OBJECT_LOCK */ + gint has_pending_eos; /* atomic */ + + /* if the eos was caused by a forced eos from the application */ + gboolean forced_eos; /* LIVE_LOCK */ /* startup latency is the time it takes between going to PLAYING and producing * the first BUFFER with running_time 0. This value is included in the latency * reporting. */ - GstClockTime latency; + GstClockTime latency; /* OBJECT_LOCK */ /* timestamp offset, this is the offset add to the values of gst_times for * pseudo live sources */ - GstClockTimeDiff ts_offset; + GstClockTimeDiff ts_offset; /* OBJECT_LOCK */ - gboolean do_timestamp; - volatile gint dynamic_size; + gboolean do_timestamp; /* OBJECT_LOCK */ + volatile gint dynamic_size; /* atomic */ + volatile gint automatic_eos; /* atomic */ /* stream sequence number */ - guint32 seqnum; + guint32 seqnum; /* STREAM_LOCK */ /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be * pushed in the data stream */ - GList *pending_events; - volatile gint have_events; + GList *pending_events; /* OBJECT_LOCK */ + volatile gint have_events; /* OBJECT_LOCK */ /* QoS *//* with LOCK */ - gboolean qos_enabled; - gdouble proportion; - GstClockTime earliest_time; + gdouble proportion; /* OBJECT_LOCK */ + GstClockTime earliest_time; /* OBJECT_LOCK */ - GstBufferPool *pool; - GstAllocator *allocator; - GstAllocationParams params; + GstBufferPool *pool; /* OBJECT_LOCK */ + GstAllocator *allocator; /* OBJECT_LOCK */ + GstAllocationParams params; /* OBJECT_LOCK */ + + GCond async_cond; /* OBJECT_LOCK */ + + /* for _submit_buffer_list() */ + GstBufferList *pending_bufferlist; }; +#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \ + ((src)->priv->pending_bufferlist != NULL) + static GstElementClass *parent_class = NULL; +static gint private_offset = 0; static void gst_base_src_class_init (GstBaseSrcClass * klass); static void gst_base_src_init (GstBaseSrc * src, gpointer g_class); @@ -277,11 +295,21 @@ gst_base_src_get_type (void) _type = g_type_register_static (GST_TYPE_ELEMENT, "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT); + + private_offset = + g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate)); + g_once_init_leave (&base_src_type, _type); } return base_src_type; } +static inline GstBaseSrcPrivate * +gst_base_src_get_instance_private (GstBaseSrc * self) +{ + return (G_STRUCT_MEMBER_P (self, private_offset)); +} + static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter); static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps); @@ -302,8 +330,8 @@ static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event); static gboolean gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query); -static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc, - gboolean active); +static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, + gboolean flushing); static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc); static gboolean gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment); @@ -318,7 +346,7 @@ static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query); static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc, - gboolean flushing, gboolean live_play, gboolean * playing); + gboolean flushing); static gboolean gst_base_src_start (GstBaseSrc * basesrc); static gboolean gst_base_src_stop (GstBaseSrc * basesrc); @@ -334,7 +362,7 @@ static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset, static gboolean gst_base_src_seekable (GstBaseSrc * src); static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc); static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset, - guint * length); + guint * length, gboolean force); static void gst_base_src_class_init (GstBaseSrcClass * klass) @@ -345,9 +373,10 @@ gst_base_src_class_init (GstBaseSrcClass * klass) gobject_class = G_OBJECT_CLASS (klass); gstelement_class = GST_ELEMENT_CLASS (klass); - GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element"); + if (private_offset != 0) + g_type_class_adjust_private_offset (klass, &private_offset); - g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate)); + GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element"); parent_class = g_type_class_peek_parent (klass); @@ -364,10 +393,12 @@ gst_base_src_class_init (GstBaseSrcClass * klass) "Number of buffers to output before sending EOS (-1 = unlimited)", -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#ifndef GST_REMOVE_DEPRECATED g_object_class_install_property (gobject_class, PROP_TYPEFIND, g_param_spec_boolean ("typefind", "Typefind", - "Run typefind before negotiating", DEFAULT_TYPEFIND, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + "Run typefind before negotiating (deprecated, non-functional)", FALSE, + G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS)); +#endif g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP, g_param_spec_boolean ("do-timestamp", "Do timestamp", "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP, @@ -404,13 +435,14 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) GstPad *pad; GstPadTemplate *pad_template; - basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc); + basesrc->priv = gst_base_src_get_instance_private (basesrc); basesrc->is_live = FALSE; g_mutex_init (&basesrc->live_lock); g_cond_init (&basesrc->live_cond); basesrc->num_buffers = DEFAULT_NUM_BUFFERS; basesrc->num_buffers_left = -1; + basesrc->priv->automatic_eos = TRUE; basesrc->can_activate_push = TRUE; @@ -436,10 +468,10 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->clock_id = NULL; /* we operate in BYTES by default */ gst_base_src_set_format (basesrc, GST_FORMAT_BYTES); - basesrc->typefind = DEFAULT_TYPEFIND; basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP; g_atomic_int_set (&basesrc->priv->have_events, FALSE); + g_cond_init (&basesrc->priv->async_cond); basesrc->priv->start_result = GST_FLOW_FLUSHING; GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); @@ -458,6 +490,7 @@ gst_base_src_finalize (GObject * object) g_mutex_clear (&basesrc->live_lock); g_cond_clear (&basesrc->live_cond); + g_cond_clear (&basesrc->priv->async_cond); event_p = &basesrc->pending_seek; gst_event_replace (event_p, NULL); @@ -471,6 +504,31 @@ gst_base_src_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +/* Call with LIVE_LOCK held */ +static GstFlowReturn +gst_base_src_wait_playing_unlocked (GstBaseSrc * src) +{ + while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) { + /* block until the state changes, or we get a flush, or something */ + GST_DEBUG_OBJECT (src, "live source waiting for running state"); + GST_LIVE_WAIT (src); + GST_DEBUG_OBJECT (src, "live source unlocked"); + } + + if (src->priv->flushing) + goto flushing; + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (src, "we are flushing"); + return GST_FLOW_FLUSHING; + } +} + + /** * gst_base_src_wait_playing: * @src: the src @@ -480,35 +538,25 @@ gst_base_src_finalize (GObject * object) * and call this method before continuing to produce the remaining data. * * This function will block until a state change to PLAYING happens (in which - * case this function returns #GST_FLOW_OK) or the processing must be stopped due + * case this function returns %GST_FLOW_OK) or the processing must be stopped due * to a state change to READY or a FLUSH event (in which case this function - * returns #GST_FLOW_FLUSHING). + * returns %GST_FLOW_FLUSHING). * - * Returns: #GST_FLOW_OK if @src is PLAYING and processing can + * Returns: %GST_FLOW_OK if @src is PLAYING and processing can * continue. Any other return value should be returned from the create vmethod. */ GstFlowReturn gst_base_src_wait_playing (GstBaseSrc * src) { - g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR); + GstFlowReturn ret; - do { - /* block until the state changes, or we get a flush, or something */ - GST_DEBUG_OBJECT (src, "live source waiting for running state"); - GST_LIVE_WAIT (src); - GST_DEBUG_OBJECT (src, "live source unlocked"); - if (src->priv->flushing) - goto flushing; - } while (G_UNLIKELY (!src->live_running)); + g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR); - return GST_FLOW_OK; + GST_LIVE_LOCK (src); + ret = gst_base_src_wait_playing_unlocked (src); + GST_LIVE_UNLOCK (src); - /* ERRORS */ -flushing: - { - GST_DEBUG_OBJECT (src, "we are flushing"); - return GST_FLOW_FLUSHING; - } + return ret; } /** @@ -563,10 +611,10 @@ gst_base_src_is_live (GstBaseSrc * src) * @format: the format to use * * Sets the default format of the source. This will be the format used - * for sending NEW_SEGMENT events and for performing seeks. + * for sending SEGMENT events and for performing seeks. * * If a format of GST_FORMAT_BYTES is set, the element will be able to - * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE. + * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns %TRUE. * * This function must only be called in states < %GST_STATE_PAUSED. */ @@ -599,6 +647,32 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic) } /** + * gst_base_src_set_automatic_eos: + * @src: base source instance + * @automatic_eos: automatic eos + * + * If @automatic_eos is %TRUE, @src will automatically go EOS if a buffer + * after the total size is returned. By default this is %TRUE but sources + * that can't return an authoritative size and only know that they're EOS + * when trying to read more should set this to %FALSE. + * + * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS + * when a buffer outside of the currently configured segment is pushed if + * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an + * EOS will be pushed only when the #GstBaseSrc.create implementation + * returns %GST_FLOW_EOS. + * + * Since: 1.4 + */ +void +gst_base_src_set_automatic_eos (GstBaseSrc * src, gboolean automatic_eos) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + + g_atomic_int_set (&src->priv->automatic_eos, automatic_eos); +} + +/** * gst_base_src_set_async: * @src: base source instance * @async: new async mode @@ -648,14 +722,14 @@ gst_base_src_is_async (GstBaseSrc * src) * @min_latency: (out) (allow-none): the min latency of the source * @max_latency: (out) (allow-none): the max latency of the source * - * Query the source for the latency parameters. @live will be TRUE when @src is - * configured as a live source. @min_latency will be set to the difference - * between the running time and the timestamp of the first buffer. - * @max_latency is always the undefined value of -1. + * Query the source for the latency parameters. @live will be %TRUE when @src is + * configured as a live source. @min_latency and @max_latency will be set + * to the difference between the running time and the timestamp of the first + * buffer. * * This function is mostly used by subclasses. * - * Returns: TRUE if the query succeeded. + * Returns: %TRUE if the query succeeded. */ gboolean gst_base_src_query_latency (GstBaseSrc * src, gboolean * live, @@ -680,11 +754,11 @@ gst_base_src_query_latency (GstBaseSrc * src, gboolean * live, if (min_latency) *min_latency = min; if (max_latency) - *max_latency = -1; + *max_latency = min; GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min), - GST_TIME_ARGS (-1)); + GST_TIME_ARGS (min)); GST_OBJECT_UNLOCK (src); return TRUE; @@ -747,6 +821,8 @@ gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp) GST_OBJECT_LOCK (src); src->priv->do_timestamp = timestamp; + if (timestamp && src->segment.format != GST_FORMAT_TIME) + gst_segment_init (&src->segment, GST_FORMAT_TIME); GST_OBJECT_UNLOCK (src); } @@ -777,7 +853,7 @@ gst_base_src_get_do_timestamp (GstBaseSrc * src) * @src: The source * @start: The new start value for the segment * @stop: Stop value for the new segment - * @position: The position value for the new segent + * @time: The new time value for the start of the new segment * * Prepare a new seamless segment for emission downstream. This function must * only be called by derived sub-classes, and only from the create() function, @@ -790,25 +866,28 @@ gst_base_src_get_do_timestamp (GstBaseSrc * src) */ gboolean gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop, - gint64 position) + gint64 time) { gboolean res = TRUE; - GST_DEBUG_OBJECT (src, - "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %" - GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start), - GST_TIME_ARGS (stop), GST_TIME_ARGS (position)); - GST_OBJECT_LOCK (src); src->segment.base = gst_segment_to_running_time (&src->segment, src->segment.format, src->segment.position); - src->segment.start = start; + src->segment.position = src->segment.start = start; src->segment.stop = stop; - src->segment.position = position; + src->segment.time = time; - /* forward, we send data from position to stop */ + /* Mark pending segment. Will be sent before next data */ src->priv->segment_pending = TRUE; + src->priv->segment_seqnum = gst_util_seqnum_next (); + + GST_DEBUG_OBJECT (src, + "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %" + GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT, + GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time), + GST_TIME_ARGS (src->segment.base)); + GST_OBJECT_UNLOCK (src); src->priv->discont = TRUE; @@ -817,14 +896,26 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop, return res; } +/* called with STREAM_LOCK */ static gboolean gst_base_src_send_stream_start (GstBaseSrc * src) { gboolean ret = TRUE; if (src->priv->stream_start_pending) { - ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ()); + gchar *stream_id; + GstEvent *event; + + stream_id = + gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL); + + GST_DEBUG_OBJECT (src, "Pushing STREAM_START"); + event = gst_event_new_stream_start (stream_id); + gst_event_set_group_id (event, gst_util_group_id_next ()); + + ret = gst_pad_push_event (src->srcpad, event); src->priv->stream_start_pending = FALSE; + g_free (stream_id); } return ret; @@ -833,7 +924,7 @@ gst_base_src_send_stream_start (GstBaseSrc * src) /** * gst_base_src_set_caps: * @src: a #GstBaseSrc - * @caps: a #GstCaps + * @caps: (transfer none): a #GstCaps * * Set new caps on the basesrc source pad. * @@ -844,15 +935,27 @@ gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps) { GstBaseSrcClass *bclass; gboolean res = TRUE; + GstCaps *current_caps; bclass = GST_BASE_SRC_GET_CLASS (src); gst_base_src_send_stream_start (src); - if (bclass->set_caps) - res = bclass->set_caps (src, caps); - if (res) - res = gst_pad_set_caps (src->srcpad, caps); + current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src)); + if (current_caps && gst_caps_is_equal (current_caps, caps)) { + GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT, + caps); + res = TRUE; + } else { + if (bclass->set_caps) + res = bclass->set_caps (src, caps); + + if (res) + res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps)); + } + + if (current_caps) + gst_caps_unref (current_caps); return res; } @@ -964,7 +1067,9 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) } else res = TRUE; - gst_query_set_position (query, format, position); + if (res) + gst_query_set_position (query, format, position); + break; } } @@ -992,8 +1097,8 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) guint length = 0; /* may have to refresh duration */ - if (g_atomic_int_get (&src->priv->dynamic_size)) - gst_base_src_update_length (src, 0, &length); + gst_base_src_update_length (src, 0, &length, + g_atomic_int_get (&src->priv->dynamic_size)); /* this is the duration as configured by the subclass. */ GST_OBJECT_LOCK (src); @@ -1017,7 +1122,10 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) * means that we cannot report the duration at all. */ res = TRUE; } - gst_query_set_duration (query, format, duration); + + if (res) + gst_query_set_duration (query, format, duration); + break; } } @@ -1040,7 +1148,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) gst_base_src_seekable (src), 0, duration); res = TRUE; } else { - /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */ + /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */ /* Don't reply to the query to make up for demuxers which don't * handle the SEEKING query yet. Players like Totem will fall back * to the duration when the SEEKING query isn't answered. */ @@ -1050,23 +1158,23 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) } case GST_QUERY_SEGMENT: { + GstFormat format; gint64 start, stop; GST_OBJECT_LOCK (src); - /* no end segment configured, current duration then */ + + format = src->segment.format; + + start = + gst_segment_to_stream_time (&src->segment, format, + src->segment.start); if ((stop = src->segment.stop) == -1) stop = src->segment.duration; - start = src->segment.start; + else + stop = gst_segment_to_stream_time (&src->segment, format, stop); - /* adjust to stream time */ - if (src->segment.time != -1) { - start -= src->segment.time; - if (stop != -1) - stop -= src->segment.time; - } + gst_query_set_segment (query, src->segment.rate, format, start, stop); - gst_query_set_segment (query, src->segment.rate, src->segment.format, - start, stop); GST_OBJECT_UNLOCK (src); res = TRUE; break; @@ -1193,6 +1301,22 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) res = FALSE; break; } + case GST_QUERY_URI:{ + if (GST_IS_URI_HANDLER (src)) { + gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src)); + + if (uri != NULL) { + gst_query_set_uri (query, uri); + g_free (uri); + res = TRUE; + } else { + res = FALSE; + } + } else { + res = FALSE; + } + break; + } default: res = FALSE; break; @@ -1246,6 +1370,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment) bclass = GST_BASE_SRC_GET_CLASS (src); + GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment); + if (bclass->do_seek) result = bclass->do_seek (src, segment); @@ -1265,8 +1391,8 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, * seek format, adjust by the relative seek offset and then convert back to * the processing format */ - GstSeekType cur_type, stop_type; - gint64 cur, stop; + GstSeekType start_type, stop_type; + gint64 start, stop; GstSeekFlags flags; GstFormat seek_format, dest_format; gdouble rate; @@ -1274,25 +1400,25 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, gboolean res = TRUE; gst_event_parse_seek (event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); + &start_type, &start, &stop_type, &stop); dest_format = segment->format; if (seek_format == dest_format) { gst_segment_do_seek (segment, rate, seek_format, flags, - cur_type, cur, stop_type, stop, &update); + start_type, start, stop_type, stop, &update); return TRUE; } - if (cur_type != GST_SEEK_TYPE_NONE) { - /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */ + if (start_type != GST_SEEK_TYPE_NONE) { + /* FIXME: Handle seek_end by converting the input segment vals */ res = - gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format, - &cur); - cur_type = GST_SEEK_TYPE_SET; + gst_pad_query_convert (src->srcpad, seek_format, start, dest_format, + &start); + start_type = GST_SEEK_TYPE_SET; } if (res && stop_type != GST_SEEK_TYPE_NONE) { - /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */ + /* FIXME: Handle seek_end by converting the input segment vals */ res = gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format, &stop); @@ -1300,7 +1426,7 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, } /* And finally, configure our output segment in the desired format */ - gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur, + gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start, stop_type, stop, &update); if (!res) @@ -1336,11 +1462,23 @@ gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset, { GstFlowReturn ret; GstBaseSrcPrivate *priv = src->priv; + GstBufferPool *pool = NULL; + GstAllocator *allocator = NULL; + GstAllocationParams params; + GST_OBJECT_LOCK (src); if (priv->pool) { - ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL); + pool = gst_object_ref (priv->pool); + } else if (priv->allocator) { + allocator = gst_object_ref (priv->allocator); + } + params = priv->params; + GST_OBJECT_UNLOCK (src); + + if (pool) { + ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL); } else if (size != -1) { - *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params); + *buffer = gst_buffer_new_allocate (allocator, size, ¶ms); if (G_UNLIKELY (*buffer == NULL)) goto alloc_failed; @@ -1350,13 +1488,21 @@ gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset, size); goto alloc_failed; } + +done: + if (pool) + gst_object_unref (pool); + if (allocator) + gst_object_unref (allocator); + return ret; /* ERRORS */ alloc_failed: { GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); - return GST_FLOW_ERROR; + ret = GST_FLOW_ERROR; + goto done; } } @@ -1471,9 +1617,9 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) gdouble rate; GstFormat seek_format, dest_format; GstSeekFlags flags; - GstSeekType cur_type, stop_type; - gint64 cur, stop; - gboolean flush, playing; + GstSeekType start_type, stop_type; + gint64 start, stop; + gboolean flush; gboolean update; gboolean relative_seek = FALSE; gboolean seekseg_configured = FALSE; @@ -1489,9 +1635,9 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) if (event) { gst_event_parse_seek (event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); + &start_type, &start, &stop_type, &stop); - relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) || + relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) || SEEK_TYPE_IS_RELATIVE (stop_type); if (dest_format != seek_format && !relative_seek) { @@ -1525,7 +1671,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) /* unblock streaming thread. */ if (unlock) - gst_base_src_set_flushing (src, TRUE, FALSE, &playing); + gst_base_src_set_flushing (src, TRUE); /* grab streaming lock, this should eventually be possible, either * because the task is paused, our streaming thread stopped @@ -1541,7 +1687,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) } if (unlock) - gst_base_src_set_flushing (src, FALSE, playing, NULL); + gst_base_src_set_flushing (src, FALSE); /* If we configured the seeksegment above, don't overwrite it now. Otherwise * copy the current segment info into the temp segment that we can actually @@ -1564,7 +1710,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) /* The seek format matches our processing format, no need to ask the * the subclass to configure the segment. */ gst_segment_do_seek (&seeksegment, rate, seek_format, flags, - cur_type, cur, stop_type, stop, &update); + start_type, start, stop_type, stop, &update); } } /* Else, no seek event passed, so we're just (re)starting the @@ -1614,12 +1760,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) gst_element_post_message (GST_ELEMENT (src), message); } - /* for deriving a stop position for the playback segment from the seek - * segment, we must take the duration when the stop is not set */ - if ((stop = seeksegment.stop) == -1) - stop = seeksegment.duration; - src->priv->segment_pending = TRUE; + src->priv->segment_seqnum = seqnum; } src->priv->discont = TRUE; @@ -1651,8 +1793,10 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) { GstBaseSrc *src; gboolean result = FALSE; + GstBaseSrcClass *bclass; src = GST_BASE_SRC (element); + bclass = GST_BASE_SRC_GET_CLASS (src); GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event); @@ -1660,62 +1804,112 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) /* bidirectional events */ case GST_EVENT_FLUSH_START: GST_DEBUG_OBJECT (src, "pushing flush-start event downstream"); + result = gst_pad_push_event (src->srcpad, event); + gst_base_src_set_flushing (src, TRUE); event = NULL; break; case GST_EVENT_FLUSH_STOP: - GST_LIVE_LOCK (src->srcpad); - src->priv->segment_pending = TRUE; - /* sending random flushes downstream can break stuff, - * especially sync since all segment info will get flushed */ + { + gboolean start; + + GST_PAD_STREAM_LOCK (src->srcpad); + gst_base_src_set_flushing (src, FALSE); + GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream"); result = gst_pad_push_event (src->srcpad, event); - GST_LIVE_UNLOCK (src->srcpad); + + /* For external flush, restart the task .. */ + GST_LIVE_LOCK (src); + src->priv->segment_pending = TRUE; + + GST_OBJECT_LOCK (src->srcpad); + start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH); + GST_OBJECT_UNLOCK (src->srcpad); + + /* ... and for live sources, only if in playing state */ + if (src->is_live) { + if (!src->live_running) + start = FALSE; + } + + if (start) + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, + src->srcpad, NULL); + + GST_LIVE_UNLOCK (src); + GST_PAD_STREAM_UNLOCK (src->srcpad); + event = NULL; break; + } /* downstream serialized events */ case GST_EVENT_EOS: { - GstBaseSrcClass *bclass; - - bclass = GST_BASE_SRC_GET_CLASS (src); + gboolean push_mode; /* queue EOS and make sure the task or pull function performs the EOS * actions. * - * We have two possibilities: + * For push mode, This will be done in 3 steps. It is required to not + * block here as gst_element_send_event() will hold the STATE_LOCK, hence + * blocking would prevent asynchronous state change to complete. * - * - Before we are to enter the _create function, we check the pending_eos - * first and do EOS instead of entering it. - * - If we are in the _create function or we did not manage to set the - * flag fast enough and we are about to enter the _create function, - * we unlock it so that we exit with FLUSHING immediately. We then - * check the EOS flag and do the EOS logic. + * 1. We stop the streaming thread + * 2. We set the pending eos + * 3. We start the streaming thread again, so it is performed + * asynchronously. + * + * For pull mode, we simply mark the pending EOS without flushing. */ - g_atomic_int_set (&src->priv->pending_eos, TRUE); - GST_DEBUG_OBJECT (src, "EOS marked, calling unlock"); + GST_OBJECT_LOCK (src->srcpad); + push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH; + GST_OBJECT_UNLOCK (src->srcpad); - /* unlock the _create function so that we can check the pending_eos flag - * and we can do EOS. This will eventually release the LIVE_LOCK again so - * that we can grab it and stop the unlock again. We don't take the stream - * lock so that this operation is guaranteed to never block. */ - gst_base_src_activate_pool (src, FALSE); - if (bclass->unlock) - bclass->unlock (src); + if (push_mode) { + gst_base_src_set_flushing (src, TRUE); - GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK"); + GST_PAD_STREAM_LOCK (src->srcpad); + gst_base_src_set_flushing (src, FALSE); - GST_LIVE_LOCK (src); - GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop"); - /* now stop the unlock of the streaming thread again. Grabbing the live - * lock is enough because that protects the create function. */ - if (bclass->unlock_stop) - bclass->unlock_stop (src); - gst_base_src_activate_pool (src, TRUE); - GST_LIVE_UNLOCK (src); + GST_OBJECT_LOCK (src); + g_atomic_int_set (&src->priv->has_pending_eos, TRUE); + if (src->priv->pending_eos) + gst_event_unref (src->priv->pending_eos); + src->priv->pending_eos = event; + GST_OBJECT_UNLOCK (src); + + GST_DEBUG_OBJECT (src, + "EOS marked, start task for asynchronous handling"); + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, + src->srcpad, NULL); + + GST_PAD_STREAM_UNLOCK (src->srcpad); + } else { + /* In pull mode, we need not to return flushing to downstream, though + * the stream lock is not kept after getrange was unblocked */ + GST_OBJECT_LOCK (src); + g_atomic_int_set (&src->priv->has_pending_eos, TRUE); + if (src->priv->pending_eos) + gst_event_unref (src->priv->pending_eos); + src->priv->pending_eos = event; + GST_OBJECT_UNLOCK (src); + gst_base_src_set_pool_flushing (src, TRUE); + if (bclass->unlock) + bclass->unlock (src); + + GST_PAD_STREAM_LOCK (src->srcpad); + if (bclass->unlock_stop) + bclass->unlock_stop (src); + gst_base_src_set_pool_flushing (src, TRUE); + GST_PAD_STREAM_UNLOCK (src->srcpad); + } + + + event = NULL; result = TRUE; break; } @@ -1723,9 +1917,11 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) /* sending random SEGMENT downstream can break sync. */ break; case GST_EVENT_TAG: + case GST_EVENT_SINK_MESSAGE: case GST_EVENT_CUSTOM_DOWNSTREAM: case GST_EVENT_CUSTOM_BOTH: - /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */ + case GST_EVENT_PROTECTION: + /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */ GST_OBJECT_LOCK (src); src->priv->pending_events = g_list_append (src->priv->pending_events, event); @@ -1856,10 +2052,10 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) case GST_EVENT_FLUSH_START: /* cancel any blocking getrange, is normally called * when in pull mode. */ - result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL); + result = gst_base_src_set_flushing (src, TRUE); break; case GST_EVENT_FLUSH_STOP: - result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL); + result = gst_base_src_set_flushing (src, FALSE); break; case GST_EVENT_QOS: { @@ -1935,9 +2131,11 @@ gst_base_src_set_property (GObject * object, guint prop_id, case PROP_NUM_BUFFERS: src->num_buffers = g_value_get_int (value); break; +#ifndef GST_REMOVE_DEPRECATED case PROP_TYPEFIND: src->typefind = g_value_get_boolean (value); break; +#endif case PROP_DO_TIMESTAMP: gst_base_src_set_do_timestamp (src, g_value_get_boolean (value)); break; @@ -1962,9 +2160,11 @@ gst_base_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_NUM_BUFFERS: g_value_set_int (value, src->num_buffers); break; +#ifndef GST_REMOVE_DEPRECATED case PROP_TYPEFIND: g_value_set_boolean (value, src->typefind); break; +#endif case PROP_DO_TIMESTAMP: g_value_set_boolean (value, gst_base_src_get_do_timestamp (src)); break; @@ -2102,8 +2302,12 @@ gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer) if (!GST_CLOCK_TIME_IS_VALID (dts)) { if (do_timestamp) { dts = running_time; - } else { - dts = 0; + } else if (!GST_CLOCK_TIME_IS_VALID (pts)) { + if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) { + dts = basesrc->segment.start; + } else { + dts = 0; + } } GST_BUFFER_DTS (buffer) = dts; @@ -2176,29 +2380,31 @@ no_sync: /* Called with STREAM_LOCK and LIVE_LOCK */ static gboolean -gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) +gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length, + gboolean force) { guint64 size, maxsize; GstBaseSrcClass *bclass; - GstFormat format; gint64 stop; - gboolean dynamic; + + /* only operate if we are working with bytes */ + if (src->segment.format != GST_FORMAT_BYTES) + return TRUE; bclass = GST_BASE_SRC_GET_CLASS (src); - format = src->segment.format; stop = src->segment.stop; /* get total file size */ size = src->segment.duration; - /* only operate if we are working with bytes */ - if (format != GST_FORMAT_BYTES) - return TRUE; - - /* the max amount of bytes to read is the total size or - * up to the segment.stop if present. */ - if (stop != -1) - maxsize = MIN (size, stop); + /* when not doing automatic EOS, just use the stop position. We don't use + * the size to check for EOS */ + if (!g_atomic_int_get (&src->priv->automatic_eos)) + maxsize = stop; + /* Otherwise, the max amount of bytes to read is the total + * size or up to the segment.stop if present. */ + else if (stop != -1) + maxsize = size != -1 ? MIN (size, stop) : stop; else maxsize = size; @@ -2207,39 +2413,41 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset, *length, size, stop, maxsize); - dynamic = g_atomic_int_get (&src->priv->dynamic_size); - GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic); - /* check size if we have one */ if (maxsize != -1) { /* if we run past the end, check if the file became bigger and - * retry. */ - if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) { + * retry. Mind wrap when checking. */ + if (G_UNLIKELY (offset >= maxsize || offset + *length >= maxsize || force)) { /* see if length of the file changed */ if (bclass->get_size) if (!bclass->get_size (src, &size)) size = -1; - /* make sure we don't exceed the configured segment stop - * if it was set */ - if (stop != -1) - maxsize = MIN (size, stop); + /* when not doing automatic EOS, just use the stop position. We don't use + * the size to check for EOS */ + if (!g_atomic_int_get (&src->priv->automatic_eos)) + maxsize = stop; + /* Otherwise, the max amount of bytes to read is the total + * size or up to the segment.stop if present. */ + else if (stop != -1) + maxsize = size != -1 ? MIN (size, stop) : stop; else maxsize = size; - /* if we are at or past the end, EOS */ - if (G_UNLIKELY (offset >= maxsize)) - goto unexpected_length; - - /* else we can clip to the end */ - if (G_UNLIKELY (offset + *length >= maxsize)) - *length = maxsize - offset; + if (maxsize != -1) { + /* if we are at or past the end, EOS */ + if (G_UNLIKELY (offset >= maxsize)) + goto unexpected_length; + /* else we can clip to the end */ + if (G_UNLIKELY (offset + *length >= maxsize)) + *length = maxsize - offset; + } } } - /* keep track of current duration. - * segment is in bytes, we checked that above. */ + /* keep track of current duration. segment is in bytes, we checked + * that above. */ GST_OBJECT_LOCK (src); src->segment.duration = size; GST_OBJECT_UNLOCK (src); @@ -2249,6 +2457,7 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) /* ERRORS */ unexpected_length: { + GST_DEBUG_OBJECT (src, "processing at or past EOS"); return FALSE; } } @@ -2263,13 +2472,14 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, GstClockReturn status; GstBuffer *res_buf; GstBuffer *in_buf; + gboolean own_res_buf; bclass = GST_BASE_SRC_GET_CLASS (src); again: if (src->is_live) { if (G_UNLIKELY (!src->live_running)) { - ret = gst_base_src_wait_playing (src); + ret = gst_base_src_wait_playing_unlocked (src); if (ret != GST_FLOW_OK) goto stopped; } @@ -2282,7 +2492,7 @@ again: if (G_UNLIKELY (!bclass->create)) goto no_function; - if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length))) + if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE))) goto unexpected_length; /* track position */ @@ -2300,26 +2510,46 @@ again: } /* don't enter the create function if a pending EOS event was set. For the - * logic of the pending_eos, check the event function of this class. */ - if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) + * logic of the has_pending_eos, check the event function of this class. */ + if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) { + src->priv->forced_eos = TRUE; goto eos; + } GST_DEBUG_OBJECT (src, "calling create offset %" G_GUINT64_FORMAT " length %u, time %" G_GINT64_FORMAT, offset, length, src->segment.time); res_buf = in_buf = *buf; + own_res_buf = (*buf == NULL); + GST_LIVE_UNLOCK (src); ret = bclass->create (src, offset, length, &res_buf); + GST_LIVE_LOCK (src); + + /* As we released the LIVE_LOCK, the state may have changed */ + if (src->is_live) { + if (G_UNLIKELY (!src->live_running)) { + GstFlowReturn wait_ret; + wait_ret = gst_base_src_wait_playing_unlocked (src); + if (wait_ret != GST_FLOW_OK) { + if (ret == GST_FLOW_OK && own_res_buf) + gst_buffer_unref (res_buf); + ret = wait_ret; + goto stopped; + } + } + } /* The create function could be unlocked because we have a pending EOS. It's * possible that we have a valid buffer from create that we need to * discard when the create function returned _OK. */ - if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) { + if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) { if (ret == GST_FLOW_OK) { - if (*buf == NULL) + if (own_res_buf) gst_buffer_unref (res_buf); } + src->priv->forced_eos = TRUE; goto eos; } @@ -2334,7 +2564,9 @@ again: GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't " "fill the provided buffer, copying"); - gst_buffer_map (in_buf, &info, GST_MAP_WRITE); + if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE)) + goto map_failed; + copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size); gst_buffer_unmap (in_buf, &info); gst_buffer_set_size (in_buf, copied_size); @@ -2345,6 +2577,16 @@ again: res_buf = in_buf; } + if (res_buf == NULL) { + GstBufferList *pending_list = src->priv->pending_bufferlist; + + if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0) + goto null_buffer; + + res_buf = gst_buffer_list_get_writable (pending_list, 0); + own_res_buf = FALSE; + } + /* no timestamp set and we are at offset 0, we can timestamp with 0 */ if (offset == 0 && src->segment.time == 0 && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) { @@ -2373,7 +2615,7 @@ again: /* this case is triggered when we were waiting for the clock and * it got unlocked because we did a state change. In any case, get rid of * the buffer. */ - if (*buf == NULL) + if (own_res_buf) gst_buffer_unref (res_buf); if (!src->live_running) { @@ -2395,7 +2637,7 @@ again: GST_ELEMENT_ERROR (src, CORE, CLOCK, (_("Internal clock error.")), ("clock returned unexpected return value %d", status)); - if (*buf == NULL) + if (own_res_buf) gst_buffer_unref (res_buf); ret = GST_FLOW_ERROR; break; @@ -2418,6 +2660,15 @@ not_ok: gst_flow_get_name (ret)); return ret; } +map_failed: + { + GST_ELEMENT_ERROR (src, RESOURCE, BUSY, + (_("Failed to map buffer.")), + ("failed to map result buffer in WRITE mode")); + if (own_res_buf) + gst_buffer_unref (res_buf); + return GST_FLOW_ERROR; + } not_started: { GST_DEBUG_OBJECT (src, "getrange but not started"); @@ -2442,7 +2693,7 @@ reached_num_buffers: flushing: { GST_DEBUG_OBJECT (src, "we are flushing"); - if (*buf == NULL) + if (own_res_buf) gst_buffer_unref (res_buf); return GST_FLOW_FLUSHING; } @@ -2451,6 +2702,14 @@ eos: GST_DEBUG_OBJECT (src, "we are EOS"); return GST_FLOW_EOS; } +null_buffer: + { + GST_ELEMENT_ERROR (src, STREAM, FAILED, + (_("Internal data flow error.")), + ("Subclass %s neither returned a buffer nor submitted a buffer list " + "from its create function", G_OBJECT_TYPE_NAME (src))); + return GST_FLOW_ERROR; + } } static GstFlowReturn @@ -2505,6 +2764,7 @@ start_failed: } } +/* Called with STREAM_LOCK */ static void gst_base_src_loop (GstPad * pad) { @@ -2520,17 +2780,44 @@ gst_base_src_loop (GstPad * pad) src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); + /* Just leave immediately if we're flushing */ + GST_LIVE_LOCK (src); + if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad))) + goto flushing; + GST_LIVE_UNLOCK (src); + + /* Just return if EOS is pushed again, as the app might be unaware that an + * EOS have been sent already */ + if (GST_PAD_IS_EOS (pad)) { + GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task"); + gst_pad_pause_task (pad); + goto done; + } + gst_base_src_send_stream_start (src); + /* The stream-start event could've caused something to flush us */ + GST_LIVE_LOCK (src); + if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad))) + goto flushing; + GST_LIVE_UNLOCK (src); + /* check if we need to renegotiate */ if (gst_pad_check_reconfigure (pad)) { - if (!gst_base_src_negotiate (src)) - goto not_negotiated; + if (!gst_base_src_negotiate (src)) { + gst_pad_mark_reconfigure (pad); + if (GST_PAD_IS_FLUSHING (pad)) { + GST_LIVE_LOCK (src); + goto flushing; + } else { + goto negotiate_failed; + } + } } GST_LIVE_LOCK (src); - if (G_UNLIKELY (src->priv->flushing)) + if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad))) goto flushing; blocksize = src->blocksize; @@ -2555,6 +2842,12 @@ gst_base_src_loop (GstPad * pad) GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u", GST_TIME_ARGS (position), blocksize); + /* clean up just in case we got interrupted or so last time round */ + if (src->priv->pending_bufferlist != NULL) { + gst_buffer_list_unref (src->priv->pending_bufferlist); + src->priv->pending_bufferlist = NULL; + } + ret = gst_base_src_get_range (src, position, blocksize, &buf); if (G_UNLIKELY (ret != GST_FLOW_OK)) { GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s", @@ -2562,13 +2855,19 @@ gst_base_src_loop (GstPad * pad) GST_LIVE_UNLOCK (src); goto pause; } - /* this should not happen */ - if (G_UNLIKELY (buf == NULL)) - goto null_buffer; + + /* Note: at this point buf might be a single buf returned which we own or + * the first buf of a pending buffer list submitted via submit_buffer_list(), + * in which case the buffer is owned by the pending buffer list and not us. */ + g_assert (buf != NULL); /* push events to close/start our segment before we push the buffer. */ if (G_UNLIKELY (src->priv->segment_pending)) { - gst_pad_push_event (pad, gst_event_new_segment (&src->segment)); + GstEvent *seg_event = gst_event_new_segment (&src->segment); + + gst_event_set_seqnum (seg_event, src->priv->segment_seqnum); + src->priv->segment_seqnum = gst_util_seqnum_next (); + gst_pad_push_event (pad, seg_event); src->priv->segment_pending = FALSE; } @@ -2638,7 +2937,8 @@ gst_base_src_loop (GstPad * pad) /* positive rate, check if we reached the stop */ if (src->segment.stop != -1) { if (position >= src->segment.stop) { - eos = TRUE; + if (g_atomic_int_get (&src->priv->automatic_eos)) + eos = TRUE; position = src->segment.stop; } } @@ -2646,7 +2946,8 @@ gst_base_src_loop (GstPad * pad) /* negative rate, check if we reached the start. start is always set to * something different from -1 */ if (position <= src->segment.start) { - eos = TRUE; + if (g_atomic_int_get (&src->priv->automatic_eos)) + eos = TRUE; position = src->segment.start; } /* when going reverse, all buffers are DISCONT */ @@ -2665,14 +2966,26 @@ gst_base_src_loop (GstPad * pad) } GST_LIVE_UNLOCK (src); - ret = gst_pad_push (pad, buf); + /* push buffer or buffer list */ + if (src->priv->pending_bufferlist != NULL) { + ret = gst_pad_push_list (pad, src->priv->pending_bufferlist); + src->priv->pending_bufferlist = NULL; + } else { + ret = gst_pad_push (pad, buf); + } + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + if (ret == GST_FLOW_NOT_NEGOTIATED) { + goto not_negotiated; + } GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s", gst_flow_get_name (ret)); goto pause; } - if (G_UNLIKELY (eos)) { + /* Segment pending means that a new segment was configured + * during this loop run */ + if (G_UNLIKELY (eos && !src->priv->segment_pending)) { GST_INFO_OBJECT (src, "pausing after end of segment"); ret = GST_FLOW_EOS; goto pause; @@ -2684,7 +2997,16 @@ done: /* special cases */ not_negotiated: { - GST_DEBUG_OBJECT (src, "Failed to renegotiate"); + if (gst_pad_needs_reconfigure (pad)) { + GST_DEBUG_OBJECT (src, "Retrying to renegotiate"); + return; + } + /* fallthrough when push returns NOT_NEGOTIATED and we don't have + * a pending negotiation request on our srcpad */ + } +negotiate_failed: + { + GST_DEBUG_OBJECT (src, "Not negotiated"); ret = GST_FLOW_NOT_NEGOTIATED; goto pause; } @@ -2708,12 +3030,19 @@ pause: GstFormat format; gint64 position; - /* perform EOS logic */ flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0; format = src->segment.format; position = src->segment.position; - if (flag_segment) { + /* perform EOS logic */ + if (src->priv->forced_eos) { + g_assert (g_atomic_int_get (&src->priv->has_pending_eos)); + GST_OBJECT_LOCK (src); + event = src->priv->pending_eos; + src->priv->pending_eos = NULL; + GST_OBJECT_UNLOCK (src); + + } else if (flag_segment) { GstMessage *message; message = gst_message_new_segment_done (GST_OBJECT_CAST (src), @@ -2722,12 +3051,15 @@ pause: gst_element_post_message (GST_ELEMENT_CAST (src), message); event = gst_event_new_segment_done (format, position); gst_event_set_seqnum (event, src->priv->seqnum); - gst_pad_push_event (pad, event); + } else { event = gst_event_new_eos (); gst_event_set_seqnum (event, src->priv->seqnum); - gst_pad_push_event (pad, event); } + + gst_pad_push_event (pad, event); + src->priv->forced_eos = FALSE; + } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) { event = gst_event_new_eos (); gst_event_set_seqnum (event, src->priv->seqnum); @@ -2737,20 +3069,11 @@ pause: * due to flushing and posting an error message because of * that is the wrong thing to do, e.g. when we're doing * a flushing seek. */ - GST_ELEMENT_ERROR (src, STREAM, FAILED, - (_("Internal data flow error.")), - ("streaming task paused, reason %s (%d)", reason, ret)); + GST_ELEMENT_FLOW_ERROR (src, ret); gst_pad_push_event (pad, event); } goto done; } -null_buffer: - { - GST_ELEMENT_ERROR (src, STREAM, FAILED, - (_("Internal data flow error.")), ("element returned NULL buffer")); - GST_LIVE_UNLOCK (src); - goto done; - } } static gboolean @@ -2774,6 +3097,11 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, oldalloc = priv->allocator; priv->allocator = allocator; + if (priv->pool) + gst_object_ref (priv->pool); + if (priv->allocator) + gst_object_ref (priv->allocator); + if (params) priv->params = *params; else @@ -2801,12 +3129,11 @@ activate_failed: } } -static gboolean -gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active) +static void +gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing) { GstBaseSrcPrivate *priv = basesrc->priv; GstBufferPool *pool; - gboolean res = TRUE; GST_OBJECT_LOCK (basesrc); if ((pool = priv->pool)) @@ -2814,10 +3141,9 @@ gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active) GST_OBJECT_UNLOCK (basesrc); if (pool) { - res = gst_buffer_pool_set_active (pool, active); + gst_buffer_pool_set_flushing (pool, flushing); gst_object_unref (pool); } - return res; } @@ -2864,7 +3190,25 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query) config = gst_buffer_pool_get_config (pool); gst_buffer_pool_config_set_params (config, outcaps, size, min, max); gst_buffer_pool_config_set_allocator (config, allocator, ¶ms); - gst_buffer_pool_set_config (pool, config); + + /* buffer pool may have to do some changes */ + if (!gst_buffer_pool_set_config (pool, config)) { + config = gst_buffer_pool_get_config (pool); + + /* If change are not acceptable, fallback to generic pool */ + if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min, + max)) { + GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool"); + + gst_object_unref (pool); + pool = gst_buffer_pool_new (); + gst_buffer_pool_config_set_params (config, outcaps, size, min, max); + gst_buffer_pool_config_set_allocator (config, allocator, ¶ms); + } + + if (!gst_buffer_pool_set_config (pool, config)) + goto config_failed; + } } if (update_allocator) @@ -2880,6 +3224,13 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query) } return TRUE; + +config_failed: + GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS, + ("Failed to configure the buffer pool"), + ("Configuration is most likely invalid, please report this issue.")); + gst_object_unref (pool); + return FALSE; } static gboolean @@ -2926,6 +3277,11 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps) result = gst_base_src_set_allocation (basesrc, pool, allocator, ¶ms); + if (allocator) + gst_object_unref (allocator); + if (pool) + gst_object_unref (pool); + gst_query_unref (query); return result; @@ -3052,6 +3408,8 @@ gst_base_src_start (GstBaseSrc * basesrc) gboolean result; GST_LIVE_LOCK (basesrc); + + GST_OBJECT_LOCK (basesrc); if (GST_BASE_SRC_IS_STARTING (basesrc)) goto was_starting; if (GST_BASE_SRC_IS_STARTED (basesrc)) @@ -3059,12 +3417,14 @@ gst_base_src_start (GstBaseSrc * basesrc) basesrc->priv->start_result = GST_FLOW_FLUSHING; GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING); + gst_segment_init (&basesrc->segment, basesrc->segment.format); + GST_OBJECT_UNLOCK (basesrc); + basesrc->num_buffers_left = basesrc->num_buffers; basesrc->running = FALSE; basesrc->priv->segment_pending = FALSE; - GST_OBJECT_LOCK (basesrc); - gst_segment_init (&basesrc->segment, basesrc->segment.format); - GST_OBJECT_UNLOCK (basesrc); + basesrc->priv->segment_seqnum = gst_util_seqnum_next (); + basesrc->priv->forced_eos = FALSE; GST_LIVE_UNLOCK (basesrc); bclass = GST_BASE_SRC_GET_CLASS (basesrc); @@ -3076,8 +3436,12 @@ gst_base_src_start (GstBaseSrc * basesrc) if (!result) goto could_not_start; - if (!gst_base_src_is_async (basesrc)) + if (!gst_base_src_is_async (basesrc)) { gst_base_src_start_complete (basesrc, GST_FLOW_OK); + /* not really waiting here, we call this to get the result + * from the start_complete call */ + result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK; + } return result; @@ -3085,19 +3449,24 @@ gst_base_src_start (GstBaseSrc * basesrc) was_starting: { GST_DEBUG_OBJECT (basesrc, "was starting"); + GST_OBJECT_UNLOCK (basesrc); GST_LIVE_UNLOCK (basesrc); return TRUE; } was_started: { GST_DEBUG_OBJECT (basesrc, "was started"); + GST_OBJECT_UNLOCK (basesrc); GST_LIVE_UNLOCK (basesrc); return TRUE; } could_not_start: { GST_DEBUG_OBJECT (basesrc, "could not start"); - /* subclass is supposed to post a message. We don't have to call _stop. */ + /* subclass is supposed to post a message but we post one as a fallback + * just in case. We don't have to call _stop. */ + GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL), + ("Failed to start")); gst_base_src_start_complete (basesrc, GST_FLOW_ERROR); return FALSE; } @@ -3160,10 +3529,6 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret) GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); - /* stop flushing now but for live sources, still block in the LIVE lock when - * we are not yet PLAYING */ - gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL); - gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc)); GST_OBJECT_LOCK (basesrc->srcpad); @@ -3173,32 +3538,38 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret) /* take the stream lock here, we only want to let the task run when we have * set the STARTED flag */ GST_PAD_STREAM_LOCK (basesrc->srcpad); - if (mode == GST_PAD_MODE_PUSH) { - /* do initial seek, which will start the task */ - GST_OBJECT_LOCK (basesrc); - event = basesrc->pending_seek; - basesrc->pending_seek = NULL; - GST_OBJECT_UNLOCK (basesrc); - - /* The perform seek code will start the task when finished. We don't have to - * unlock the streaming thread because it is not running yet */ - if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) - goto seek_failed; - - if (event) - gst_event_unref (event); - } else { - /* if not random_access, we cannot operate in pull mode for now */ - if (G_UNLIKELY (!basesrc->random_access)) - goto no_get_range; + switch (mode) { + case GST_PAD_MODE_PUSH: + /* do initial seek, which will start the task */ + GST_OBJECT_LOCK (basesrc); + event = basesrc->pending_seek; + basesrc->pending_seek = NULL; + GST_OBJECT_UNLOCK (basesrc); + + /* The perform seek code will start the task when finished. We don't have to + * unlock the streaming thread because it is not running yet */ + if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) + goto seek_failed; + + if (event) + gst_event_unref (event); + break; + case GST_PAD_MODE_PULL: + /* if not random_access, we cannot operate in pull mode for now */ + if (G_UNLIKELY (!basesrc->random_access)) + goto no_get_range; + break; + default: + goto not_activated_yet; + break; } - GST_LIVE_LOCK (basesrc); + GST_OBJECT_LOCK (basesrc); GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED); GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); basesrc->priv->start_result = ret; - GST_LIVE_SIGNAL (basesrc); - GST_LIVE_UNLOCK (basesrc); + GST_ASYNC_SIGNAL (basesrc); + GST_OBJECT_UNLOCK (basesrc); GST_PAD_STREAM_UNLOCK (basesrc->srcpad); @@ -3208,7 +3579,7 @@ seek_failed: { GST_PAD_STREAM_UNLOCK (basesrc->srcpad); GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek"); - gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL); + gst_base_src_stop (basesrc); if (event) gst_event_unref (event); ret = GST_FLOW_ERROR; @@ -3217,18 +3588,26 @@ seek_failed: no_get_range: { GST_PAD_STREAM_UNLOCK (basesrc->srcpad); - gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL); + gst_base_src_stop (basesrc); GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping"); ret = GST_FLOW_ERROR; goto error; } +not_activated_yet: + { + GST_PAD_STREAM_UNLOCK (basesrc->srcpad); + gst_base_src_stop (basesrc); + GST_WARNING_OBJECT (basesrc, "pad not activated yet"); + ret = GST_FLOW_ERROR; + goto error; + } error: { - GST_LIVE_LOCK (basesrc); + GST_OBJECT_LOCK (basesrc); basesrc->priv->start_result = ret; GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); - GST_LIVE_SIGNAL (basesrc); - GST_LIVE_UNLOCK (basesrc); + GST_ASYNC_SIGNAL (basesrc); + GST_OBJECT_UNLOCK (basesrc); return; } } @@ -3246,27 +3625,16 @@ gst_base_src_start_wait (GstBaseSrc * basesrc) { GstFlowReturn result; - GST_LIVE_LOCK (basesrc); - if (G_UNLIKELY (basesrc->priv->flushing)) - goto flushing; - + GST_OBJECT_LOCK (basesrc); while (GST_BASE_SRC_IS_STARTING (basesrc)) { - GST_LIVE_WAIT (basesrc); - if (G_UNLIKELY (basesrc->priv->flushing)) - goto flushing; + GST_ASYNC_WAIT (basesrc); } result = basesrc->priv->start_result; - GST_LIVE_UNLOCK (basesrc); + GST_OBJECT_UNLOCK (basesrc); - return result; + GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result)); - /* ERRORS */ -flushing: - { - GST_DEBUG_OBJECT (basesrc, "we are flushing"); - GST_LIVE_UNLOCK (basesrc); - return GST_FLOW_FLUSHING; - } + return result; } static gboolean @@ -3278,32 +3646,40 @@ gst_base_src_stop (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "stopping source"); /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL); + gst_base_src_set_flushing (basesrc, TRUE); + /* stop the task */ gst_pad_stop_task (basesrc->srcpad); + /* stop flushing, this will balance unlock/unlock_stop calls */ + gst_base_src_set_flushing (basesrc, FALSE); - GST_LIVE_LOCK (basesrc); + GST_OBJECT_LOCK (basesrc); if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc)) goto was_stopped; GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); basesrc->priv->start_result = GST_FLOW_FLUSHING; - GST_LIVE_SIGNAL (basesrc); - GST_LIVE_UNLOCK (basesrc); + GST_ASYNC_SIGNAL (basesrc); + GST_OBJECT_UNLOCK (basesrc); bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->stop) result = bclass->stop (basesrc); + if (basesrc->priv->pending_bufferlist != NULL) { + gst_buffer_list_unref (basesrc->priv->pending_bufferlist); + basesrc->priv->pending_bufferlist = NULL; + } + gst_base_src_set_allocation (basesrc, NULL, NULL, NULL); return result; was_stopped: { - GST_DEBUG_OBJECT (basesrc, "was started"); - GST_LIVE_UNLOCK (basesrc); + GST_DEBUG_OBJECT (basesrc, "was stopped"); + GST_OBJECT_UNLOCK (basesrc); return TRUE; } } @@ -3311,49 +3687,39 @@ was_stopped: /* start or stop flushing dataprocessing */ static gboolean -gst_base_src_set_flushing (GstBaseSrc * basesrc, - gboolean flushing, gboolean live_play, gboolean * playing) +gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing) { GstBaseSrcClass *bclass; bclass = GST_BASE_SRC_GET_CLASS (basesrc); - GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play); + GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing); if (flushing) { - gst_base_src_activate_pool (basesrc, FALSE); - /* unlock any subclasses, we need to do this before grabbing the - * LIVE_LOCK since we hold this lock before going into ::create. We pass an - * unlock to the params because of backwards compat (see seek handler)*/ + gst_base_src_set_pool_flushing (basesrc, TRUE); + /* unlock any subclasses to allow turning off the streaming thread */ if (bclass->unlock) bclass->unlock (basesrc); } - /* the live lock is released when we are blocked, waiting for playing or - * when we sync to the clock. */ + /* the live lock is released when we are blocked, waiting for playing, + * when we sync to the clock or creating a buffer */ GST_LIVE_LOCK (basesrc); - if (playing) - *playing = basesrc->live_running; basesrc->priv->flushing = flushing; if (flushing) { - /* if we are locked in the live lock, signal it to make it flush */ - basesrc->live_running = TRUE; - /* clear pending EOS if any */ - g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); - - /* step 1, now that we have the LIVE lock, clear our unlock request */ - if (bclass->unlock_stop) - bclass->unlock_stop (basesrc); + if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) { + GST_OBJECT_LOCK (basesrc); + CLEAR_PENDING_EOS (basesrc); + basesrc->priv->forced_eos = FALSE; + GST_OBJECT_UNLOCK (basesrc); + } - /* step 2, unblock clock sync (if any) or any other blocking thing */ + /* unblock clock sync (if any) or any other blocking thing */ if (basesrc->clock_id) gst_clock_id_unschedule (basesrc->clock_id); } else { - /* signal the live source that it can start playing */ - basesrc->live_running = live_play; - - gst_base_src_activate_pool (basesrc, TRUE); + gst_base_src_set_pool_flushing (basesrc, FALSE); /* Drop all delayed events */ GST_OBJECT_LOCK (basesrc); @@ -3366,9 +3732,18 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, } GST_OBJECT_UNLOCK (basesrc); } + GST_LIVE_SIGNAL (basesrc); GST_LIVE_UNLOCK (basesrc); + if (!flushing) { + /* Now wait for the stream lock to be released and clear our unlock request */ + GST_PAD_STREAM_LOCK (basesrc->srcpad); + if (bclass->unlock_stop) + bclass->unlock_stop (basesrc); + GST_PAD_STREAM_UNLOCK (basesrc->srcpad); + } + return TRUE; } @@ -3377,17 +3752,6 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, static gboolean gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) { - GstBaseSrcClass *bclass; - - bclass = GST_BASE_SRC_GET_CLASS (basesrc); - - /* unlock subclasses locked in ::create, we only do this when we stop playing. */ - if (!live_play) { - GST_DEBUG_OBJECT (basesrc, "unlock"); - if (bclass->unlock) - bclass->unlock (basesrc); - } - /* we are now able to grab the LIVE lock, when we get it, we can be * waiting for PLAYING while blocked in the LIVE cond or we can be waiting * for the clock. */ @@ -3405,13 +3769,10 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) if (live_play) { gboolean start; - /* clear our unlock request when going to PLAYING */ - GST_DEBUG_OBJECT (basesrc, "unlock stop"); - if (bclass->unlock_stop) - bclass->unlock_stop (basesrc); - /* for live sources we restart the timestamp correction */ + GST_OBJECT_LOCK (basesrc); basesrc->priv->latency = -1; + GST_OBJECT_UNLOCK (basesrc); /* have to restart the task in case it stopped because of the unlock when * we went to PAUSED. Only do this if we operating in push mode. */ GST_OBJECT_LOCK (basesrc->srcpad); @@ -3507,12 +3868,18 @@ gst_base_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { gboolean res; + GstBaseSrc *src = GST_BASE_SRC (parent); + + src->priv->stream_start_pending = FALSE; + + GST_DEBUG_OBJECT (pad, "activating in mode %d", mode); switch (mode) { case GST_PAD_MODE_PULL: res = gst_base_src_activate_pull (pad, parent, active); break; case GST_PAD_MODE_PUSH: + src->priv->stream_start_pending = active; res = gst_base_src_activate_push (pad, parent, active); break; default: @@ -3537,7 +3904,6 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - basesrc->priv->stream_start_pending = TRUE; no_preroll = gst_base_src_is_live (basesrc); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: @@ -3560,7 +3926,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED"); if (gst_base_src_is_live (basesrc)) { - /* make sure we block in the live lock in PAUSED */ + /* make sure we block in the live cond in PAUSED */ gst_base_src_set_playing (basesrc, FALSE); no_preroll = TRUE; } @@ -3569,9 +3935,12 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) { /* we don't need to unblock anything here, the pad deactivation code * already did this */ - g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); + if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) { + GST_OBJECT_LOCK (basesrc); + CLEAR_PENDING_EOS (basesrc); + GST_OBJECT_UNLOCK (basesrc); + } gst_event_replace (&basesrc->pending_seek, NULL); - basesrc->priv->stream_start_pending = FALSE; break; } case GST_STATE_CHANGE_READY_TO_NULL: @@ -3592,3 +3961,92 @@ failure: return result; } } + +/** + * gst_base_src_get_buffer_pool: + * @src: a #GstBaseSrc + * + * Returns: (transfer full): the instance of the #GstBufferPool used + * by the src; unref it after usage. + */ +GstBufferPool * +gst_base_src_get_buffer_pool (GstBaseSrc * src) +{ + GstBufferPool *ret = NULL; + + g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL); + + GST_OBJECT_LOCK (src); + if (src->priv->pool) + ret = gst_object_ref (src->priv->pool); + GST_OBJECT_UNLOCK (src); + + return ret; +} + +/** + * gst_base_src_get_allocator: + * @src: a #GstBaseSrc + * @allocator: (out) (allow-none) (transfer full): the #GstAllocator + * used + * @params: (out) (allow-none) (transfer full): the + * #GstAllocationParams of @allocator + * + * Lets #GstBaseSrc sub-classes to know the memory @allocator + * used by the base class and its @params. + * + * Unref the @allocator after usage. + */ +void +gst_base_src_get_allocator (GstBaseSrc * src, + GstAllocator ** allocator, GstAllocationParams * params) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + + GST_OBJECT_LOCK (src); + if (allocator) + *allocator = src->priv->allocator ? + gst_object_ref (src->priv->allocator) : NULL; + + if (params) + *params = src->priv->params; + GST_OBJECT_UNLOCK (src); +} + +/** + * gst_base_src_submit_buffer_list: + * @src: a #GstBaseSrc + * @buffer_list: (transfer full): a #GstBufferList + * + * Subclasses can call this from their create virtual method implementation + * to submit a buffer list to be pushed out later. This is useful in + * cases where the create function wants to produce multiple buffers to be + * pushed out in one go in form of a #GstBufferList, which can reduce overhead + * drastically, especially for packetised inputs (for data streams where + * the packetisation/chunking is not important it is usually more efficient + * to return larger buffers instead). + * + * Subclasses that use this function from their create function must return + * %GST_FLOW_OK and no buffer from their create virtual method implementation. + * If a buffer is returned after a buffer list has also been submitted via this + * function the behaviour is undefined. + * + * Subclasses must only call this function once per create function call and + * subclasses must only call this function when the source operates in push + * mode. + * + * Since: 1.14 + */ +void +gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list)); + g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE); + + /* we need it to be writable later in get_range() where we use get_writable */ + src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list); + + GST_LOG_OBJECT (src, "%u buffers submitted in buffer list", + gst_buffer_list_length (buffer_list)); +}