2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstthread.c: Threaded container object
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
25 /* #define GST_DEBUG_ENABLED */
26 #include "gst_private.h"
29 #include "gstthread.h"
30 #include "gstscheduler.h"
33 GstElementDetails gst_thread_details = {
36 "Container that creates/manages a thread",
38 "Erik Walthinsen <omega@cse.ogi.edu>",
43 /* Thread signals and args */
64 static void gst_thread_class_init (GstThreadClass *klass);
65 static void gst_thread_init (GstThread *thread);
67 static void gst_thread_dispose (GObject *object);
69 static void gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
70 static void gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
72 static GstElementStateReturn gst_thread_change_state (GstElement *element);
74 #ifndef GST_DISABLE_LOADSAVE
75 static xmlNodePtr gst_thread_save_thyself (GstObject *object, xmlNodePtr parent);
76 static void gst_thread_restore_thyself (GstObject *object, xmlNodePtr self);
79 static void* gst_thread_main_loop (void *arg);
81 #define GST_TYPE_THREAD_SCHEDPOLICY (gst_thread_schedpolicy_get_type())
83 gst_thread_schedpolicy_get_type(void) {
84 static GType thread_schedpolicy_type = 0;
85 static GEnumValue thread_schedpolicy[] = {
86 {SCHED_OTHER, "SCHED_OTHER", "Normal Scheduling"},
87 {SCHED_FIFO, "SCHED_FIFO", "FIFO Scheduling (requires root)"},
88 {SCHED_RR, "SCHED_RR", "Round-Robin Scheduling (requires root)"},
91 if (!thread_schedpolicy_type) {
92 thread_schedpolicy_type = g_enum_register_static("GstThreadSchedPolicy", thread_schedpolicy);
94 return thread_schedpolicy_type;
97 static GstBinClass *parent_class = NULL;
98 static guint gst_thread_signals[LAST_SIGNAL] = { 0 };
101 gst_thread_get_type(void) {
102 static GType thread_type = 0;
105 static const GTypeInfo thread_info = {
106 sizeof(GstThreadClass),
109 (GClassInitFunc)gst_thread_class_init,
114 (GInstanceInitFunc)gst_thread_init,
117 thread_type = g_type_register_static(GST_TYPE_BIN, "GstThread", &thread_info, 0);
123 gst_thread_class_init (GstThreadClass *klass)
125 GObjectClass *gobject_class;
126 GstObjectClass *gstobject_class;
127 GstElementClass *gstelement_class;
128 GstBinClass *gstbin_class;
130 gobject_class = (GObjectClass*)klass;
131 gstobject_class = (GstObjectClass*)klass;
132 gstelement_class = (GstElementClass*)klass;
133 gstbin_class = (GstBinClass*)klass;
135 parent_class = g_type_class_ref (GST_TYPE_BIN);
137 g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_SCHEDPOLICY,
138 g_param_spec_enum("schedpolicy", "Scheduling Policy", "The scheduling policy of the thread",
139 GST_TYPE_THREAD_SCHEDPOLICY, SCHED_OTHER, G_PARAM_READWRITE));
140 g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_PRIORITY,
141 g_param_spec_int("priority", "Scheduling Priority", "The scheduling priority of the thread",
142 0, 99, 0, G_PARAM_READWRITE));
144 gst_thread_signals[SHUTDOWN] =
145 g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
146 G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL,
147 gst_marshal_VOID__VOID, G_TYPE_NONE, 0);
149 gobject_class->dispose = gst_thread_dispose;
151 #ifndef GST_DISABLE_LOADSAVE
152 gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
153 gstobject_class->restore_thyself = GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
156 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state);
158 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
159 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
164 gst_thread_init (GstThread *thread)
166 GstScheduler *scheduler;
168 GST_DEBUG (GST_CAT_THREAD, "initializing thread");
170 /* threads are managing bins and iterate themselves */
171 /* CR1: the GstBin code checks these flags */
172 GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
173 GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
175 scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
177 thread->lock = g_mutex_new ();
178 thread->cond = g_cond_new ();
180 thread->ppid = getpid ();
181 thread->thread_id = -1;
182 thread->sched_policy = SCHED_OTHER;
183 thread->priority = 0;
187 gst_thread_dispose (GObject *object)
189 GstThread *thread = GST_THREAD (object);
191 GST_DEBUG (GST_CAT_REFCOUNTING, "dispose");
193 g_mutex_free (thread->lock);
194 g_cond_free (thread->cond);
196 G_OBJECT_CLASS (parent_class)->dispose (object);
198 if (GST_ELEMENT_SCHED (thread)) {
199 gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
204 gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
208 /* it's not null if we got it, but it might not be ours */
209 g_return_if_fail (GST_IS_THREAD (object));
211 thread = GST_THREAD (object);
214 case ARG_SCHEDPOLICY:
215 thread->sched_policy = g_value_get_enum (value);
218 thread->priority = g_value_get_int (value);
221 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
227 gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
231 /* it's not null if we got it, but it might not be ours */
232 g_return_if_fail (GST_IS_THREAD (object));
234 thread = GST_THREAD (object);
237 case ARG_SCHEDPOLICY:
238 g_value_set_enum (value, thread->sched_policy);
241 g_value_set_int (value, thread->priority);
244 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
252 * @name: the name of the thread
254 * Create a new thread with the given name.
256 * Returns: The new thread
259 gst_thread_new (const gchar *name)
261 return gst_element_factory_make ("thread", name);
265 #define THR_INFO(format,args...) \
266 GST_INFO_ELEMENT(GST_CAT_THREAD, thread, "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
267 GST_DEBUG_THREAD_ARGS(thread->pid) , ## args )
268 #define THR_DEBUG(format,args...) \
269 GST_DEBUG_ELEMENT(GST_CAT_THREAD, thread, "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
270 GST_DEBUG_THREAD_ARGS(thread->pid) , ## args )
272 #define THR_INFO_MAIN(format,args...) \
273 GST_INFO_ELEMENT(GST_CAT_THREAD, thread, "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
274 GST_DEBUG_THREAD_ARGS(thread->ppid) , ## args )
275 #define THR_DEBUG_MAIN(format,args...) \
276 GST_DEBUG_ELEMENT(GST_CAT_THREAD, thread, "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
277 GST_DEBUG_THREAD_ARGS(thread->ppid) , ## args )
279 static GstElementStateReturn
280 gst_thread_update_state (GstThread *thread)
282 /* check for state change */
283 if (GST_STATE_PENDING(thread) != GST_STATE_VOID_PENDING) {
284 /* punt and change state on all the children */
285 if (GST_ELEMENT_CLASS (parent_class)->change_state)
286 return GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
289 return GST_STATE_SUCCESS;
293 static GstElementStateReturn
294 gst_thread_change_state (GstElement * element)
297 gboolean stateset = GST_STATE_SUCCESS;
299 pthread_t self = pthread_self ();
302 g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
303 g_return_val_if_fail (gst_has_threads (), GST_STATE_FAILURE);
305 thread = GST_THREAD (element);
307 transition = GST_STATE_TRANSITION (element);
309 THR_INFO ("changing state from %s to %s",
310 gst_element_state_get_name (GST_STATE (element)),
311 gst_element_state_get_name (GST_STATE_PENDING (element)));
313 if (pthread_equal (self, thread->thread_id)) {
314 GST_DEBUG (GST_CAT_THREAD,
315 "no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to spinning",
316 GST_DEBUG_THREAD_ARGS (thread->pid));
317 return gst_thread_update_state (thread);
320 switch (transition) {
321 case GST_STATE_NULL_TO_READY:
322 /* set the state to idle */
323 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
325 THR_DEBUG ("creating thread \"%s\"", GST_ELEMENT_NAME (element));
327 g_mutex_lock (thread->lock);
329 if (pthread_attr_init (&thread->attr) != 0)
330 g_warning ("pthread_attr_init returned an error !");
332 if (gst_scheduler_get_preferred_stack (GST_ELEMENT_SCHED (element), &thread->stack, &stacksize)) {
333 if (pthread_attr_setstack (&thread->attr, thread->stack, stacksize) != 0) {
334 g_warning ("pthread_attr_setstack failed");
335 return GST_STATE_FAILURE;
337 GST_DEBUG (GST_CAT_THREAD, "pthread attr set stack at %p of size %ld",
338 thread->stack, stacksize);
341 /* create the thread */
342 THR_DEBUG ("going to pthread_create...");
343 if (pthread_create (&thread->thread_id, &thread->attr, gst_thread_main_loop, thread) != 0) {
344 THR_DEBUG ("pthread create failed");
345 g_mutex_unlock (thread->lock);
346 THR_DEBUG ("could not create thread \"%s\"", GST_ELEMENT_NAME (element));
347 return GST_STATE_FAILURE;
349 THR_DEBUG ("pthread created");
351 /* wait for it to 'spin up' */
352 THR_DEBUG ("waiting for child thread spinup");
353 g_cond_wait (thread->cond, thread->lock);
354 THR_DEBUG ("thread claims to be up");
355 g_mutex_unlock (thread->lock);
357 case GST_STATE_READY_TO_PAUSED:
358 THR_INFO ("readying thread");
359 g_mutex_lock (thread->lock);
360 THR_DEBUG ("signaling");
361 g_cond_signal (thread->cond);
362 THR_DEBUG ("waiting for ack");
363 g_cond_wait (thread->cond, thread->lock);
364 THR_DEBUG ("got ack");
365 g_mutex_unlock (thread->lock);
367 case GST_STATE_PAUSED_TO_PLAYING:
369 /* fixme: recurse into sub-bins */
370 const GList *elements = gst_bin_get_list (GST_BIN (thread));
372 gst_element_enable_threadsafe_properties ((GstElement*)elements->data);
373 elements = g_list_next (elements);
376 THR_DEBUG ("telling thread to start spinning");
377 g_mutex_lock (thread->lock);
378 THR_DEBUG ("signaling");
379 g_cond_signal (thread->cond);
380 THR_DEBUG ("waiting for ack");
381 g_cond_wait (thread->cond, thread->lock);
382 THR_DEBUG ("got ack");
383 g_mutex_unlock (thread->lock);
386 case GST_STATE_PLAYING_TO_PAUSED:
388 const GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
390 THR_INFO ("pausing thread");
392 /* the following code ensures that the bottom half of thread will run
393 * to perform each elements' change_state() (by calling gstbin.c::
395 * + the pending state was already set by gstelement.c::set_state()
396 * + unlock all elements so the bottom half can start the state change.
398 g_mutex_lock (thread->lock);
400 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
403 GstElement *element = GST_ELEMENT (elements->data);
408 THR_DEBUG (" waking element \"%s\"", GST_ELEMENT_NAME (element));
409 elements = g_list_next (elements);
411 if (!gst_element_release_locks (element)) {
412 g_warning ("element %s could not release locks", GST_ELEMENT_NAME (element));
415 pads = GST_ELEMENT_PADS (element);
418 GstRealPad *peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
419 GstElement *peerelement;
421 pads = g_list_next (pads);
426 peerelement = GST_PAD_PARENT (peer);
428 continue; /* deal with case where there's no peer */
430 if (!GST_FLAG_IS_SET (peerelement, GST_ELEMENT_DECOUPLED)) {
431 GST_DEBUG (GST_CAT_THREAD, "peer element isn't DECOUPLED");
435 if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
436 THR_DEBUG (" element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element));
437 THR_DEBUG (" waking element \"%s\"", GST_ELEMENT_NAME (peerelement));
438 if (!gst_element_release_locks (peerelement)) {
439 g_warning ("element %s could not release locks", GST_ELEMENT_NAME (peerelement));
445 THR_DEBUG ("telling thread to pause, signaling");
446 g_cond_signal (thread->cond);
447 THR_DEBUG ("waiting for ack");
448 g_cond_wait (thread->cond, thread->lock);
449 THR_DEBUG ("got ack");
450 g_mutex_unlock (thread->lock);
452 elements = gst_bin_get_list (GST_BIN (thread));
454 gst_element_disable_threadsafe_properties ((GstElement*)elements->data);
455 elements = g_list_next (elements);
459 case GST_STATE_READY_TO_NULL:
460 THR_DEBUG ("telling thread to pause (null) - and joining");
461 /* MattH FIXME revisit */
462 g_mutex_lock (thread->lock);
463 THR_DEBUG ("signaling");
464 g_cond_signal (thread->cond);
465 THR_DEBUG ("waiting for ack");
466 g_cond_wait (thread->cond, thread->lock);
467 THR_DEBUG ("got ack");
468 if (pthread_join (thread->thread_id, NULL) != 0)
469 g_warning ("pthread_join has failed !\n");
470 if (pthread_attr_destroy (&thread->attr) != 0)
471 g_warning ("pthread_attr_destroy has failed !\n");
472 thread->thread_id = -1;
473 g_mutex_unlock (thread->lock);
476 GST_DEBUG (GST_CAT_THREAD, "freeing allocated stack (%p)", thread->stack);
477 free (thread->stack);
478 thread->stack = NULL;
481 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
482 GST_FLAG_UNSET (thread, GST_THREAD_STATE_STARTED);
483 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
486 case GST_STATE_PAUSED_TO_READY:
487 THR_DEBUG ("telling thread to stop spinning");
488 g_mutex_lock (thread->lock);
489 THR_DEBUG ("signaling");
490 g_cond_signal (thread->cond);
491 THR_DEBUG ("waiting for ack");
492 g_cond_wait (thread->cond, thread->lock);
493 THR_DEBUG ("got ack");
494 g_mutex_unlock (thread->lock);
498 GST_DEBUG_ELEMENT (GST_CAT_THREAD, element, "UNHANDLED STATE CHANGE! %x", transition);
506 * gst_thread_main_loop:
507 * @arg: the thread to start
509 * The main loop of the thread. The thread will iterate
510 * while the state is GST_THREAD_STATE_SPINNING.
513 gst_thread_main_loop (void *arg)
515 GstThread *thread = NULL;
518 GST_DEBUG (GST_CAT_THREAD, "gst_thread_main_loop started");
519 thread = GST_THREAD (arg);
520 g_mutex_lock (thread->lock);
522 if (thread->sched_policy != SCHED_OTHER) {
523 struct sched_param sched_param;
525 memset (&sched_param, 0, sizeof (sched_param));
526 if (thread->priority == 0) {
527 thread->priority = sched_get_priority_max (thread->sched_policy);
529 sched_param.sched_priority = thread->priority;
531 if (sched_setscheduler (0, thread->sched_policy, &sched_param) != 0) {
532 GST_DEBUG (GST_CAT_THREAD, "not running with real-time priority");
536 gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
537 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
539 thread->pid = getpid();
540 THR_INFO_MAIN("thread is running");
542 /* first we need to change the state of all the children */
543 if (GST_ELEMENT_CLASS (parent_class)->change_state) {
544 stateset = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
546 if (stateset != GST_STATE_SUCCESS) {
547 THR_DEBUG_MAIN ("state change of children failed");
552 THR_DEBUG_MAIN ("indicating spinup");
553 g_cond_signal (thread->cond);
554 /* don't unlock the mutex because we hold it into the top of the while loop */
555 THR_DEBUG_MAIN ("thread has indicated spinup to parent process");
557 /***** THREAD IS NOW IN READY STATE *****/
559 /* CR1: most of this code is handshaking */
560 /* do this while the thread lives */
561 while (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) {
562 /* NOTE we hold the thread lock at this point */
563 /* what we do depends on what state we're in */
564 switch (GST_STATE (thread)) {
565 /* NOTE: cannot be in NULL, we're not running in that state at all */
566 case GST_STATE_READY:
567 /* wait to be set to either the NULL or PAUSED states */
568 THR_DEBUG_MAIN ("thread in %s state, waiting for either %s or %s",
569 gst_element_state_get_name (GST_STATE_READY),
570 gst_element_state_get_name (GST_STATE_NULL),
571 gst_element_state_get_name (GST_STATE_PAUSED));
572 g_cond_wait (thread->cond,thread->lock);
574 /* this must have happened by a state change in the thread context */
575 if (GST_STATE_PENDING (thread) != GST_STATE_NULL &&
576 GST_STATE_PENDING (thread) != GST_STATE_PAUSED) {
577 g_cond_signal (thread->cond);
581 /* been signaled, we need to state transition now and signal back */
582 gst_thread_update_state (thread);
583 THR_DEBUG_MAIN ("done with state transition, signaling back to parent process");
584 g_cond_signal (thread->cond);
585 /* now we decide what to do next */
586 if (GST_STATE (thread) == GST_STATE_NULL) {
587 /* REAPING must be set, we can simply break this iteration */
588 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
591 case GST_STATE_PAUSED:
592 /* wait to be set to either the READY or PLAYING states */
593 THR_DEBUG_MAIN("thread in %s state, waiting for either %s or %s",
594 gst_element_state_get_name (GST_STATE_PAUSED),
595 gst_element_state_get_name (GST_STATE_READY),
596 gst_element_state_get_name (GST_STATE_PLAYING));
597 g_cond_wait (thread->cond, thread->lock);
599 /* this must have happened by a state change in the thread context */
600 if (GST_STATE_PENDING (thread) != GST_STATE_READY &&
601 GST_STATE_PENDING (thread) != GST_STATE_PLAYING) {
602 g_cond_signal (thread->cond);
606 /* been signaled, we need to state transition now and signal back */
607 gst_thread_update_state (thread);
608 /* now we decide what to do next */
609 if (GST_STATE (thread) != GST_STATE_PLAYING) {
610 /* either READY or the state change failed for some reason */
611 g_cond_signal (thread->cond);
615 GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
616 /* PLAYING is coming up, so we can now start spinning */
617 while (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
620 g_cond_signal (thread->cond);
621 g_mutex_unlock (thread->lock);
622 status = gst_bin_iterate (GST_BIN (thread));
623 g_mutex_lock (thread->lock);
624 /* g_cond_signal(thread->cond); */
626 if (!status || GST_STATE_PENDING (thread) != GST_STATE_VOID_PENDING)
627 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
629 /* looks like we were stopped because of a statechange */
630 if (GST_STATE_PENDING (thread)) {
631 gst_thread_update_state (thread);
633 /* once we're here, SPINNING has stopped, we should signal that we're done */
634 THR_DEBUG_MAIN ("SPINNING stopped, signaling back to parent process");
635 g_cond_signal (thread->cond);
636 /* now we can wait for PAUSED */
639 case GST_STATE_PLAYING:
640 /* wait to be set to PAUSED */
641 THR_DEBUG_MAIN ("thread in %s state, waiting for %s",
642 gst_element_state_get_name(GST_STATE_PLAYING),
643 gst_element_state_get_name(GST_STATE_PAUSED));
644 g_cond_wait (thread->cond,thread->lock);
646 /* been signaled, we need to state transition now and signal back */
647 gst_thread_update_state (thread);
648 g_cond_signal (thread->cond);
649 /* now we decide what to do next */
650 /* there's only PAUSED, we we just wait for it */
653 THR_DEBUG_MAIN ("thread in %s state, preparing to die",
654 gst_element_state_get_name(GST_STATE_NULL));
655 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
658 g_assert_not_reached ();
662 /* we need to destroy the scheduler here because it has mapped it's
663 * stack into the threads stack space */
664 gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
666 /* since we don't unlock at the end of the while loop, do it here */
667 g_mutex_unlock (thread->lock);
669 GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
670 GST_ELEMENT_NAME (thread));
672 g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
677 #ifndef GST_DISABLE_LOADSAVE
679 gst_thread_save_thyself (GstObject *object,
682 if (GST_OBJECT_CLASS (parent_class)->save_thyself)
683 GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
688 gst_thread_restore_thyself (GstObject *object,
691 GST_DEBUG (GST_CAT_THREAD,"gstthread: restore");
693 if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
694 GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
696 #endif /* GST_DISABLE_LOADSAVE */