From: Wim Taymans Date: Wed, 25 May 2005 11:50:11 +0000 (+0000) Subject: gst/: Implement gst_pad_pause/start/stop_task(), take STREAM lock in task function. X-Git-Tag: RELEASE-0_9_2~459 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=add280cd100bfd7a0d84bc1b2681ba8e165cfa3b;p=platform%2Fupstream%2Fgstreamer.git gst/: Implement gst_pad_pause/start/stop_task(), take STREAM lock in task function. Original commit message from CVS: * gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush): * gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push), (gst_basesink_finish_preroll), (gst_basesink_chain), (gst_basesink_loop), (gst_basesink_activate), (gst_basesink_change_state): * gst/base/gstbasesrc.c: (gst_basesrc_do_seek), (gst_basesrc_get_range), (gst_basesrc_loop), (gst_basesrc_activate): * gst/elements/gsttee.c: (gst_tee_sink_activate): * gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init), (gst_real_pad_init), (gst_real_pad_set_property), (gst_real_pad_get_property), (gst_pad_set_active), (gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink), (gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent), (gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps), (gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize), (gst_pad_event_default_dispatch), (gst_pad_event_default), (gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose), (gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain), (gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range), (gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task), (gst_pad_stop_task): * gst/gstpad.h: * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_src_activate): * gst/gsttask.c: (gst_task_init), (gst_task_set_lock), (gst_task_get_state): * gst/gsttask.h: * gst/schedulers/threadscheduler.c: (gst_thread_scheduler_task_start), (gst_thread_scheduler_func): Implement gst_pad_pause/start/stop_task(), take STREAM lock in task function. Remove ACTIVE pad flag, use FLUSHING everywhere Added _pad_chain(), _pad_get_range() to call chain/getrange functions. Add locks around IS_FLUSHING when reading. Take STREAM lock in chain(), get_range() functions so plugins don't need to take it anymore. --- diff --git a/ChangeLog b/ChangeLog index 9196257..34b566c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,48 @@ 2005-05-25 Wim Taymans + * gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush): + * gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push), + (gst_basesink_finish_preroll), (gst_basesink_chain), + (gst_basesink_loop), (gst_basesink_activate), + (gst_basesink_change_state): + * gst/base/gstbasesrc.c: (gst_basesrc_do_seek), + (gst_basesrc_get_range), (gst_basesrc_loop), + (gst_basesrc_activate): + * gst/elements/gsttee.c: (gst_tee_sink_activate): + * gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init), + (gst_real_pad_init), (gst_real_pad_set_property), + (gst_real_pad_get_property), (gst_pad_set_active), + (gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink), + (gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent), + (gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps), + (gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize), + (gst_pad_event_default_dispatch), (gst_pad_event_default), + (gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose), + (gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain), + (gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range), + (gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task), + (gst_pad_stop_task): + * gst/gstpad.h: + * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_loop), (gst_queue_src_activate): + * gst/gsttask.c: (gst_task_init), (gst_task_set_lock), + (gst_task_get_state): + * gst/gsttask.h: + * gst/schedulers/threadscheduler.c: + (gst_thread_scheduler_task_start), (gst_thread_scheduler_func): + Implement gst_pad_pause/start/stop_task(), take STREAM lock + in task function. + Remove ACTIVE pad flag, use FLUSHING everywhere + Added _pad_chain(), _pad_get_range() to call chain/getrange + functions. + Add locks around IS_FLUSHING when reading. + Take STREAM lock in chain(), get_range() functions so plugins + don't need to take it anymore. + + + +2005-05-25 Wim Taymans + * tools/gst-launch.c: (event_loop): Unref message after using its contents instead of before. diff --git a/gst/base/gstadapter.c b/gst/base/gstadapter.c index 993c78c..09239d0 100644 --- a/gst/base/gstadapter.c +++ b/gst/base/gstadapter.c @@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size) if (adapter->assembled_size < size) { adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE; - GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n", + GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u", adapter->assembled_size); adapter->assembled_data = g_realloc (adapter->assembled_data, adapter->assembled_size); @@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush) g_return_if_fail (flush > 0); g_return_if_fail (flush <= adapter->size); - GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush); + GST_LOG_OBJECT (adapter, "flushing %u bytes", flush); adapter->size -= flush; adapter->assembled_len = 0; while (flush > 0) { diff --git a/gst/base/gstbasesink.c b/gst/base/gstbasesink.c index e677f0e..b8604c8 100644 --- a/gst/base/gstbasesink.c +++ b/gst/base/gstbasesink.c @@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad, if (basesink->preroll_queue->length == 0) { GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink); + GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); if (bclass->preroll) bclass->preroll (basesink, buffer); } @@ -448,7 +450,7 @@ PrerollReturn gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad, GstBuffer * buffer) { - gboolean usable; + gboolean flushing; DEBUG ("finish preroll %p <\n", basesink); /* lock order is important */ @@ -461,13 +463,13 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad, gst_element_commit_state (GST_ELEMENT (basesink)); GST_STATE_UNLOCK (basesink); + gst_basesink_preroll_queue_push (basesink, pad, buffer); + GST_LOCK (pad); - usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad); + flushing = GST_RPAD_IS_FLUSHING (pad); GST_UNLOCK (pad); - if (!usable) - goto unusable; - - gst_basesink_preroll_queue_push (basesink, pad, buffer); + if (flushing) + goto flushing; if (basesink->need_preroll) goto still_queueing; @@ -490,7 +492,7 @@ no_preroll: GST_STATE_UNLOCK (basesink); return PREROLL_PLAYING; } -unusable: +flushing: { GST_DEBUG ("pad is flushing"); GST_PREROLL_UNLOCK (pad); @@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf) g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode == GST_ACTIVATE_PUSH); - GST_STREAM_LOCK (pad); - result = gst_basesink_chain_unlocked (pad, buf); - GST_STREAM_UNLOCK (pad); - return result; } @@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad) g_assert (basesink->pad_mode == GST_ACTIVATE_PULL); - GST_STREAM_LOCK (pad); - result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf); if (result != GST_FLOW_OK) goto paused; @@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad) goto paused; /* default */ - GST_STREAM_UNLOCK (pad); return; paused: - gst_task_pause (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (pad); return; } @@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode) /* if we have a scheduler we can start the task */ g_return_val_if_fail (basesink->has_loop, FALSE); gst_pad_peer_set_active (pad, mode); - if (GST_ELEMENT_SCHEDULER (basesink)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink), - (GstTaskFunction) gst_basesink_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad); break; case GST_ACTIVATE_NONE: /* step 1, unblock clock sync (if any) or any other blocking thing */ @@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode) GST_PREROLL_UNLOCK (pad); /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - if (GST_RPAD_TASK (pad)) { - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } basesink->pad_mode = mode; @@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element) basesink->have_preroll = FALSE; GST_PREROLL_UNLOCK (basesink->sinkpad); - /* make sure the element is finished processing */ - GST_STREAM_LOCK (basesink->sinkpad); - GST_STREAM_UNLOCK (basesink->sinkpad); /* clear EOS state */ basesink->eos = FALSE; break; diff --git a/gst/base/gstbasesrc.c b/gst/base/gstbasesrc.c index 43ddf5a..5ac5adb 100644 --- a/gst/base/gstbasesrc.c +++ b/gst/base/gstbasesrc.c @@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event) } /* and restart the task */ - if (GST_RPAD_TASK (src->srcpad)) { - gst_task_start (GST_RPAD_TASK (src->srcpad)); - } + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop, + src->srcpad); GST_STREAM_UNLOCK (src->srcpad); gst_event_unref (event); @@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value, } static GstFlowReturn -gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length, +gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length, GstBuffer ** buf) { GstFlowReturn ret; @@ -499,21 +498,6 @@ unexpected_length: } } -static GstFlowReturn -gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length, - GstBuffer ** ret) -{ - GstFlowReturn fret; - - GST_STREAM_LOCK (pad); - - fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret); - - GST_STREAM_UNLOCK (pad); - - return fret; -} - static gboolean gst_basesrc_check_get_range (GstPad * pad) { @@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad) src = GST_BASESRC (GST_OBJECT_PARENT (pad)); - GST_STREAM_LOCK (pad); - - ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf); + ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf); if (ret != GST_FLOW_OK) goto eos; @@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad) if (ret != GST_FLOW_OK) goto pause; - GST_STREAM_UNLOCK (pad); return; eos: { GST_DEBUG_OBJECT (src, "going to EOS"); - gst_task_pause (GST_RPAD_TASK (pad)); + gst_pad_pause_task (pad); gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS)); - GST_STREAM_UNLOCK (pad); return; } pause: { GST_DEBUG_OBJECT (src, "pausing task"); - gst_task_pause (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (pad); return; } } @@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode) result = FALSE; switch (mode) { case GST_ACTIVATE_PUSH: - /* if we have a scheduler we can start the task */ - if (GST_ELEMENT_SCHEDULER (basesrc)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc), - (GstTaskFunction) gst_basesrc_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad); break; case GST_ACTIVATE_PULL: result = TRUE; @@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode) gst_basesrc_unlock (basesrc); /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - if (GST_RPAD_TASK (pad)) { - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } return result; diff --git a/gst/elements/gsttee.c b/gst/elements/gsttee.c index cd7213c..9806606 100644 --- a/gst/elements/gsttee.c +++ b/gst/elements/gsttee.c @@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode) break; case GST_ACTIVATE_PULL: g_return_val_if_fail (tee->has_sink_loop, FALSE); - if (GST_ELEMENT_SCHEDULER (tee)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee), - (GstTaskFunction) gst_tee_loop, pad); - - gst_pad_start_task (pad); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad); break; case GST_ACTIVATE_NONE: - GST_STREAM_LOCK (pad); - if (GST_RPAD_TASK (pad)) { - gst_pad_stop_task (pad); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } tee->sink_mode = mode; diff --git a/gst/gstpad.c b/gst/gstpad.c index 8379f43..b8d4cbb 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -126,7 +126,7 @@ gst_pad_init (GstPad * pad) static void gst_pad_dispose (GObject * object) { - GstPad *pad = GST_PAD (object); + GstPad *pad = GST_PAD_CAST (object); gst_pad_set_pad_template (pad, NULL); /* FIXME, we have links to many other things like caps @@ -152,8 +152,7 @@ enum { REAL_ARG_0, REAL_ARG_CAPS, - REAL_ARG_ACTIVE - /* FILL ME */ + /* FILL ME */ }; static void gst_real_pad_class_init (GstRealPadClass * klass); @@ -219,9 +218,6 @@ gst_real_pad_class_init (GstRealPadClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRealPadClass, request_link), NULL, NULL, gst_marshal_VOID__OBJECT, G_TYPE_NONE, 0); - g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_ACTIVE, - g_param_spec_boolean ("active", "Active", "Whether the pad is active.", - TRUE, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_CAPS, g_param_spec_boxed ("caps", "Caps", "The capabilities of the pad", GST_TYPE_CAPS, G_PARAM_READABLE)); @@ -251,7 +247,7 @@ gst_real_pad_init (GstRealPad * pad) pad->queryfunc = gst_pad_query_default; pad->intlinkfunc = gst_pad_get_internal_links_default; - GST_FLAG_UNSET (pad, GST_PAD_ACTIVE); + GST_RPAD_UNSET_FLUSHING (pad); pad->preroll_lock = g_mutex_new (); pad->preroll_cond = g_cond_new (); @@ -271,10 +267,6 @@ gst_real_pad_set_property (GObject * object, guint prop_id, g_return_if_fail (GST_IS_PAD (object)); switch (prop_id) { - case REAL_ARG_ACTIVE: - g_warning ("FIXME: not useful any more!!!"); - gst_pad_set_active (GST_PAD (object), g_value_get_boolean (value)); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -288,9 +280,6 @@ gst_real_pad_get_property (GObject * object, guint prop_id, g_return_if_fail (GST_IS_PAD (object)); switch (prop_id) { - case REAL_ARG_ACTIVE: - g_value_set_boolean (value, GST_FLAG_IS_SET (object, GST_PAD_ACTIVE)); - break; case REAL_ARG_CAPS: g_value_set_boxed (value, GST_PAD_CAPS (GST_REAL_PAD (object))); break; @@ -451,7 +440,7 @@ gboolean gst_pad_set_active (GstPad * pad, GstActivateMode mode) { GstRealPad *realpad; - gboolean old; + GstActivateMode old; GstPadActivateFunction activatefunc; gboolean active; @@ -460,17 +449,17 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode) GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad); active = GST_PAD_MODE_ACTIVATE (mode); - old = GST_PAD_IS_ACTIVE (realpad); + old = GST_RPAD_ACTIVATE_MODE (realpad); /* if nothing changed, we can just exit */ - if (G_UNLIKELY (old == active)) + if (G_UNLIKELY (old == mode)) goto was_ok; /* make sure data is disallowed when going inactive */ if (!active) { GST_CAT_DEBUG (GST_CAT_PADS, "de-activating pad %s:%s", GST_DEBUG_PAD_NAME (realpad)); - GST_FLAG_UNSET (realpad, GST_PAD_ACTIVE); + GST_RPAD_SET_FLUSHING (realpad); /* unlock blocked pads so element can resume and stop */ GST_PAD_BLOCK_SIGNAL (realpad); } @@ -510,16 +499,24 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode) GST_LOCK (realpad); if (result == FALSE) goto activate_error; + + /* store the mode */ + GST_RPAD_ACTIVATE_MODE (realpad) = mode; } /* when going to active allow data passing now */ if (active) { GST_CAT_DEBUG (GST_CAT_PADS, "activating pad %s:%s in mode %d", GST_DEBUG_PAD_NAME (realpad), mode); - GST_FLAG_SET (realpad, GST_PAD_ACTIVE); - } - GST_UNLOCK (realpad); + GST_RPAD_UNSET_FLUSHING (realpad); + GST_UNLOCK (realpad); + } else { + GST_UNLOCK (realpad); + /* and make streaming finish */ + GST_STREAM_LOCK (realpad); + GST_STREAM_UNLOCK (realpad); + } return TRUE; was_ok: @@ -607,7 +604,7 @@ gst_pad_is_active (GstPad * pad) g_return_val_if_fail (GST_IS_PAD (pad), FALSE); GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad); - result = GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE); + result = GST_PAD_MODE_ACTIVATE (GST_RPAD_ACTIVATE_MODE (realpad)); GST_UNLOCK (realpad); return result; @@ -933,7 +930,7 @@ gst_pad_get_query_types (GstPad * pad) if (G_UNLIKELY ((func = GST_RPAD_QUERYTYPEFUNC (rpad)) == NULL)) goto no_func; - return func (GST_PAD (rpad)); + return func (GST_PAD_CAST (rpad)); no_func: { @@ -1187,10 +1184,10 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad) goto not_linked_together; if (GST_RPAD_UNLINKFUNC (realsrc)) { - GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD (realsrc)); + GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD_CAST (realsrc)); } if (GST_RPAD_UNLINKFUNC (realsink)) { - GST_RPAD_UNLINKFUNC (realsink) (GST_PAD (realsink)); + GST_RPAD_UNLINKFUNC (realsink) (GST_PAD_CAST (realsink)); } /* first clear peers */ @@ -1329,7 +1326,8 @@ gst_pad_link_prepare (GstPad * srcpad, GstPad * sinkpad, if (G_UNLIKELY (GST_RPAD_PEER (realsink) != NULL)) goto sink_was_linked; - if ((GST_PAD (realsrc) != srcpad) || (GST_PAD (realsink) != sinkpad)) { + if ((GST_PAD_CAST (realsrc) != srcpad) + || (GST_PAD_CAST (realsink) != sinkpad)) { GST_CAT_INFO (GST_CAT_PADS, "*actually* linking %s:%s and %s:%s", GST_DEBUG_PAD_NAME (realsrc), GST_DEBUG_PAD_NAME (realsink)); } @@ -1427,12 +1425,14 @@ gst_pad_link (GstPad * srcpad, GstPad * sinkpad) if (GST_RPAD_LINKFUNC (realsrc)) { /* this one will call the peer link function */ result = - GST_RPAD_LINKFUNC (realsrc) (GST_PAD (realsrc), GST_PAD (realsink)); + GST_RPAD_LINKFUNC (realsrc) (GST_PAD_CAST (realsrc), + GST_PAD_CAST (realsink)); } else if (GST_RPAD_LINKFUNC (realsink)) { /* if no source link function, we need to call the sink link * function ourselves. */ result = - GST_RPAD_LINKFUNC (realsink) (GST_PAD (realsink), GST_PAD (realsrc)); + GST_RPAD_LINKFUNC (realsink) (GST_PAD_CAST (realsink), + GST_PAD_CAST (realsrc)); } else { result = GST_PAD_LINK_OK; } @@ -1530,7 +1530,7 @@ gst_pad_get_real_parent (GstPad * pad) GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad); element = GST_PAD_PARENT (realpad); if (element) - gst_object_ref (GST_OBJECT (element)); + gst_object_ref (GST_OBJECT_CAST (element)); GST_UNLOCK (realpad); return element; @@ -1586,7 +1586,7 @@ gst_real_pad_get_caps_unlocked (GstRealPad * realpad) GST_FLAG_SET (realpad, GST_PAD_IN_GETCAPS); GST_UNLOCK (realpad); - result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD (realpad)); + result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD_CAST (realpad)); GST_LOCK (realpad); GST_FLAG_UNSET (realpad, GST_PAD_IN_GETCAPS); @@ -1725,12 +1725,12 @@ gst_pad_peer_get_caps (GstPad * pad) if (G_UNLIKELY (GST_RPAD_IS_IN_GETCAPS (peerpad))) goto was_dispatching; - gst_object_ref (GST_OBJECT (peerpad)); + gst_object_ref (GST_OBJECT_CAST (peerpad)); GST_UNLOCK (realpad); result = gst_pad_get_caps (GST_PAD_CAST (peerpad)); - gst_object_unref (GST_OBJECT (peerpad)); + gst_object_unref (GST_OBJECT_CAST (peerpad)); return result; @@ -2029,7 +2029,7 @@ gst_pad_get_peer (GstPad * pad) GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad); result = GST_RPAD_PEER (realpad); if (result) - gst_object_ref (GST_OBJECT (result)); + gst_object_ref (GST_OBJECT_CAST (result)); GST_UNLOCK (realpad); return GST_PAD_CAST (result); @@ -2072,11 +2072,11 @@ gst_pad_realize (GstPad * pad) GST_LOCK (pad); result = GST_PAD_REALIZE (pad); if (result && pad != GST_PAD_CAST (result)) { - gst_object_ref (GST_OBJECT (result)); + gst_object_ref (GST_OBJECT_CAST (result)); GST_UNLOCK (pad); /* no other thread could dispose this since we * hold at least one ref */ - gst_object_unref (GST_OBJECT (pad)); + gst_object_unref (GST_OBJECT_CAST (pad)); } else { GST_UNLOCK (pad); } @@ -2385,7 +2385,7 @@ gst_pad_event_default_dispatch (GstPad * pad, GstEvent * event) orig = pads = gst_pad_get_internal_links (pad); while (pads) { - GstPad *eventpad = GST_PAD (pads->data); + GstPad *eventpad = GST_PAD_CAST (pads->data); pads = g_list_next (pads); @@ -2444,7 +2444,7 @@ gst_pad_event_default (GstPad * pad, GstEvent * event) if (GST_RPAD_TASK (rpad)) { GST_DEBUG_OBJECT (rpad, "pausing task because of eos"); - gst_task_pause (GST_RPAD_TASK (rpad)); + gst_pad_pause_task (GST_PAD_CAST (rpad)); } } default: @@ -2484,7 +2484,7 @@ gst_pad_dispatcher (GstPad * pad, GstPadDispatcherFunction dispatch, GstRealPad *int_peer = GST_RPAD_PEER (int_rpad); if (int_peer) { - res = dispatch (GST_PAD (int_peer), data); + res = dispatch (GST_PAD_CAST (int_peer), data); if (res) break; } @@ -2561,7 +2561,7 @@ gst_real_pad_dispose (GObject * object) GstPad *pad; GstRealPad *rpad; - pad = GST_PAD (object); + pad = GST_PAD_CAST (object); rpad = GST_REAL_PAD (object); /* No linked pad can ever be disposed. @@ -2582,7 +2582,7 @@ gst_real_pad_dispose (GObject * object) orig = ghostpads = g_list_copy (rpad->ghostpads); while (ghostpads) { - GstPad *ghostpad = GST_PAD (ghostpads->data); + GstPad *ghostpad = GST_PAD_CAST (ghostpads->data); if (GST_IS_ELEMENT (GST_OBJECT_PARENT (ghostpad))) { GstElement *parent = GST_ELEMENT (GST_OBJECT_PARENT (ghostpad)); @@ -2738,7 +2738,7 @@ gst_pad_save_thyself (GstObject * object, xmlNodePtr parent) if (GST_RPAD_PEER (realpad) != NULL) { gchar *content; - peer = GST_PAD (GST_RPAD_PEER (realpad)); + peer = GST_PAD_CAST (GST_RPAD_PEER (realpad)); /* first check to see if the peer's parent's parent is the same */ /* we just save it off */ content = g_strdup_printf ("%s.%s", @@ -2793,7 +2793,7 @@ handle_pad_block (GstRealPad * pad) "signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad)); /* need to grab extra ref for the callbacks */ - gst_object_ref (GST_OBJECT (pad)); + gst_object_ref (GST_OBJECT_CAST (pad)); callback = pad->block_callback; if (callback) { @@ -2820,115 +2820,88 @@ handle_pad_block (GstRealPad * pad) GST_PAD_BLOCK_SIGNAL (pad); } - gst_object_unref (GST_OBJECT (pad)); + gst_object_unref (GST_OBJECT_CAST (pad)); } /********************************************************************** * Data passing functions */ - /** - * gst_pad_push: - * @pad: a source #GstPad. - * @buffer: the #GstBuffer to push. + * gst_pad_chain: + * @pad: a sink #GstPad. + * @buffer: the #GstBuffer to send. * - * Pushes a buffer to the peer of @pad. @pad must be linked. + * Chain a buffer to @pad. The pad has to be a GstRealPad. * - * Returns: a #GstFlowReturn from the peer pad. + * Returns: a #GstFlowReturn from the pad. * * MT safe. */ GstFlowReturn -gst_pad_push (GstPad * pad, GstBuffer * buffer) +gst_pad_chain (GstPad * pad, GstBuffer * buffer) { - GstRealPad *peer; - GstFlowReturn ret; - GstPadChainFunction chainfunc; GstCaps *caps; gboolean caps_changed; + GstPadChainFunction chainfunc; + GstFlowReturn ret; g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR); - g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC, + g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK, GST_FLOW_ERROR); g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + GST_STREAM_LOCK (pad); GST_LOCK (pad); - while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad))) - handle_pad_block (GST_REAL_PAD_CAST (pad)); - - if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL)) - goto not_linked; - - if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer))) - goto not_active; - - if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer))) + if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad))) goto flushing; - gst_object_ref (GST_OBJECT_CAST (peer)); + caps = GST_BUFFER_CAPS (buffer); + caps_changed = caps && caps != GST_RPAD_CAPS (pad); GST_UNLOCK (pad); - /* FIXME, move capnego this into a base class? */ - caps = GST_BUFFER_CAPS (buffer); - caps_changed = caps && caps != GST_RPAD_CAPS (peer); - /* we got a new datatype on the peer pad, see if it can handle it */ + /* we got a new datatype on the pad, see if it can handle it */ if (G_UNLIKELY (caps_changed)) { GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps); - if (G_UNLIKELY (!gst_pad_configure_sink (GST_PAD_CAST (peer), caps))) + if (G_UNLIKELY (!gst_pad_configure_sink (pad, caps))) goto not_negotiated; } - /* NOTE: we read the peer chainfunc unlocked. - * we cannot hold the lock for the peer so we might send + /* NOTE: we read the chainfunc unlocked. + * we cannot hold the lock for the pad so we might send * the data to the wrong function. This is not really a * problem since functions are assigned at creation time * and don't change that often... */ - if (G_UNLIKELY ((chainfunc = peer->chainfunc) == NULL)) + if (G_UNLIKELY ((chainfunc = GST_RPAD_CHAINFUNC (pad)) == NULL)) goto no_function; GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "calling chainfunction &%s of peer pad %s:%s", - GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (peer)); + "calling chainfunction &%s of pad %s:%s", + GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad)); - ret = chainfunc (GST_PAD_CAST (peer), buffer); - - gst_object_unref (GST_OBJECT_CAST (peer)); + ret = chainfunc (pad, buffer); + GST_STREAM_UNLOCK (pad); return ret; - /* ERROR recovery here */ -not_linked: - { - gst_buffer_unref (buffer); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pushing, but it was not linked"); - GST_UNLOCK (pad); - return GST_FLOW_NOT_CONNECTED; - } -not_active: - { - gst_buffer_unref (buffer); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pushing, but it was inactive"); - GST_UNLOCK (pad); - return GST_FLOW_WRONG_STATE; - } + /* ERRORS */ flushing: { gst_buffer_unref (buffer); GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pushing, but pad was flushing"); GST_UNLOCK (pad); + GST_STREAM_UNLOCK (pad); return GST_FLOW_UNEXPECTED; } not_negotiated: { gst_buffer_unref (buffer); GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pushing buffer but peer did not accept"); + "pushing buffer but pad did not accept"); + GST_STREAM_UNLOCK (pad); return GST_FLOW_NOT_NEGOTIATED; } no_function: @@ -2937,14 +2910,64 @@ no_function: GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pushing, but not chainhandler"); GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), - ("push on pad %s:%s but the peer pad %s:%s has no chainfunction", - GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer))); - gst_object_unref (GST_OBJECT (peer)); + ("push on pad %s:%s but it has no chainfunction", + GST_DEBUG_PAD_NAME (pad))); + GST_STREAM_UNLOCK (pad); return GST_FLOW_ERROR; } } /** + * gst_pad_push: + * @pad: a source #GstPad. + * @buffer: the #GstBuffer to push. + * + * Pushes a buffer to the peer of @pad. @pad must be linked. + * + * Returns: a #GstFlowReturn from the peer pad. + * + * MT safe. + */ +GstFlowReturn +gst_pad_push (GstPad * pad, GstBuffer * buffer) +{ + GstRealPad *peer; + GstFlowReturn ret; + + g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR); + g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC, + GST_FLOW_ERROR); + g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + GST_LOCK (pad); + while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad))) + handle_pad_block (GST_REAL_PAD_CAST (pad)); + + if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL)) + goto not_linked; + + gst_object_ref (GST_OBJECT_CAST (peer)); + GST_UNLOCK (pad); + + ret = gst_pad_chain (GST_PAD_CAST (peer), buffer); + + gst_object_unref (GST_OBJECT_CAST (peer)); + + return ret; + + /* ERROR recovery here */ +not_linked: + { + gst_buffer_unref (buffer); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "pushing, but it was not linked"); + GST_UNLOCK (pad); + return GST_FLOW_NOT_CONNECTED; + } +} + +/** * gst_pad_check_pull_range: * @pad: a sink #GstRealPad. * @@ -3010,6 +3033,73 @@ not_connected: } /** + * gst_pad_get_range: + * @pad: a src #GstPad. + * @buffer: a pointer to hold the #GstBuffer. + * @offset: The start offset of the buffer + * @length: The length of the buffer + * + * Calls the getrange function of @pad. + * + * Returns: a #GstFlowReturn from the pad. + * + * MT safe. + */ +GstFlowReturn +gst_pad_get_range (GstPad * pad, guint64 offset, guint size, + GstBuffer ** buffer) +{ + GstFlowReturn ret; + GstPadGetRangeFunction getrangefunc; + + g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR); + g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC, + GST_FLOW_ERROR); + g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + + GST_STREAM_LOCK (pad); + + GST_LOCK (pad); + if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad))) + goto flushing; + GST_UNLOCK (pad); + + if (G_UNLIKELY ((getrangefunc = GST_RPAD_GETRANGEFUNC (pad)) == NULL)) + goto no_function; + + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "calling getrangefunc %s of peer pad %s:%s, offset %" + G_GUINT64_FORMAT ", size %u", + GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (pad), + offset, size); + + ret = getrangefunc (pad, offset, size, buffer); + + GST_STREAM_UNLOCK (pad); + + return ret; + + /* ERRORS */ +flushing: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "pulling range, but pad was flushing"); + GST_UNLOCK (pad); + GST_STREAM_UNLOCK (pad); + return GST_FLOW_UNEXPECTED; + } +no_function: + { + GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), + ("pullrange on pad %s:%s but it has no getrangefunction", + GST_DEBUG_PAD_NAME (pad))); + GST_STREAM_UNLOCK (pad); + return GST_FLOW_ERROR; + } +} + + +/** * gst_pad_pull_range: * @pad: a sink #GstPad. * @buffer: a pointer to hold the #GstBuffer. @@ -3028,7 +3118,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, { GstRealPad *peer; GstFlowReturn ret; - GstPadGetRangeFunction getrangefunc; g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK, @@ -3043,26 +3132,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL)) goto not_connected; - if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer))) - goto not_active; - - if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer))) - goto flushing; - gst_object_ref (GST_OBJECT_CAST (peer)); GST_UNLOCK (pad); - /* see note in above function */ - if (G_UNLIKELY ((getrangefunc = peer->getrangefunc) == NULL)) - goto no_function; - - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "calling getrangefunc %s of peer pad %s:%s, offset %" - G_GUINT64_FORMAT ", size %u", - GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (peer), - offset, size); - - ret = getrangefunc (GST_PAD_CAST (peer), offset, size, buffer); + ret = gst_pad_get_range (GST_PAD_CAST (peer), offset, size, buffer); gst_object_unref (GST_OBJECT_CAST (peer)); @@ -3076,28 +3149,6 @@ not_connected: GST_UNLOCK (pad); return GST_FLOW_NOT_CONNECTED; } -not_active: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pulling range, but it was inactive"); - GST_UNLOCK (pad); - return GST_FLOW_WRONG_STATE; - } -flushing: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pulling range, but pad was flushing"); - GST_UNLOCK (pad); - return GST_FLOW_UNEXPECTED; - } -no_function: - { - GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), - ("pullrange on pad %s:%s but the peer pad %s:%s has no getrangefunction", - GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer))); - gst_object_unref (GST_OBJECT (peer)); - return GST_FLOW_ERROR; - } } /** @@ -3105,7 +3156,9 @@ no_function: * @pad: a #GstPad to push the event to. * @event: the #GstEvent to send to the pad. * - * Sends the event to the peer of the given pad. + * Sends the event to the peer of the given pad. This function is + * mainly used by elements to send events to their peer + * elements. * * Returns: TRUE if the event was handled. * @@ -3157,47 +3210,73 @@ gboolean gst_pad_send_event (GstPad * pad, GstEvent * event) { gboolean result = FALSE; - GstRealPad *rpad; + GstRealPad *realpad; GstPadEventFunction eventfunc; g_return_val_if_fail (GST_IS_PAD (pad), FALSE); g_return_val_if_fail (event != NULL, FALSE); - rpad = GST_PAD_REALIZE (pad); + GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad); if (GST_EVENT_SRC (event) == NULL) - GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT (rpad)); + GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT_CAST (realpad)); GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s", - GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad)); - - if (GST_PAD_IS_SINK (pad)) { - if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) { - GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event"); - GST_LOCK (pad); - if (GST_EVENT_FLUSH_DONE (event)) { - GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag"); - GST_FLAG_UNSET (pad, GST_PAD_FLUSHING); - } else { - GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag"); - GST_FLAG_SET (pad, GST_PAD_FLUSHING); - } - GST_UNLOCK (pad); + GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (realpad)); + + if (GST_PAD_IS_SINK (realpad)) { + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event"); + if (GST_EVENT_FLUSH_DONE (event)) { + GST_RPAD_UNSET_FLUSHING (realpad); + GST_CAT_DEBUG (GST_CAT_EVENT, "cleared flush flag"); + } else { + /* can't even accept a flush begin event when flushing */ + if (GST_RPAD_IS_FLUSHING (realpad)) + goto flushing; + GST_RPAD_SET_FLUSHING (realpad); + GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag"); + } + break; + default: + if (GST_RPAD_IS_FLUSHING (realpad)) + goto flushing; + break; } } - if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL) + if ((eventfunc = GST_RPAD_EVENTFUNC (realpad)) == NULL) goto no_function; - result = eventfunc (GST_PAD_CAST (rpad), event); + gst_object_ref (GST_OBJECT_CAST (realpad)); + GST_UNLOCK (realpad); + + result = eventfunc (GST_PAD_CAST (realpad), event); + + gst_object_unref (GST_OBJECT_CAST (realpad)); return result; /* ERROR handling */ +lost_ghostpad: + { + GST_CAT_DEBUG (GST_CAT_EVENT, "lost ghostpad"); + gst_event_unref (event); + return FALSE; + } no_function: { g_warning ("pad %s:%s has no event handler, file a bug.", - GST_DEBUG_PAD_NAME (rpad)); + GST_DEBUG_PAD_NAME (realpad)); + GST_UNLOCK (realpad); + gst_event_unref (event); + return FALSE; + } +flushing: + { + GST_UNLOCK (realpad); + GST_CAT_DEBUG (GST_CAT_EVENT, "received event on flushing pad"); gst_event_unref (event); return FALSE; } @@ -3444,24 +3523,107 @@ gst_pad_get_element_private (GstPad * pad) } gboolean -gst_pad_start_task (GstPad * pad) +gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data) { - g_warning ("implement gst_pad_start_task()"); - return FALSE; + GstElement *parent; + GstScheduler *sched; + GstTask *task; + + g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + g_return_val_if_fail (func != NULL, FALSE); + + GST_LOCK (pad); + parent = GST_PAD_PARENT (pad); + + if (parent == NULL || !GST_IS_ELEMENT (parent)) + goto no_parent; + + sched = GST_ELEMENT_SCHEDULER (parent); + if (sched == NULL) + goto no_sched; + + task = GST_RPAD_TASK (pad); + if (task == NULL) { + task = gst_scheduler_create_task (sched, func, data); + gst_task_set_lock (task, GST_STREAM_GET_LOCK (pad)); + + GST_RPAD_TASK (pad) = task; + } + GST_UNLOCK (pad); + + gst_task_start (task); + + return TRUE; + + /* ERRORS */ +no_parent: + { + GST_UNLOCK (pad); + GST_DEBUG ("no parent"); + return FALSE; + } +no_sched: + { + GST_UNLOCK (pad); + GST_DEBUG ("no scheduler"); + return FALSE; + } } gboolean gst_pad_pause_task (GstPad * pad) { - g_warning ("implement gst_pad_pause_task()"); - return FALSE; + GstTask *task; + + g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + + GST_LOCK (pad); + task = GST_RPAD_TASK (pad); + if (task == NULL) + goto no_task; + gst_task_pause (task); + GST_UNLOCK (pad); + + GST_STREAM_LOCK (pad); + GST_STREAM_UNLOCK (pad); + + return TRUE; + +no_task: + { + GST_UNLOCK (pad); + return TRUE; + } } gboolean gst_pad_stop_task (GstPad * pad) { - g_warning ("implement gst_pad_stop_task()"); - return FALSE; + GstTask *task; + + g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + + GST_LOCK (pad); + task = GST_RPAD_TASK (pad); + if (task == NULL) + goto no_task; + GST_RPAD_TASK (pad) = NULL; + GST_UNLOCK (pad); + + gst_task_stop (task); + + GST_STREAM_LOCK (pad); + GST_STREAM_UNLOCK (pad); + + gst_object_unref (GST_OBJECT_CAST (task)); + + return TRUE; + +no_task: + { + GST_UNLOCK (pad); + return TRUE; + } } diff --git a/gst/gstpad.h b/gst/gstpad.h index d8fcd9f..915db75 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -174,8 +174,7 @@ typedef enum { } GstPadDirection; typedef enum { - GST_PAD_ACTIVE = GST_OBJECT_FLAG_LAST, - GST_PAD_BLOCKED, + GST_PAD_BLOCKED = GST_OBJECT_FLAG_LAST, GST_PAD_FLUSHING, GST_PAD_IN_GETCAPS, GST_PAD_IN_SETCAPS, @@ -242,6 +241,8 @@ struct _GstRealPad { GstPadGetRangeFunction getrangefunc; GstPadEventFunction eventfunc; + GstActivateMode mode; + /* ghostpads */ GList *ghostpads; guint32 ghostpads_cookie; @@ -304,6 +305,7 @@ struct _GstGhostPadClass { #define GST_RPAD_CHECKGETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->checkgetrangefunc) #define GST_RPAD_GETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->getrangefunc) #define GST_RPAD_EVENTFUNC(pad) (GST_REAL_PAD_CAST(pad)->eventfunc) +#define GST_RPAD_ACTIVATE_MODE(pad) (GST_REAL_PAD_CAST(pad)->mode) #define GST_RPAD_QUERYTYPEFUNC(pad) (GST_REAL_PAD_CAST(pad)->querytypefunc) #define GST_RPAD_QUERYFUNC(pad) (GST_REAL_PAD_CAST(pad)->queryfunc) #define GST_RPAD_INTLINKFUNC(pad) (GST_REAL_PAD_CAST(pad)->intlinkfunc) @@ -321,16 +323,18 @@ struct _GstGhostPadClass { #define GST_RPAD_BUFFERALLOCFUNC(pad) (GST_REAL_PAD_CAST(pad)->bufferallocfunc) #define GST_RPAD_IS_LINKED(pad) (GST_RPAD_PEER(pad) != NULL) -#define GST_RPAD_IS_ACTIVE(pad) (GST_FLAG_IS_SET (pad, GST_PAD_ACTIVE)) #define GST_RPAD_IS_BLOCKED(pad) (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED)) #define GST_RPAD_IS_FLUSHING(pad) (GST_FLAG_IS_SET (pad, GST_PAD_FLUSHING)) #define GST_RPAD_IS_IN_GETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_GETCAPS)) #define GST_RPAD_IS_IN_SETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_SETCAPS)) #define GST_RPAD_IS_USABLE(pad) (GST_RPAD_IS_LINKED (pad) && \ - GST_RPAD_IS_ACTIVE(pad) && GST_RPAD_IS_ACTIVE(GST_RPAD_PEER (pad))) + !GST_RPAD_IS_FLUSHING(pad) && !GST_RPAD_IS_FLUSHING(GST_RPAD_PEER (pad))) #define GST_RPAD_IS_SRC(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) #define GST_RPAD_IS_SINK(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) +#define GST_RPAD_SET_FLUSHING(pad) (GST_FLAG_SET (pad, GST_PAD_FLUSHING)) +#define GST_RPAD_UNSET_FLUSHING(pad) (GST_FLAG_UNSET (pad, GST_PAD_FLUSHING)) + #define GST_STREAM_GET_LOCK(pad) (GST_PAD_REALIZE(pad)->stream_rec_lock) #define GST_STREAM_LOCK(pad) (g_static_rec_mutex_lock(GST_STREAM_GET_LOCK(pad))) #define GST_STREAM_TRYLOCK(pad) (g_static_rec_mutex_trylock(GST_STREAM_GET_LOCK(pad))) @@ -360,9 +364,10 @@ struct _GstGhostPadClass { #define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE (pad)) #define GST_PAD_PEER(pad) GST_PAD_CAST(GST_RPAD_PEER(GST_PAD_REALIZE(pad))) +#define GST_PAD_TASK(pad) GST_RPAD_TASK(pad) + /* Some check functions (unused?) */ #define GST_PAD_IS_LINKED(pad) (GST_RPAD_IS_LINKED(GST_PAD_REALIZE(pad))) -#define GST_PAD_IS_ACTIVE(pad) (GST_RPAD_IS_ACTIVE(GST_PAD_REALIZE(pad))) #define GST_PAD_IS_BLOCKED(pad) (GST_RPAD_IS_BLOCKED(GST_PAD_REALIZE(pad))) #define GST_PAD_IS_FLUSHING(pad) (GST_RPAD_IS_FLUSHING(GST_PAD_REALIZE(pad))) #define GST_PAD_IS_IN_GETCAPS(pad) (GST_RPAD_IS_IN_GETCAPS(GST_PAD_REALIZE(pad))) @@ -509,17 +514,23 @@ gboolean gst_pad_peer_accept_caps (GstPad * pad, GstCaps *caps); GstCaps * gst_pad_get_allowed_caps (GstPad * srcpad); GstCaps * gst_pad_get_negotiated_caps (GstPad * pad); -/* data passing functions */ +/* data passing functions to peer */ GstFlowReturn gst_pad_push (GstPad *pad, GstBuffer *buffer); gboolean gst_pad_check_pull_range (GstPad *pad); GstFlowReturn gst_pad_pull_range (GstPad *pad, guint64 offset, guint size, GstBuffer **buffer); gboolean gst_pad_push_event (GstPad *pad, GstEvent *event); -gboolean gst_pad_send_event (GstPad *pad, GstEvent *event); gboolean gst_pad_event_default (GstPad *pad, GstEvent *event); +/* data passing functions on pad */ +GstFlowReturn gst_pad_chain (GstPad *pad, GstBuffer *buffer); +GstFlowReturn gst_pad_get_range (GstPad *pad, guint64 offset, guint size, + GstBuffer **buffer); +gboolean gst_pad_send_event (GstPad *pad, GstEvent *event); + /* pad tasks */ -gboolean gst_pad_start_task (GstPad *pad); +gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func, + gpointer data); gboolean gst_pad_pause_task (GstPad *pad); gboolean gst_pad_stop_task (GstPad *pad); diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 2fd8f6e..256d0d1 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* forward event */ gst_pad_event_default (pad, event); if (GST_EVENT_FLUSH_DONE (event)) { - GST_STREAM_LOCK (queue->srcpad); - gst_task_start (GST_RPAD_TASK (queue->srcpad)); - GST_STREAM_UNLOCK (queue->srcpad); + gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, + queue->srcpad); } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; @@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) g_cond_signal (queue->item_add); /* make sure it stops */ - GST_STREAM_LOCK (queue->srcpad); - gst_task_pause (GST_RPAD_TASK (queue->srcpad)); + gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); - GST_STREAM_UNLOCK (queue->srcpad); } goto done; case GST_EVENT_EOS: @@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - GST_STREAM_LOCK (pad); - /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK; @@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; + /* if there's a pending state change for this queue * or its manager, switch back to iterator so bottom * half of state change executes */ @@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return GST_FLOW_OK; out_unref: GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); gst_buffer_unref (buffer); @@ -678,8 +674,7 @@ out_unref: out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); GST_QUEUE_MUTEX_UNLOCK; - gst_task_pause (GST_RPAD_TASK (queue->srcpad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (queue->srcpad); gst_buffer_unref (buffer); @@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad) queue = GST_QUEUE (GST_PAD_PARENT (pad)); - GST_STREAM_LOCK (pad); - /* have to lock for thread-safety */ GST_QUEUE_MUTEX_LOCK; @@ -770,14 +763,12 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return; out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); - gst_task_pause (GST_RPAD_TASK (pad)); + gst_pad_pause_task (pad); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return; } @@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); if (mode == GST_ACTIVATE_PUSH) { - /* if we have a scheduler we can start the task */ - if (GST_ELEMENT_SCHEDULER (queue)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue), - (GstTaskFunction) gst_queue_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; @@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode) GST_QUEUE_MUTEX_UNLOCK; /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); } return result; } diff --git a/gst/gsttask.c b/gst/gsttask.c index 31ea0f9..332724b 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -72,6 +72,7 @@ gst_task_class_init (GstTaskClass * klass) static void gst_task_init (GstTask * task) { + task->lock = NULL; task->cond = g_cond_new (); task->state = GST_TASK_STOPPED; } @@ -108,6 +109,24 @@ gst_task_create (GstTaskFunction func, gpointer data) } /** + * gst_task_set_lock: + * @task: The #GstTask to use + * @mutex: The GMutex to use + * + * Set the mutex used by the task. + * + * MT safe. + */ +void +gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex) +{ + GST_LOCK (task); + task->lock = mutex; + GST_UNLOCK (task); +} + + +/** * gst_task_get_state: * @task: The #GstTask to query * diff --git a/gst/gsttask.h b/gst/gsttask.h index 935a4c7..6bed518 100644 --- a/gst/gsttask.h +++ b/gst/gsttask.h @@ -54,15 +54,22 @@ typedef enum { #define GST_TASK_SIGNAL(task) g_cond_signal(GST_TASK_GET_COND (task)) #define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task)) +#define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock) +#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task)) +#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task)) + struct _GstTask { GstObject object; - /*< public >*/ /* with TASK_LOCK */ - GstTaskState state; - GCond *cond; + + /*< public >*/ /* with LOCK */ + GstTaskState state; + GCond *cond; + + GStaticRecMutex *lock; GstTaskFunction func; - gpointer data; + gpointer data; /*< private >*/ gpointer _gst_reserved[GST_PADDING]; @@ -83,6 +90,7 @@ struct _GstTaskClass { GType gst_task_get_type (void); GstTask* gst_task_create (GstTaskFunction func, gpointer data); +void gst_task_set_lock (GstTask *task, GStaticRecMutex *mutex); GstTaskState gst_task_get_state (GstTask *task); diff --git a/gst/schedulers/threadscheduler.c b/gst/schedulers/threadscheduler.c index 75f68da..1ba67d8 100644 --- a/gst/schedulers/threadscheduler.c +++ b/gst/schedulers/threadscheduler.c @@ -142,9 +142,16 @@ gst_thread_scheduler_task_start (GstTask * task) GstThreadScheduler *tsched = GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task))); GstTaskState old; + GStaticRecMutex *lock; GST_DEBUG_OBJECT (task, "Starting task %p", task); + if ((lock = GST_TASK_GET_LOCK (task)) == NULL) { + lock = g_new (GStaticRecMutex, 1); + g_static_rec_mutex_init (lock); + GST_TASK_GET_LOCK (task) = lock; + } + GST_LOCK (ttask); old = GST_TASK_CAST (ttask)->state; GST_TASK_CAST (ttask)->state = GST_TASK_STARTED; @@ -269,11 +276,18 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask, GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task, g_thread_self ()); + /* locking order is TASK_LOCK, LOCK */ + GST_TASK_LOCK (task); GST_LOCK (task); while (G_LIKELY (task->state != GST_TASK_STOPPED)) { while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) { + GST_TASK_UNLOCK (task); GST_TASK_SIGNAL (task); GST_TASK_WAIT (task); + GST_UNLOCK (task); + /* locking order.. */ + GST_TASK_LOCK (task); + GST_LOCK (task); if (task->state == GST_TASK_STOPPED) goto done; } @@ -285,6 +299,7 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask, } done: GST_UNLOCK (task); + GST_TASK_UNLOCK (task); GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ()); diff --git a/libs/gst/base/gstadapter.c b/libs/gst/base/gstadapter.c index 993c78c..09239d0 100644 --- a/libs/gst/base/gstadapter.c +++ b/libs/gst/base/gstadapter.c @@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size) if (adapter->assembled_size < size) { adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE; - GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n", + GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u", adapter->assembled_size); adapter->assembled_data = g_realloc (adapter->assembled_data, adapter->assembled_size); @@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush) g_return_if_fail (flush > 0); g_return_if_fail (flush <= adapter->size); - GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush); + GST_LOG_OBJECT (adapter, "flushing %u bytes", flush); adapter->size -= flush; adapter->assembled_len = 0; while (flush > 0) { diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index e677f0e..b8604c8 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad, if (basesink->preroll_queue->length == 0) { GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink); + GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); if (bclass->preroll) bclass->preroll (basesink, buffer); } @@ -448,7 +450,7 @@ PrerollReturn gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad, GstBuffer * buffer) { - gboolean usable; + gboolean flushing; DEBUG ("finish preroll %p <\n", basesink); /* lock order is important */ @@ -461,13 +463,13 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad, gst_element_commit_state (GST_ELEMENT (basesink)); GST_STATE_UNLOCK (basesink); + gst_basesink_preroll_queue_push (basesink, pad, buffer); + GST_LOCK (pad); - usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad); + flushing = GST_RPAD_IS_FLUSHING (pad); GST_UNLOCK (pad); - if (!usable) - goto unusable; - - gst_basesink_preroll_queue_push (basesink, pad, buffer); + if (flushing) + goto flushing; if (basesink->need_preroll) goto still_queueing; @@ -490,7 +492,7 @@ no_preroll: GST_STATE_UNLOCK (basesink); return PREROLL_PLAYING; } -unusable: +flushing: { GST_DEBUG ("pad is flushing"); GST_PREROLL_UNLOCK (pad); @@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf) g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode == GST_ACTIVATE_PUSH); - GST_STREAM_LOCK (pad); - result = gst_basesink_chain_unlocked (pad, buf); - GST_STREAM_UNLOCK (pad); - return result; } @@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad) g_assert (basesink->pad_mode == GST_ACTIVATE_PULL); - GST_STREAM_LOCK (pad); - result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf); if (result != GST_FLOW_OK) goto paused; @@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad) goto paused; /* default */ - GST_STREAM_UNLOCK (pad); return; paused: - gst_task_pause (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (pad); return; } @@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode) /* if we have a scheduler we can start the task */ g_return_val_if_fail (basesink->has_loop, FALSE); gst_pad_peer_set_active (pad, mode); - if (GST_ELEMENT_SCHEDULER (basesink)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink), - (GstTaskFunction) gst_basesink_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad); break; case GST_ACTIVATE_NONE: /* step 1, unblock clock sync (if any) or any other blocking thing */ @@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode) GST_PREROLL_UNLOCK (pad); /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - if (GST_RPAD_TASK (pad)) { - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } basesink->pad_mode = mode; @@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element) basesink->have_preroll = FALSE; GST_PREROLL_UNLOCK (basesink->sinkpad); - /* make sure the element is finished processing */ - GST_STREAM_LOCK (basesink->sinkpad); - GST_STREAM_UNLOCK (basesink->sinkpad); /* clear EOS state */ basesink->eos = FALSE; break; diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 43ddf5a..5ac5adb 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event) } /* and restart the task */ - if (GST_RPAD_TASK (src->srcpad)) { - gst_task_start (GST_RPAD_TASK (src->srcpad)); - } + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop, + src->srcpad); GST_STREAM_UNLOCK (src->srcpad); gst_event_unref (event); @@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value, } static GstFlowReturn -gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length, +gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length, GstBuffer ** buf) { GstFlowReturn ret; @@ -499,21 +498,6 @@ unexpected_length: } } -static GstFlowReturn -gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length, - GstBuffer ** ret) -{ - GstFlowReturn fret; - - GST_STREAM_LOCK (pad); - - fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret); - - GST_STREAM_UNLOCK (pad); - - return fret; -} - static gboolean gst_basesrc_check_get_range (GstPad * pad) { @@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad) src = GST_BASESRC (GST_OBJECT_PARENT (pad)); - GST_STREAM_LOCK (pad); - - ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf); + ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf); if (ret != GST_FLOW_OK) goto eos; @@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad) if (ret != GST_FLOW_OK) goto pause; - GST_STREAM_UNLOCK (pad); return; eos: { GST_DEBUG_OBJECT (src, "going to EOS"); - gst_task_pause (GST_RPAD_TASK (pad)); + gst_pad_pause_task (pad); gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS)); - GST_STREAM_UNLOCK (pad); return; } pause: { GST_DEBUG_OBJECT (src, "pausing task"); - gst_task_pause (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (pad); return; } } @@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode) result = FALSE; switch (mode) { case GST_ACTIVATE_PUSH: - /* if we have a scheduler we can start the task */ - if (GST_ELEMENT_SCHEDULER (basesrc)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc), - (GstTaskFunction) gst_basesrc_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad); break; case GST_ACTIVATE_PULL: result = TRUE; @@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode) gst_basesrc_unlock (basesrc); /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - if (GST_RPAD_TASK (pad)) { - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } return result; diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 2fd8f6e..256d0d1 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* forward event */ gst_pad_event_default (pad, event); if (GST_EVENT_FLUSH_DONE (event)) { - GST_STREAM_LOCK (queue->srcpad); - gst_task_start (GST_RPAD_TASK (queue->srcpad)); - GST_STREAM_UNLOCK (queue->srcpad); + gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, + queue->srcpad); } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; @@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) g_cond_signal (queue->item_add); /* make sure it stops */ - GST_STREAM_LOCK (queue->srcpad); - gst_task_pause (GST_RPAD_TASK (queue->srcpad)); + gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); - GST_STREAM_UNLOCK (queue->srcpad); } goto done; case GST_EVENT_EOS: @@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - GST_STREAM_LOCK (pad); - /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK; @@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; + /* if there's a pending state change for this queue * or its manager, switch back to iterator so bottom * half of state change executes */ @@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return GST_FLOW_OK; out_unref: GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); gst_buffer_unref (buffer); @@ -678,8 +674,7 @@ out_unref: out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); GST_QUEUE_MUTEX_UNLOCK; - gst_task_pause (GST_RPAD_TASK (queue->srcpad)); - GST_STREAM_UNLOCK (pad); + gst_pad_pause_task (queue->srcpad); gst_buffer_unref (buffer); @@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad) queue = GST_QUEUE (GST_PAD_PARENT (pad)); - GST_STREAM_LOCK (pad); - /* have to lock for thread-safety */ GST_QUEUE_MUTEX_LOCK; @@ -770,14 +763,12 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return; out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); - gst_task_pause (GST_RPAD_TASK (pad)); + gst_pad_pause_task (pad); GST_QUEUE_MUTEX_UNLOCK; - GST_STREAM_UNLOCK (pad); return; } @@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); if (mode == GST_ACTIVATE_PUSH) { - /* if we have a scheduler we can start the task */ - if (GST_ELEMENT_SCHEDULER (queue)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue), - (GstTaskFunction) gst_queue_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; @@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode) GST_QUEUE_MUTEX_UNLOCK; /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - /* step 3, stop the task */ - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); } return result; } diff --git a/plugins/elements/gsttee.c b/plugins/elements/gsttee.c index cd7213c..9806606 100644 --- a/plugins/elements/gsttee.c +++ b/plugins/elements/gsttee.c @@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode) break; case GST_ACTIVATE_PULL: g_return_val_if_fail (tee->has_sink_loop, FALSE); - if (GST_ELEMENT_SCHEDULER (tee)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee), - (GstTaskFunction) gst_tee_loop, pad); - - gst_pad_start_task (pad); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } + result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad); break; case GST_ACTIVATE_NONE: - GST_STREAM_LOCK (pad); - if (GST_RPAD_TASK (pad)) { - gst_pad_stop_task (pad); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; + result = gst_pad_stop_task (pad); break; } tee->sink_mode = mode;