2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstthread.c: Threaded container object
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
25 /* #define GST_DEBUG_ENABLED */
26 #include "gst_private.h"
29 #include "gstthread.h"
30 #include "gstscheduler.h"
/* Element details record for the thread container.
 * NOTE(review): only two string entries of this initializer are visible in
 * this view (presumably the description and the author); the remaining
 * fields and the closing brace are not shown - confirm against the full file. */
33 GstElementDetails gst_thread_details = {
36 "Container that creates/manages a thread",
38 "Erik Walthinsen <omega@cse.ogi.edu>",
43 /* Thread signals and args */
/* Forward declarations: class and instance initialisers. */
61 static void gst_thread_class_init (GstThreadClass *klass);
62 static void gst_thread_init (GstThread *thread);
/* GObject teardown and property accessors. */
64 static void gst_thread_dispose (GObject *object);
66 static void gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
67 static void gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
/* State-change handler installed on the element class. */
69 static GstElementStateReturn gst_thread_change_state (GstElement *element);
/* XML load/save hooks; only declared when GST_DISABLE_LOADSAVE is not defined. */
71 #ifndef GST_DISABLE_LOADSAVE
72 static xmlNodePtr gst_thread_save_thyself (GstObject *object, xmlNodePtr parent);
73 static void gst_thread_restore_thyself (GstObject *object, xmlNodePtr self);
/* Entry point handed to pthread_create() by gst_thread_change_state(). */
76 static void* gst_thread_main_loop (void *arg);
/* Parent class pointer; assigned from GST_TYPE_BIN in gst_thread_class_init()
 * and used throughout this file to chain up. */
78 static GstBinClass *parent_class = NULL;
79 /* static guint gst_thread_signals[LAST_SIGNAL] = { 0 }; */
/* Return the GType of GstThread. The type is registered with
 * g_type_register_static() as a subtype of GST_TYPE_BIN under the name
 * "GstThread", and the resulting id is kept in a function-local static.
 * NOTE(review): the guard around the registration, several GTypeInfo entries
 * and the return statement are not visible in this view; presumably the
 * registration only happens on the first call - confirm against the full file. */
82 gst_thread_get_type(void) {
83 static GType thread_type = 0;
86 static const GTypeInfo thread_info = {
87 sizeof(GstThreadClass),
90 (GClassInitFunc)gst_thread_class_init,
95 (GInstanceInitFunc)gst_thread_init,
98 thread_type = g_type_register_static(GST_TYPE_BIN, "GstThread", &thread_info, 0);
104 gst_thread_class_init (GstThreadClass *klass)
106 GObjectClass *gobject_class;
107 GstObjectClass *gstobject_class;
108 GstElementClass *gstelement_class;
109 GstBinClass *gstbin_class;
111 gobject_class = (GObjectClass*)klass;
112 gstobject_class = (GstObjectClass*)klass;
113 gstelement_class = (GstElementClass*)klass;
114 gstbin_class = (GstBinClass*)klass;
116 parent_class = g_type_class_ref (GST_TYPE_BIN);
118 gobject_class->dispose = gst_thread_dispose;
120 #ifndef GST_DISABLE_LOADSAVE
121 gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
122 gstobject_class->restore_thyself = GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
125 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state);
127 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
128 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
133 gst_thread_init (GstThread *thread)
135 GstScheduler *scheduler;
137 GST_DEBUG (GST_CAT_THREAD, "initializing thread");
139 /* threads are managing bins and iterate themselves */
140 /* CR1: the GstBin code checks these flags */
141 GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
142 GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
144 scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
146 thread->lock = g_mutex_new ();
147 thread->cond = g_cond_new ();
149 thread->ppid = getpid ();
150 thread->thread_id = -1;
154 gst_thread_dispose (GObject *object)
156 GstThread *thread = GST_THREAD (object);
158 GST_DEBUG (GST_CAT_REFCOUNTING, "dispose");
160 g_mutex_free (thread->lock);
161 g_cond_free (thread->cond);
163 G_OBJECT_CLASS (parent_class)->dispose (object);
165 if (GST_ELEMENT_SCHED (thread)) {
166 gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
171 gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
173 /* it's not null if we got it, but it might not be ours */
174 g_return_if_fail (GST_IS_THREAD (object));
178 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
184 gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
186 /* it's not null if we got it, but it might not be ours */
187 g_return_if_fail (GST_IS_THREAD (object));
191 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
199 * @name: the name of the thread
201 * Create a new thread with the given name.
203 * Returns: The new thread
206 gst_thread_new (const gchar *name)
208 return gst_element_factory_make ("thread", name);
/* Logging helpers for the state handshake. Each one expects a variable named
 * "thread" (with pid/ppid members) in the calling scope and prefixes the
 * caller's format string:
 *   THR_INFO / THR_DEBUG            -> "sync(<thread->pid>): ..."
 *   THR_INFO_MAIN / THR_DEBUG_MAIN  -> "sync-main(<thread->ppid>): ..."
 * The ", ##" before the variadic arguments drops the comma when a call
 * passes nothing beyond the format string. */
#define THR_INFO(format, ...) \
  GST_INFO_ELEMENT (GST_CAT_THREAD, thread, \
                    "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
                    GST_DEBUG_THREAD_ARGS (thread->pid) , ## __VA_ARGS__ )
#define THR_DEBUG(format, ...) \
  GST_DEBUG_ELEMENT (GST_CAT_THREAD, thread, \
                     "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
                     GST_DEBUG_THREAD_ARGS (thread->pid) , ## __VA_ARGS__ )

#define THR_INFO_MAIN(format, ...) \
  GST_INFO_ELEMENT (GST_CAT_THREAD, thread, \
                    "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
                    GST_DEBUG_THREAD_ARGS (thread->ppid) , ## __VA_ARGS__ )
#define THR_DEBUG_MAIN(format, ...) \
  GST_DEBUG_ELEMENT (GST_CAT_THREAD, thread, \
                     "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
                     GST_DEBUG_THREAD_ARGS (thread->ppid) , ## __VA_ARGS__ )
226 static GstElementStateReturn
227 gst_thread_update_state (GstThread *thread)
229 /* check for state change */
230 if (GST_STATE_PENDING(thread) != GST_STATE_VOID_PENDING) {
231 /* punt and change state on all the children */
232 if (GST_ELEMENT_CLASS (parent_class)->change_state)
233 return GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
236 return GST_STATE_SUCCESS;
/*
 * State-change handler installed as the element class's change_state.
 *
 * When called from the thread's own context (pthread_equal() against
 * thread->thread_id) the change is applied directly through
 * gst_thread_update_state(). Otherwise the caller handshakes with the child
 * thread over thread->lock / thread->cond, depending on the transition:
 *   NULL->READY      creates the pthread running gst_thread_main_loop() and
 *                    waits for its spin-up signal;
 *   READY->PAUSED,
 *   PAUSED->PLAYING,
 *   PAUSED->READY    signal the child and wait for its acknowledgement;
 *   PLAYING->PAUSED  clears SPINNING, asks the contained elements (and peers
 *                    whose scheduler differs from the thread's) to release
 *                    their locks, then signals and waits;
 *   READY->NULL      signals, waits, joins the pthread and clears the
 *                    REAPING/STARTED/SPINNING flags.
 *
 * Returns GST_STATE_FAILURE when @element is not a GstThread, when
 * gst_with_threads() is false, or when pthread_create() fails.
 *
 * NOTE(review): stateset is declared gboolean but initialised with a
 * GstElementStateReturn constant - looks like the wrong type; confirm.
 * NOTE(review): each g_cond_wait() below is a single wait with no predicate
 * loop around it, so any wakeup is taken as the child's answer - confirm
 * this is intended.
 * NOTE(review): several declarations, loop headers, break statements and the
 * final return are not visible in this view.
 */
240 static GstElementStateReturn
241 gst_thread_change_state (GstElement * element)
244 gboolean stateset = GST_STATE_SUCCESS;
246 pthread_t self = pthread_self ();
250 g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
251 g_return_val_if_fail (gst_with_threads (), GST_STATE_FAILURE);
253 thread = GST_THREAD (element);
255 transition = GST_STATE_TRANSITION (element);
257 THR_INFO ("changing state from %s to %s",
258 gst_element_state_get_name (GST_STATE (element)),
259 gst_element_state_get_name (GST_STATE_PENDING (element)));
/* called from inside the child thread itself: no handshake, apply directly */
261 if (pthread_equal (self, thread->thread_id)) {
262 GST_DEBUG (GST_CAT_THREAD,
263 "no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to spinning",
264 GST_DEBUG_THREAD_ARGS (thread->pid));
265 return gst_thread_update_state (thread);
268 switch (transition) {
269 case GST_STATE_NULL_TO_READY:
270 /* set the state to idle */
271 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
273 THR_DEBUG ("creating thread \"%s\"", GST_ELEMENT_NAME (element));
275 g_mutex_lock (thread->lock);
277 pthread_attr_init (&thread->attr);
/* use the stack the scheduler asks for, when it reports one */
278 if (gst_scheduler_get_prefered_stack (GST_ELEMENT_SCHED (element), &stack, &stacksize)) {
279 pthread_attr_setstack (&thread->attr, stack, stacksize);
281 /* create the thread */
282 if (pthread_create (&thread->thread_id, &thread->attr, gst_thread_main_loop, thread) != 0) {
283 g_mutex_unlock (thread->lock);
284 THR_DEBUG ("could not create thread \"%s\"", GST_ELEMENT_NAME (element));
285 return GST_STATE_FAILURE;
288 /* wait for it to 'spin up' */
289 THR_DEBUG ("waiting for child thread spinup");
290 g_cond_wait (thread->cond, thread->lock);
291 THR_DEBUG ("thread claims to be up");
292 g_mutex_unlock (thread->lock);
294 case GST_STATE_READY_TO_PAUSED:
295 THR_INFO ("readying thread");
296 g_mutex_lock (thread->lock);
297 THR_DEBUG ("signaling");
298 g_cond_signal (thread->cond);
299 THR_DEBUG ("waiting for ack");
300 g_cond_wait (thread->cond, thread->lock);
301 THR_DEBUG ("got ack");
302 g_mutex_unlock (thread->lock);
304 case GST_STATE_PAUSED_TO_PLAYING:
306 /* fixme: recurse into sub-bins */
307 const GList *elements = gst_bin_get_list (GST_BIN (thread));
309 gst_element_enable_threadsafe_properties ((GstElement*)elements->data);
310 elements = g_list_next (elements);
313 THR_DEBUG ("telling thread to start spinning");
314 g_mutex_lock (thread->lock);
315 THR_DEBUG ("signaling");
316 g_cond_signal (thread->cond);
317 THR_DEBUG ("waiting for ack");
318 g_cond_wait (thread->cond, thread->lock);
319 THR_DEBUG ("got ack");
320 g_mutex_unlock (thread->lock);
323 case GST_STATE_PLAYING_TO_PAUSED:
325 const GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
327 THR_INFO ("pausing thread");
329 /* the following code ensures that the bottom half of thread will run
330 * to perform each elements' change_state() (by calling gstbin.c::
332 * + the pending state was already set by gstelement.c::set_state()
333 * + unlock all elements so the bottom half can start the state change.
335 g_mutex_lock (thread->lock);
337 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
340 GstElement *element = GST_ELEMENT (elements->data);
345 THR_DEBUG (" waking element \"%s\"", GST_ELEMENT_NAME (element));
346 elements = g_list_next (elements);
348 if (!gst_element_release_locks (element)) {
349 g_warning ("element %s could not release locks", GST_ELEMENT_NAME (element));
352 pads = GST_ELEMENT_PADS (element);
355 GstRealPad *peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
356 GstElement *peerelement;
358 pads = g_list_next (pads);
363 peerelement = GST_PAD_PARENT (peer);
365 continue; /* deal with case where there's no peer */
367 if (!GST_FLAG_IS_SET (peerelement, GST_ELEMENT_DECOUPLED)) {
368 GST_DEBUG (GST_CAT_THREAD, "peer element isn't DECOUPLED");
/* a peer driven by a different scheduler must release its locks too */
372 if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
373 THR_DEBUG (" element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element));
374 THR_DEBUG (" waking element \"%s\"", GST_ELEMENT_NAME (peerelement));
375 if (!gst_element_release_locks (peerelement)) {
376 g_warning ("element %s could not release locks", GST_ELEMENT_NAME (peerelement));
382 THR_DEBUG ("telling thread to pause, signaling");
383 g_cond_signal (thread->cond);
384 THR_DEBUG ("waiting for ack");
385 g_cond_wait (thread->cond, thread->lock);
386 THR_DEBUG ("got ack");
387 g_mutex_unlock (thread->lock);
/* counterpart of the enable call made on PAUSED->PLAYING */
389 elements = gst_bin_get_list (GST_BIN (thread));
391 gst_element_disable_threadsafe_properties ((GstElement*)elements->data);
392 elements = g_list_next (elements);
396 case GST_STATE_READY_TO_NULL:
397 THR_DEBUG ("telling thread to pause (null) - and joining");
398 /* MattH FIXME revisit */
399 g_mutex_lock (thread->lock);
400 THR_DEBUG ("signaling");
401 g_cond_signal (thread->cond);
402 THR_DEBUG ("waiting for ack");
403 g_cond_wait (thread->cond, thread->lock);
404 THR_DEBUG ("got ack");
/* NOTE(review): the join is performed while thread->lock is still held */
405 pthread_join (thread->thread_id, NULL);
406 thread->thread_id = -1;
407 g_mutex_unlock (thread->lock);
409 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
410 GST_FLAG_UNSET (thread, GST_THREAD_STATE_STARTED);
411 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
414 case GST_STATE_PAUSED_TO_READY:
415 THR_DEBUG ("telling thread to stop spinning");
416 g_mutex_lock (thread->lock);
417 THR_DEBUG ("signaling");
418 g_cond_signal (thread->cond);
419 THR_DEBUG ("waiting for ack");
420 g_cond_wait (thread->cond, thread->lock);
421 THR_DEBUG ("got ack");
422 g_mutex_unlock (thread->lock);
426 GST_DEBUG_ELEMENT (GST_CAT_THREAD, element, "UNHANDLED STATE CHANGE! %x", transition);
/*
 * Entry point handed to pthread_create() by gst_thread_change_state().
 *
 * Takes thread->lock, sets up the scheduler, stores getpid() in thread->pid,
 * runs the parent class's change_state (when set) and signals spin-up on
 * thread->cond. It then loops until GST_THREAD_STATE_REAPING is set,
 * dispatching on GST_STATE (thread):
 *   READY    waits for a signal, applies the pending change, signals back,
 *            and sets REAPING once the state has become NULL;
 *   PAUSED   waits for a signal, applies the pending change; when the state
 *            became PLAYING it sets SPINNING and repeatedly calls
 *            gst_bin_iterate() with the lock released, until iteration
 *            fails or a state change is pending;
 *   PLAYING  waits for a signal, applies the pending change, signals back;
 *   NULL     sets REAPING.
 * After the loop the scheduler is reset and the lock is released.
 *
 * NOTE(review): several short lines (declarations, break/continue statements,
 * braces, the final return) are not visible in this view.
 */
434 * gst_thread_main_loop:
435 * @arg: the thread to start
437 * The main loop of the thread. The thread will iterate
438 * while the state is GST_THREAD_STATE_SPINNING.
441 gst_thread_main_loop (void *arg)
443 GstThread *thread = GST_THREAD (arg);
/* taken here and released only at the very end, apart from the waits on
 * thread->cond and the unlock/lock pair around gst_bin_iterate() */
446 g_mutex_lock (thread->lock);
448 gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
449 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
451 thread->pid = getpid();
452 THR_INFO_MAIN("thread is running");
454 /* first we need to change the state of all the children */
455 if (GST_ELEMENT_CLASS (parent_class)->change_state) {
456 stateset = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
458 if (stateset != GST_STATE_SUCCESS) {
459 THR_DEBUG_MAIN ("state change of children failed");
464 THR_DEBUG_MAIN ("indicating spinup");
465 g_cond_signal (thread->cond);
466 /* don't unlock the mutex because we hold it into the top of the while loop */
467 THR_DEBUG_MAIN ("thread has indicated spinup to parent process");
469 /***** THREAD IS NOW IN READY STATE *****/
471 /* CR1: most of this code is handshaking */
472 /* do this while the thread lives */
473 while (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) {
474 /* NOTE we hold the thread lock at this point */
475 /* what we do depends on what state we're in */
476 switch (GST_STATE (thread)) {
477 /* NOTE: cannot be in NULL, we're not running in that state at all */
478 case GST_STATE_READY:
479 /* wait to be set to either the NULL or PAUSED states */
480 THR_DEBUG_MAIN ("thread in %s state, waiting for either %s or %s",
481 gst_element_state_get_name (GST_STATE_READY),
482 gst_element_state_get_name (GST_STATE_NULL),
483 gst_element_state_get_name (GST_STATE_PAUSED));
484 g_cond_wait (thread->cond,thread->lock);
486 /* this must have happened by a state change in the thread context */
487 if (GST_STATE_PENDING (thread) != GST_STATE_NULL &&
488 GST_STATE_PENDING (thread) != GST_STATE_PAUSED) {
489 g_cond_signal (thread->cond);
493 /* been signaled, we need to state transition now and signal back */
494 gst_thread_update_state (thread);
495 THR_DEBUG_MAIN ("done with state transition, signaling back to parent process");
496 g_cond_signal (thread->cond);
497 /* now we decide what to do next */
498 if (GST_STATE (thread) == GST_STATE_NULL) {
499 /* REAPING must be set, we can simply break this iteration */
500 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
503 case GST_STATE_PAUSED:
504 /* wait to be set to either the READY or PLAYING states */
505 THR_DEBUG_MAIN("thread in %s state, waiting for either %s or %s",
506 gst_element_state_get_name (GST_STATE_PAUSED),
507 gst_element_state_get_name (GST_STATE_READY),
508 gst_element_state_get_name (GST_STATE_PLAYING));
509 g_cond_wait (thread->cond, thread->lock);
511 /* this must have happened by a state change in the thread context */
512 if (GST_STATE_PENDING (thread) != GST_STATE_READY &&
513 GST_STATE_PENDING (thread) != GST_STATE_PLAYING) {
514 g_cond_signal (thread->cond);
518 /* been signaled, we need to state transition now and signal back */
519 gst_thread_update_state (thread);
520 /* now we decide what to do next */
521 if (GST_STATE (thread) != GST_STATE_PLAYING) {
522 /* either READY or the state change failed for some reason */
523 g_cond_signal (thread->cond);
527 GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
528 /* PLAYING is coming up, so we can now start spinning */
529 while (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
/* signal, then iterate with the lock released; gst_thread_change_state()
 * takes the same lock when it requests PLAYING->PAUSED */
532 g_cond_signal (thread->cond);
533 g_mutex_unlock (thread->lock);
534 status = gst_bin_iterate (GST_BIN (thread));
535 g_mutex_lock (thread->lock);
536 /* g_cond_signal(thread->cond); */
/* stop spinning when iteration reports failure or a state change is pending */
538 if (!status || GST_STATE_PENDING (thread) != GST_STATE_VOID_PENDING)
539 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
541 /* looks like we were stopped because of a statechange */
/* NOTE(review): plain truth test here, whereas the check above compares
 * against GST_STATE_VOID_PENDING - confirm the two are equivalent */
542 if (GST_STATE_PENDING (thread)) {
543 gst_thread_update_state (thread);
545 /* once we're here, SPINNING has stopped, we should signal that we're done */
546 THR_DEBUG_MAIN ("SPINNING stopped, signaling back to parent process");
547 g_cond_signal (thread->cond);
548 /* now we can wait for PAUSED */
551 case GST_STATE_PLAYING:
552 /* wait to be set to PAUSED */
553 THR_DEBUG_MAIN ("thread in %s state, waiting for %s",
554 gst_element_state_get_name(GST_STATE_PLAYING),
555 gst_element_state_get_name(GST_STATE_PAUSED));
556 g_cond_wait (thread->cond,thread->lock);
558 /* been signaled, we need to state transition now and signal back */
559 gst_thread_update_state (thread);
560 g_cond_signal (thread->cond);
561 /* now we decide what to do next */
562 /* there's only PAUSED, so we just wait for it */
565 THR_DEBUG_MAIN ("thread in %s state, preparing to die",
566 gst_element_state_get_name(GST_STATE_NULL));
567 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
570 g_assert_not_reached ();
574 /* we need to destroy the scheduler here because it has mapped its
575 * stack into the thread's stack space */
576 gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
578 /* since we don't unlock at the end of the while loop, do it here */
579 g_mutex_unlock (thread->lock);
581 GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
582 GST_ELEMENT_NAME (thread));
586 #ifndef GST_DISABLE_LOADSAVE
/* XML save hook: forwards @object and the node to the parent class's
 * save_thyself when the parent provides one; the value that call returns is
 * not captured.
 * NOTE(review): the second parameter line and the return statement are not
 * visible in this view - confirm what is handed back to the caller. */
588 gst_thread_save_thyself (GstObject *object,
591 if (GST_OBJECT_CLASS (parent_class)->save_thyself)
592 GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
597 gst_thread_restore_thyself (GstObject *object,
600 GST_DEBUG (GST_CAT_THREAD,"gstthread: restore");
602 if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
603 GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
605 #endif /* GST_DISABLE_LOADSAVE */