* gst_pad_start_task:
* @pad: the #GstPad to start the task of
* @func: the task function to call
- * @data: data passed to the task function
+ * @user_data: user data passed to the task function
+ * @notify: called when @user_data is no longer referenced
*
- * Starts a task that repeatedly calls @func with @data. This function
+ * Starts a task that repeatedly calls @func with @user_data. This function
* is mostly used in pad activation functions to start the dataflow.
* The #GST_PAD_STREAM_LOCK of @pad will automatically be acquired
* before @func is called.
* Returns: a %TRUE if the task could be started.
*/
gboolean
-gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data)
+gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer user_data,
+ GDestroyNotify notify)
{
GstTask *task;
gboolean res;
GST_OBJECT_LOCK (pad);
task = GST_PAD_TASK (pad);
if (task == NULL) {
- task = gst_task_new (func, data);
+ task = gst_task_new (func, user_data, notify);
gst_task_set_lock (task, GST_PAD_GET_STREAM_LOCK (pad));
gst_task_set_thread_callbacks (task, &thr_callbacks, pad, NULL);
GST_INFO_OBJECT (pad, "created task %p", task);
/* pad tasks */
gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func,
- gpointer data);
+ gpointer user_data, GDestroyNotify notify);
gboolean gst_pad_pause_task (GstPad *pad);
gboolean gst_pad_stop_task (GstPad *pad);
priv->thr_notify = NULL;
priv->thr_user_data = NULL;
+ if (task->notify)
+ task->notify (task->user_data);
+
gst_object_unref (priv->pool);
/* task thread cannot be running here since it holds a ref
GST_OBJECT_UNLOCK (task);
}
- task->func (task->data);
+ task->func (task->user_data);
}
done:
g_rec_mutex_unlock (lock);
/**
* gst_task_new:
* @func: The #GstTaskFunction to use
- * @data: (closure): User data to pass to @func
+ * @user_data: User data to pass to @func
+ * @notify: the function to call when @user_data is no longer needed.
*
* Create a new Task that will repeatedly call the provided @func
- * with @data as a parameter. Typically the task will run in
+ * with @user_data as a parameter. Typically the task will run in
* a new thread.
*
* The function cannot be changed after the task has been created. You
* MT safe.
*/
GstTask *
-gst_task_new (GstTaskFunction func, gpointer data)
+gst_task_new (GstTaskFunction func, gpointer user_data, GDestroyNotify notify)
{
GstTask *task;
task = g_object_newv (GST_TYPE_TASK, 0, NULL);
task->func = func;
- task->data = data;
+ task->user_data = user_data;
+ task->notify = notify;
GST_DEBUG ("Created task %p", task);
* A function that will repeatedly be called in the thread created by
* a #GstTask.
*/
-typedef void (*GstTaskFunction) (void *user_data);
+typedef void (*GstTaskFunction) (gpointer user_data);
/* --- standard type macros --- */
#define GST_TYPE_TASK (gst_task_get_type ())
* @cond: used to pause/resume the task
* @lock: The lock taken when iterating the task function
* @func: the function executed by this task
- * @data: data passed to the task function
+ * @user_data: user_data passed to the task function
+ * @notify: GDestroyNotify for @user_data
* @running: a flag indicating that the task is running
*
* The #GstTask object.
GRecMutex *lock;
GstTaskFunction func;
- gpointer data;
+ gpointer user_data;
+ GDestroyNotify notify;
gboolean running;
GType gst_task_get_type (void);
-GstTask* gst_task_new (GstTaskFunction func, gpointer data);
+GstTask* gst_task_new (GstTaskFunction func,
+ gpointer user_data, GDestroyNotify notify);
+
void gst_task_set_lock (GstTask *task, GRecMutex *mutex);
GstTaskPool * gst_task_get_pool (GstTask *task);
goto baseparse_push;
return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop,
- sinkpad);
+ sinkpad, NULL);
/* fallback */
baseparse_push:
{
/* Start streaming thread if paused */
gst_pad_start_task (parse->sinkpad,
- (GstTaskFunction) gst_base_parse_loop, parse->sinkpad);
+ (GstTaskFunction) gst_base_parse_loop, parse->sinkpad, NULL);
GST_PAD_STREAM_UNLOCK (parse->sinkpad);
if (active) {
/* start task */
result = gst_pad_start_task (basesink->sinkpad,
- (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
+ (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad, NULL);
} else {
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (basesink->sinkpad);
/* and restart the task in case it got paused explicitly or by
* the FLUSH_START event we pushed out. */
tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
- src->srcpad);
+ src->srcpad, NULL);
if (res && !tres)
res = FALSE;
GST_OBJECT_UNLOCK (basesrc->srcpad);
if (start)
gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
- basesrc->srcpad);
+ basesrc->srcpad, NULL);
GST_DEBUG_OBJECT (basesrc, "signal");
GST_LIVE_SIGNAL (basesrc);
}
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
- sq->srcpad);
+ sq->srcpad, NULL);
}
return result;
}
queue->eos = FALSE;
queue->unexpected = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
- queue->srcpad);
+ queue->srcpad, NULL);
GST_QUEUE_MUTEX_UNLOCK (queue);
STATUS (queue, pad, "after flush");
queue->eos = FALSE;
queue->unexpected = FALSE;
result =
- gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
+ gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
+ NULL);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock loop function */
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
- queue->srcpad);
+ queue->srcpad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
- result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* unblock loop function */
gst_segment_do_seek (&seeksegment, rate, format, flags,
cur_type, cur, stop_type, stop, NULL);
- flush = !!(flags & GST_SEEK_FLAG_FLUSH);
+ flush = ! !(flags & GST_SEEK_FLAG_FLUSH);
GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment);
/* restart our task since it might have been stopped when we did the
* flush. */
gst_pad_start_task (typefind->sink,
- (GstTaskFunction) gst_type_find_element_loop, typefind->sink);
+ (GstTaskFunction) gst_type_find_element_loop, typefind->sink, NULL);
/* streaming can continue now */
GST_PAD_STREAM_UNLOCK (typefind->sink);
/* only start our task if we ourselves decide to start in pull mode */
return gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop,
- pad);
+ pad, NULL);
typefind_push:
{
/* create a task with some dummy function, we're not actually going to run
* the task here */
- task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL);
+ task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL, NULL);
ASSERT_OBJECT_REFCOUNT (task, "task", 1);
GstTask *t;
gboolean ret;
- t = gst_task_new (task_func2, &t);
+ t = gst_task_new (task_func2, &t, NULL);
fail_if (t == NULL);
g_rec_mutex_init (&task_mutex);
GstTask *t;
gboolean ret;
- t = gst_task_new (task_func, NULL);
+ t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL);
g_rec_mutex_init (&task_mutex);
GstTask *t;
gboolean ret;
- t = gst_task_new (task_func, NULL);
+ t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL);
g_rec_mutex_init (&task_mutex);
GstTask *t;
gboolean ret;
- t = gst_task_new (task_func, NULL);
+ t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL);
/* stop should be possible without lock */
{
GstTask *t;
- t = gst_task_new (task_func, NULL);
+ t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL);
gst_object_unref (t);