X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstbasesrc.c;h=111340585badc431330cc405b1c4f65cf7349eda;hb=43abf99a8a064cbe74f110658937088b95d09437;hp=df0a7184cf2fcc84131cbddd14e6034ce0bf7113;hpb=770159fb1ca388b11f2905fd0af58ec66387a4a0;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index df0a718..1113405 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -160,23 +160,22 @@ #include #include +#include #include "gstbasesrc.h" #include "gsttypefindhelper.h" -#include #include GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug); #define GST_CAT_DEFAULT gst_base_src_debug -#define GST_LIVE_GET_LOCK(elem) (GST_BASE_SRC_CAST(elem)->live_lock) +#define GST_LIVE_GET_LOCK(elem) (&GST_BASE_SRC_CAST(elem)->live_lock) #define GST_LIVE_LOCK(elem) g_mutex_lock(GST_LIVE_GET_LOCK(elem)) #define GST_LIVE_TRYLOCK(elem) g_mutex_trylock(GST_LIVE_GET_LOCK(elem)) #define GST_LIVE_UNLOCK(elem) g_mutex_unlock(GST_LIVE_GET_LOCK(elem)) -#define GST_LIVE_GET_COND(elem) (GST_BASE_SRC_CAST(elem)->live_cond) +#define GST_LIVE_GET_COND(elem) (&GST_BASE_SRC_CAST(elem)->live_cond) #define GST_LIVE_WAIT(elem) g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem)) -#define GST_LIVE_TIMED_WAIT(elem, timeval) g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\ - timeval) +#define GST_LIVE_WAIT_UNTIL(elem, end_time) g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time) #define GST_LIVE_SIGNAL(elem) g_cond_signal (GST_LIVE_GET_COND (elem)); #define GST_LIVE_BROADCAST(elem) g_cond_broadcast (GST_LIVE_GET_COND (elem)); @@ -209,6 +208,12 @@ struct _GstBaseSrcPrivate gboolean discont; gboolean flushing; + GstFlowReturn start_result; + gboolean async; + + /* if a stream-start event should be sent */ + gboolean stream_start_pending; + /* if segment should be sent */ gboolean segment_pending; @@ -240,7 +245,7 @@ struct _GstBaseSrcPrivate GstClockTime earliest_time; GstBufferPool *pool; - const GstAllocator *allocator; + GstAllocator *allocator; guint prefix; guint alignment; }; @@ -280,14 +285,12 @@ gst_base_src_get_type (void) static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter); -static void gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps); -static void gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps); +static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps); +static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps); static gboolean gst_base_src_is_random_access (GstBaseSrc * src); -static gboolean gst_base_src_activate_push (GstPad * pad, GstObject * parent, - gboolean active); -static gboolean gst_base_src_activate_pull (GstPad * pad, GstObject * parent, - gboolean active); +static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); static void gst_base_src_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_base_src_get_property (GObject * object, guint prop_id, @@ -315,6 +318,7 @@ static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc, static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing); + static gboolean gst_base_src_start (GstBaseSrc * basesrc); static gboolean gst_base_src_stop (GstBaseSrc * basesrc); @@ -384,8 +388,7 @@ gst_base_src_class_init (GstBaseSrcClass * klass) klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc); /* Registering debug symbols for function pointers */ - GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push); - GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull); + GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange); @@ -401,8 +404,8 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc); basesrc->is_live = FALSE; - basesrc->live_lock = g_mutex_new (); - basesrc->live_cond = g_cond_new (); + g_mutex_init (&basesrc->live_lock); + g_cond_init (&basesrc->live_cond); basesrc->num_buffers = DEFAULT_NUM_BUFFERS; basesrc->num_buffers_left = -1; @@ -416,8 +419,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) pad = gst_pad_new_from_template (pad_template, "src"); GST_DEBUG_OBJECT (basesrc, "setting functions on src pad"); - gst_pad_set_activatepush_function (pad, gst_base_src_activate_push); - gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull); + gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode); gst_pad_set_event_function (pad, gst_base_src_event); gst_pad_set_query_function (pad, gst_base_src_query); gst_pad_set_getrange_function (pad, gst_base_src_getrange); @@ -435,8 +437,10 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP; g_atomic_int_set (&basesrc->priv->have_events, FALSE); - GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); - GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE); + basesrc->priv->start_result = GST_FLOW_FLUSHING; + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE); GST_DEBUG_OBJECT (basesrc, "init done"); } @@ -449,8 +453,8 @@ gst_base_src_finalize (GObject * object) basesrc = GST_BASE_SRC (object); - g_mutex_free (basesrc->live_lock); - g_cond_free (basesrc->live_cond); + g_mutex_clear (&basesrc->live_lock); + g_cond_clear (&basesrc->live_cond); event_p = &basesrc->pending_seek; gst_event_replace (event_p, NULL); @@ -475,7 +479,7 @@ gst_base_src_finalize (GObject * object) * This function will block until a state change to PLAYING happens (in which * case this function returns #GST_FLOW_OK) or the processing must be stopped due * to a state change to READY or a FLUSH event (in which case this function - * returns #GST_FLOW_WRONG_STATE). + * returns #GST_FLOW_FLUSHING). * * Since: 0.10.12 * @@ -502,7 +506,7 @@ gst_base_src_wait_playing (GstBaseSrc * src) flushing: { GST_DEBUG_OBJECT (src, "we are flushing"); - return GST_FLOW_WRONG_STATE; + return GST_FLOW_FLUSHING; } } @@ -598,6 +602,49 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic) } /** + * gst_base_src_set_async: + * @src: base source instance + * @async: new async mode + * + * Configure async behaviour in @src, no state change will block. The open, + * close, start, stop, play and pause virtual methods will be executed in a + * different thread and are thus allowed to perform blocking operations. Any + * blocking operation should be unblocked with the unlock vmethod. + */ +void +gst_base_src_set_async (GstBaseSrc * src, gboolean async) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + + GST_OBJECT_LOCK (src); + src->priv->async = async; + GST_OBJECT_UNLOCK (src); +} + +/** + * gst_base_src_is_async: + * @src: base source instance + * + * Get the current async behaviour of @src. See also gst_base_src_set_async(). + * + * Returns: %TRUE if @src is operating in async mode. + */ +gboolean +gst_base_src_is_async (GstBaseSrc * src) +{ + gboolean res; + + g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE); + + GST_OBJECT_LOCK (src); + res = src->priv->async; + GST_OBJECT_UNLOCK (src); + + return res; +} + + +/** * gst_base_src_query_latency: * @src: the source * @live: (out) (allow-none): if the source is live @@ -785,6 +832,19 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop, return res; } +static gboolean +gst_base_src_send_stream_start (GstBaseSrc * src) +{ + gboolean ret = TRUE; + + if (src->priv->stream_start_pending) { + ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ()); + src->priv->stream_start_pending = FALSE; + } + + return ret; +} + /** * gst_base_src_set_caps: * @src: a #GstBaseSrc @@ -802,6 +862,7 @@ gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps) bclass = GST_BASE_SRC_GET_CLASS (src); + gst_base_src_send_stream_start (src); gst_pad_push_event (src->srcpad, gst_event_new_caps (caps)); if (bclass->set_caps) @@ -837,14 +898,14 @@ gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter) return caps; } -static void +static GstCaps * gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps) { GST_DEBUG_OBJECT (bsrc, "using default caps fixate function"); - gst_caps_fixate (caps); + return gst_caps_fixate (caps); } -static void +static GstCaps * gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) { GstBaseSrcClass *bclass; @@ -852,7 +913,9 @@ gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) bclass = GST_BASE_SRC_GET_CLASS (bsrc); if (bclass->fixate) - bclass->fixate (bsrc, caps); + caps = bclass->fixate (bsrc, caps); + + return caps; } static gboolean @@ -1133,10 +1196,13 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) bclass = GST_BASE_SRC_GET_CLASS (src); if (bclass->get_caps) { gst_query_parse_caps (query, &filter); - caps = bclass->get_caps (src, filter); - gst_query_set_caps_result (query, caps); - gst_caps_unref (caps); - res = TRUE; + if ((caps = bclass->get_caps (src, filter))) { + gst_query_set_caps_result (query, caps); + gst_caps_unref (caps); + res = TRUE; + } else { + res = FALSE; + } } else res = FALSE; break; @@ -1805,6 +1871,12 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) result = TRUE; break; } + case GST_EVENT_RECONFIGURE: + result = TRUE; + break; + case GST_EVENT_LATENCY: + result = TRUE; + break; default: result = FALSE; break; @@ -2179,7 +2251,8 @@ again: } } - if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))) + if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src) + && !GST_BASE_SRC_IS_STARTING (src))) goto not_started; if (G_UNLIKELY (!bclass->create)) @@ -2230,6 +2303,7 @@ again: /* no timestamp set and we are at offset 0, we can timestamp with 0 */ if (offset == 0 && src->segment.time == 0 && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) { + GST_DEBUG_OBJECT (src, "setting first timestamp to 0"); *buf = gst_buffer_make_writable (*buf); GST_BUFFER_TIMESTAMP (*buf) = 0; } @@ -2261,7 +2335,7 @@ again: * get rid of the produced buffer. */ GST_DEBUG_OBJECT (src, "clock was unscheduled (%d), returning WRONG_STATE", status); - ret = GST_FLOW_WRONG_STATE; + ret = GST_FLOW_FLUSHING; } else { /* If we are running when this happens, we quickly switched between * pause and playing. We try to produce a new buffer */ @@ -2298,7 +2372,7 @@ not_ok: not_started: { GST_DEBUG_OBJECT (src, "getrange but not started"); - return GST_FLOW_WRONG_STATE; + return GST_FLOW_FLUSHING; } no_function: { @@ -2321,7 +2395,7 @@ flushing: GST_DEBUG_OBJECT (src, "we are flushing"); gst_buffer_unref (*buf); *buf = NULL; - return GST_FLOW_WRONG_STATE; + return GST_FLOW_FLUSHING; } eos: { @@ -2354,7 +2428,7 @@ done: flushing: { GST_DEBUG_OBJECT (src, "we are flushing"); - res = GST_FLOW_WRONG_STATE; + res = GST_FLOW_FLUSHING; goto done; } } @@ -2363,13 +2437,23 @@ static gboolean gst_base_src_is_random_access (GstBaseSrc * src) { /* we need to start the basesrc to check random access */ - if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) { + if (!GST_BASE_SRC_IS_STARTED (src)) { GST_LOG_OBJECT (src, "doing start/stop to check get_range support"); - if (G_LIKELY (gst_base_src_start (src))) + if (G_LIKELY (gst_base_src_start (src))) { + if (gst_base_src_start_wait (src) != GST_FLOW_OK) + goto start_failed; gst_base_src_stop (src); + } } return src->random_access; + + /* ERRORS */ +start_failed: + { + GST_DEBUG_OBJECT (src, "failed to start"); + return FALSE; + } } static void @@ -2387,10 +2471,12 @@ gst_base_src_loop (GstPad * pad) src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); + gst_base_src_send_stream_start (src); + /* check if we need to renegotiate */ if (gst_pad_check_reconfigure (pad)) { if (!gst_base_src_negotiate (src)) - GST_DEBUG_OBJECT (src, "Failed to renegotiate"); + goto not_negotiated; } GST_LIVE_LOCK (src); @@ -2523,6 +2609,7 @@ gst_base_src_loop (GstPad * pad) } if (G_UNLIKELY (src->priv->discont)) { + GST_INFO_OBJECT (src, "marking pending DISCONT"); buf = gst_buffer_make_writable (buf); GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); src->priv->discont = FALSE; @@ -2546,11 +2633,17 @@ done: return; /* special cases */ +not_negotiated: + { + GST_DEBUG_OBJECT (src, "Failed to renegotiate"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto pause; + } flushing: { GST_DEBUG_OBJECT (src, "we are flushing"); GST_LIVE_UNLOCK (src); - ret = GST_FLOW_WRONG_STATE; + ret = GST_FLOW_FLUSHING; goto pause; } pause: @@ -2610,8 +2703,9 @@ null_buffer: static gboolean gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, - const GstAllocator * allocator, guint prefix, guint alignment) + GstAllocator * allocator, guint prefix, guint alignment) { + GstAllocator *oldalloc; GstBufferPool *oldpool; GstBaseSrcPrivate *priv = basesrc->priv; @@ -2625,6 +2719,7 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, oldpool = priv->pool; priv->pool = pool; + oldalloc = priv->allocator; priv->allocator = allocator; priv->prefix = prefix; @@ -2639,6 +2734,9 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, } gst_object_unref (oldpool); } + if (oldalloc) { + gst_allocator_unref (oldalloc); + } return TRUE; /* ERRORS */ @@ -2675,7 +2773,7 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps) gboolean result = TRUE; GstQuery *query; GstBufferPool *pool = NULL; - const GstAllocator *allocator = NULL; + GstAllocator *allocator = NULL; guint size, min, max, prefix, alignment; bclass = GST_BASE_SRC_GET_CLASS (basesrc); @@ -2698,15 +2796,12 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps) &alignment, &pool); if (size == 0) { - const gchar *mem = NULL; - /* no size, we have variable size buffers */ if (gst_query_get_n_allocation_memories (query) > 0) { - mem = gst_query_parse_nth_allocation_memory (query, 0); + if ((allocator = gst_query_parse_nth_allocation_memory (query, 0))) + gst_allocator_ref (allocator); } - GST_DEBUG_OBJECT (basesrc, "0 size, getting allocator %s", - GST_STR_NULL (mem)); - allocator = gst_allocator_find (mem); + GST_DEBUG_OBJECT (basesrc, "0 size, using allocator %p", allocator); } else if (pool == NULL) { /* fixed size, we can use a bufferpool */ GstStructure *config; @@ -2773,8 +2868,7 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) * nego is not needed */ result = TRUE; } else { - caps = gst_caps_make_writable (caps); - gst_base_src_fixate (basesrc, caps); + caps = gst_base_src_fixate (basesrc, caps); GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps); if (gst_caps_is_fixed (caps)) { /* yay, fixed caps, use those then, it's possible that the subclass does @@ -2784,6 +2878,8 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) } gst_caps_unref (caps); } else { + if (caps) + gst_caps_unref (caps); GST_DEBUG_OBJECT (basesrc, "no common caps"); } return result; @@ -2838,24 +2934,23 @@ static gboolean gst_base_src_start (GstBaseSrc * basesrc) { GstBaseSrcClass *bclass; - gboolean result, have_size; - guint64 size; - gboolean seekable; - GstFormat format; - - if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) - return TRUE; + gboolean result; - GST_DEBUG_OBJECT (basesrc, "starting source"); + GST_LIVE_LOCK (basesrc); + if (GST_BASE_SRC_IS_STARTING (basesrc)) + goto was_starting; + if (GST_BASE_SRC_IS_STARTED (basesrc)) + goto was_started; + basesrc->priv->start_result = GST_FLOW_FLUSHING; + GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING); basesrc->num_buffers_left = basesrc->num_buffers; - + basesrc->running = FALSE; + basesrc->priv->segment_pending = FALSE; GST_OBJECT_LOCK (basesrc); gst_segment_init (&basesrc->segment, basesrc->segment.format); GST_OBJECT_UNLOCK (basesrc); - - basesrc->running = FALSE; - basesrc->priv->segment_pending = FALSE; + GST_LIVE_UNLOCK (basesrc); bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->start) @@ -2866,14 +2961,65 @@ gst_base_src_start (GstBaseSrc * basesrc) if (!result) goto could_not_start; - GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED); + if (!gst_base_src_is_async (basesrc)) + gst_base_src_start_complete (basesrc, GST_FLOW_OK); + return result; + + /* ERROR */ +was_starting: + { + GST_DEBUG_OBJECT (basesrc, "was starting"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } +was_started: + { + GST_DEBUG_OBJECT (basesrc, "was started"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } +could_not_start: + { + GST_DEBUG_OBJECT (basesrc, "could not start"); + /* subclass is supposed to post a message. We don't have to call _stop. */ + gst_base_src_start_complete (basesrc, GST_FLOW_ERROR); + return FALSE; + } +} + +/** + * gst_base_src_start_complete: + * @src: base source instance + * @ret: a #GstFlowReturn + * + * Complete an asynchronous start operation. When the subclass overrides the + * start method, it should call gst_base_src_start_complete() when the start + * operation completes either from the same thread or from an asynchronous + * helper thread. + */ +void +gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret) +{ + gboolean have_size; + guint64 size; + gboolean seekable; + GstFormat format; + GstPadMode mode; + GstEvent *event; + + if (ret != GST_FLOW_OK) + goto error; + + GST_DEBUG_OBJECT (basesrc, "starting source"); format = basesrc->segment.format; /* figure out the size */ have_size = FALSE; size = -1; if (format == GST_FORMAT_BYTES) { + GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc); + if (bclass->get_size) { if (!(have_size = bclass->get_size (basesrc, &size))) size = -1; @@ -2899,16 +3045,107 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); + /* stop flushing now but for live sources, still block in the LIVE lock when + * we are not yet PLAYING */ + gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); + gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc)); - return TRUE; + GST_OBJECT_LOCK (basesrc->srcpad); + mode = GST_PAD_MODE (basesrc->srcpad); + GST_OBJECT_UNLOCK (basesrc->srcpad); - /* ERROR */ -could_not_start: + if (mode == GST_PAD_MODE_PUSH) { + /* do initial seek, which will start the task */ + GST_OBJECT_LOCK (basesrc); + event = basesrc->pending_seek; + basesrc->pending_seek = NULL; + GST_OBJECT_UNLOCK (basesrc); + + /* no need to unlock anything, the task is certainly + * not running here. The perform seek code will start the task when + * finished. */ + if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) + goto seek_failed; + + if (event) + gst_event_unref (event); + } else { + /* if not random_access, we cannot operate in pull mode for now */ + if (G_UNLIKELY (!basesrc->random_access)) + goto no_get_range; + } + + GST_LIVE_LOCK (basesrc); + GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + basesrc->priv->start_result = ret; + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); + + return; + +seek_failed: { - GST_DEBUG_OBJECT (basesrc, "could not start"); - /* subclass is supposed to post a message. We don't have to call _stop. */ - return FALSE; + GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek"); + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + if (event) + gst_event_unref (event); + ret = GST_FLOW_ERROR; + goto error; + } +no_get_range: + { + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping"); + ret = GST_FLOW_ERROR; + goto error; + } +error: + { + GST_LIVE_LOCK (basesrc); + basesrc->priv->start_result = ret; + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); + return; + } +} + +/** + * gst_base_src_start_wait: + * @src: base source instance + * @ret: a #GstFlowReturn + * + * Wait until the start operation completes. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +gst_base_src_start_wait (GstBaseSrc * basesrc) +{ + GstFlowReturn result; + + GST_LIVE_LOCK (basesrc); + if (G_UNLIKELY (basesrc->priv->flushing)) + goto flushing; + + while (GST_BASE_SRC_IS_STARTING (basesrc)) { + GST_LIVE_WAIT (basesrc); + if (G_UNLIKELY (basesrc->priv->flushing)) + goto flushing; + } + result = basesrc->priv->start_result; + GST_LIVE_UNLOCK (basesrc); + + return result; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (basesrc, "we are flushing"); + GST_LIVE_UNLOCK (basesrc); + return GST_FLOW_FLUSHING; } } @@ -2918,21 +3155,37 @@ gst_base_src_stop (GstBaseSrc * basesrc) GstBaseSrcClass *bclass; gboolean result = TRUE; - if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) - return TRUE; - GST_DEBUG_OBJECT (basesrc, "stopping source"); + /* flush all */ + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + /* stop the task */ + gst_pad_stop_task (basesrc->srcpad); + + GST_LIVE_LOCK (basesrc); + if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc)) + goto was_stopped; + + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); + basesrc->priv->start_result = GST_FLOW_FLUSHING; + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); + bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->stop) result = bclass->stop (basesrc); gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0); - if (result) - GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); - return result; + +was_stopped: + { + GST_DEBUG_OBJECT (basesrc, "was started"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } } /* start or stop flushing dataprocessing @@ -3059,7 +3312,6 @@ static gboolean gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) { GstBaseSrc *basesrc; - GstEvent *event; basesrc = GST_BASE_SRC (parent); @@ -3072,30 +3324,8 @@ gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) if (G_UNLIKELY (!gst_base_src_start (basesrc))) goto error_start; - - basesrc->priv->discont = TRUE; - gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); - - /* do initial seek, which will start the task */ - GST_OBJECT_LOCK (basesrc); - event = basesrc->pending_seek; - basesrc->pending_seek = NULL; - GST_OBJECT_UNLOCK (basesrc); - - /* no need to unlock anything, the task is certainly - * not running here. The perform seek code will start the task when - * finished. */ - if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) - goto seek_failed; - - if (event) - gst_event_unref (event); } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode"); - /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - /* stop the task */ - gst_pad_stop_task (pad); /* now we can stop the source */ if (G_UNLIKELY (!gst_base_src_stop (basesrc))) goto error_stop; @@ -3113,19 +3343,6 @@ error_start: GST_WARNING_OBJECT (basesrc, "Failed to start in push mode"); return FALSE; } -seek_failed: - { - GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek"); - /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - /* stop the task */ - gst_pad_stop_task (pad); - /* Stop the basesrc */ - gst_base_src_stop (basesrc); - if (event) - gst_event_unref (event); - return FALSE; - } error_stop: { GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode"); @@ -3145,19 +3362,8 @@ gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active) GST_DEBUG_OBJECT (basesrc, "Activating in pull mode"); if (G_UNLIKELY (!gst_base_src_start (basesrc))) goto error_start; - - /* if not random_access, we cannot operate in pull mode for now */ - if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc))) - goto no_get_range; - - /* stop flushing now but for live sources, still block in the LIVE lock when - * we are not yet PLAYING */ - gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode"); - /* flush all, there is no task to stop */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - if (G_UNLIKELY (!gst_base_src_stop (basesrc))) goto error_stop; } @@ -3169,12 +3375,6 @@ error_start: GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode"); return FALSE; } -no_get_range: - { - GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping"); - gst_base_src_stop (basesrc); - return FALSE; - } error_stop: { GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode"); @@ -3182,6 +3382,28 @@ error_stop: } } +static gboolean +gst_base_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) +{ + gboolean res; + + switch (mode) { + case GST_PAD_MODE_PULL: + res = gst_base_src_activate_pull (pad, parent, active); + break; + case GST_PAD_MODE_PUSH: + res = gst_base_src_activate_push (pad, parent, active); + break; + default: + GST_LOG_OBJECT (pad, "unknown activation mode %d", mode); + res = FALSE; + break; + } + return res; +} + + static GstStateChangeReturn gst_base_src_change_state (GstElement * element, GstStateChange transition) { @@ -3195,6 +3417,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: + basesrc->priv->stream_start_pending = TRUE; no_preroll = gst_base_src_is_live (basesrc); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: @@ -3224,13 +3447,11 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_READY: { - GstEvent **event_p; - /* we don't need to unblock anything here, the pad deactivation code * already did this */ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); - event_p = &basesrc->pending_seek; - gst_event_replace (event_p, NULL); + gst_event_replace (&basesrc->pending_seek, NULL); + basesrc->priv->stream_start_pending = FALSE; break; } case GST_STATE_CHANGE_READY_TO_NULL: