2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstthread.c: Threaded container object
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
25 /* #define GST_DEBUG_ENABLED */
26 #include "gst_private.h"
28 #include "gstthread.h"
29 #include "gstscheduler.h"
32 GstElementDetails gst_thread_details = {
35 "Container that creates/manages a thread",
37 "Erik Walthinsen <omega@cse.ogi.edu>",
42 /* Thread signals and args */
60 static void gst_thread_class_init (GstThreadClass *klass);
61 static void gst_thread_init (GstThread *thread);
63 static void gst_thread_dispose (GObject *object);
65 static void gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
66 static void gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
68 static GstElementStateReturn gst_thread_change_state (GstElement *element);
70 #ifndef GST_DISABLE_LOADSAVE
71 static xmlNodePtr gst_thread_save_thyself (GstObject *object, xmlNodePtr parent);
72 static void gst_thread_restore_thyself (GstObject *object, xmlNodePtr self);
75 static void* gst_thread_main_loop (void *arg);
77 static GstBinClass *parent_class = NULL;
78 /* static guint gst_thread_signals[LAST_SIGNAL] = { 0 }; */
81 gst_thread_get_type(void) {
82 static GType thread_type = 0;
85 static const GTypeInfo thread_info = {
86 sizeof(GstThreadClass),
89 (GClassInitFunc)gst_thread_class_init,
94 (GInstanceInitFunc)gst_thread_init,
97 thread_type = g_type_register_static(GST_TYPE_BIN, "GstThread", &thread_info, 0);
103 gst_thread_class_init (GstThreadClass *klass)
105 GObjectClass *gobject_class;
106 GstObjectClass *gstobject_class;
107 GstElementClass *gstelement_class;
108 GstBinClass *gstbin_class;
110 gobject_class = (GObjectClass*)klass;
111 gstobject_class = (GstObjectClass*)klass;
112 gstelement_class = (GstElementClass*)klass;
113 gstbin_class = (GstBinClass*)klass;
115 parent_class = g_type_class_ref (GST_TYPE_BIN);
117 gobject_class->dispose = gst_thread_dispose;
119 #ifndef GST_DISABLE_LOADSAVE
120 gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
121 gstobject_class->restore_thyself = GST_DEBUG_FUNCPTR(gst_thread_restore_thyself);
124 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state);
126 /* gstbin_class->schedule = gst_thread_schedule_dummy; */
128 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
129 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
134 gst_thread_init (GstThread *thread)
136 const gchar *schedname;
137 GstScheduler *scheduler;
139 GST_DEBUG (GST_CAT_THREAD, "initializing thread");
141 /* we're a manager by default */
142 /* CR1: the GstBin code checks these flags */
143 GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
144 GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
146 schedname = gst_scheduler_factory_get_default_name ();
148 scheduler = gst_scheduler_factory_make (schedname, GST_ELEMENT (thread));
150 GST_ELEMENT_SCHED (thread) = scheduler;
152 gst_object_ref (GST_OBJECT (scheduler));
153 gst_object_sink (GST_OBJECT (scheduler));
155 thread->lock = g_mutex_new ();
156 thread->cond = g_cond_new ();
158 thread->ppid = getpid ();
159 thread->thread_id = -1;
163 gst_thread_dispose (GObject *object)
165 GstThread *thread = GST_THREAD (object);
167 GST_DEBUG (GST_CAT_REFCOUNTING, "dispose");
169 g_mutex_free (thread->lock);
170 g_cond_free (thread->cond);
172 G_OBJECT_CLASS (parent_class)->dispose (object);
174 if (GST_ELEMENT_SCHED (thread)) {
175 gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
180 gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
182 /* it's not null if we got it, but it might not be ours */
183 g_return_if_fail (GST_IS_THREAD (object));
187 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
193 gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
195 /* it's not null if we got it, but it might not be ours */
196 g_return_if_fail (GST_IS_THREAD (object));
200 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
208 * @name: the name of the thread
210 * Create a new thread with the given name.
212 * Returns: The new thread
215 gst_thread_new (const gchar *name)
217 return gst_element_factory_make ("thread", name);
221 #define THR_INFO(format,args...) \
222 GST_INFO_ELEMENT(GST_CAT_THREAD, thread, "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
223 GST_DEBUG_THREAD_ARGS(thread->pid) , ## args )
224 #define THR_DEBUG(format,args...) \
225 GST_DEBUG_ELEMENT(GST_CAT_THREAD, thread, "sync(" GST_DEBUG_THREAD_FORMAT "): " format , \
226 GST_DEBUG_THREAD_ARGS(thread->pid) , ## args )
228 #define THR_INFO_MAIN(format,args...) \
229 GST_INFO_ELEMENT(GST_CAT_THREAD, thread, "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
230 GST_DEBUG_THREAD_ARGS(thread->ppid) , ## args )
231 #define THR_DEBUG_MAIN(format,args...) \
232 GST_DEBUG_ELEMENT(GST_CAT_THREAD, thread, "sync-main(" GST_DEBUG_THREAD_FORMAT "): " format , \
233 GST_DEBUG_THREAD_ARGS(thread->ppid) , ## args )
235 static GstElementStateReturn
236 gst_thread_update_state (GstThread *thread)
238 /* check for state change */
239 if (GST_STATE_PENDING(thread) != GST_STATE_VOID_PENDING) {
240 /* punt and change state on all the children */
241 if (GST_ELEMENT_CLASS (parent_class)->change_state)
242 return GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
245 return GST_STATE_SUCCESS;
249 static GstElementStateReturn
250 gst_thread_change_state (GstElement * element)
253 gboolean stateset = GST_STATE_SUCCESS;
255 pthread_t self = pthread_self ();
257 g_return_val_if_fail (GST_IS_THREAD (element), FALSE);
259 thread = GST_THREAD (element);
261 transition = GST_STATE_TRANSITION (element);
263 THR_INFO ("changing state from %s to %s",
264 gst_element_statename (GST_STATE (element)),
265 gst_element_statename (GST_STATE_PENDING (element)));
267 if (pthread_equal (self, thread->thread_id)) {
268 GST_DEBUG (GST_CAT_THREAD,
269 "no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to spinning",
270 GST_DEBUG_THREAD_ARGS (thread->pid));
271 return gst_thread_update_state (thread);
274 switch (transition) {
275 case GST_STATE_NULL_TO_READY:
276 /* set the state to idle */
277 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
279 THR_DEBUG ("creating thread \"%s\"", GST_ELEMENT_NAME (element));
281 g_mutex_lock (thread->lock);
283 /* create the thread */
284 pthread_create (&thread->thread_id, NULL, gst_thread_main_loop, thread);
286 /* wait for it to 'spin up' */
287 THR_DEBUG ("waiting for child thread spinup");
288 g_cond_wait (thread->cond, thread->lock);
289 THR_DEBUG ("thread claims to be up");
290 g_mutex_unlock (thread->lock);
292 case GST_STATE_READY_TO_PAUSED:
293 THR_INFO ("readying thread");
294 g_mutex_lock (thread->lock);
295 THR_DEBUG ("signaling");
296 g_cond_signal (thread->cond);
297 THR_DEBUG ("waiting for ack");
298 g_cond_wait (thread->cond, thread->lock);
299 THR_DEBUG ("got ack");
300 g_mutex_unlock (thread->lock);
302 case GST_STATE_PAUSED_TO_PLAYING:
303 THR_DEBUG ("telling thread to start spinning");
304 g_mutex_lock (thread->lock);
305 THR_DEBUG ("signaling");
306 g_cond_signal (thread->cond);
307 THR_DEBUG ("waiting for ack");
308 g_cond_wait (thread->cond, thread->lock);
309 THR_DEBUG ("got ack");
310 g_mutex_unlock (thread->lock);
312 case GST_STATE_PLAYING_TO_PAUSED:
314 GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
316 THR_INFO ("pausing thread");
318 /* the following code ensures that the bottom half of thread will run
319 * to perform each elements' change_state() (by calling gstbin.c::
321 * + the pending state was already set by gstelement.c::set_state()
322 * + find every queue we manage, and signal its empty and full conditions
324 g_mutex_lock (thread->lock);
326 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
329 GstElement *element = GST_ELEMENT (elements->data);
332 THR_DEBUG (" element \"%s\"", GST_ELEMENT_NAME (element));
333 elements = g_list_next (elements);
334 if (GST_IS_QUEUE (element)) {
335 GstQueue *queue = GST_QUEUE (element);
336 /* FIXME make this more efficient by only waking queues that are asleep
337 * FIXME and only waking the appropriate condition (depending on if it's
338 * FIXME on up- or down-stream side)
339 * FIXME also make this more efficient by keeping list of managed queues
341 THR_DEBUG ("waking queue \"%s\"", GST_ELEMENT_NAME (element));
342 g_mutex_lock (queue->qlock);
343 GST_STATE_PENDING (element) = GST_STATE_PAUSED;
344 g_cond_signal (queue->not_full);
345 g_cond_signal (queue->not_empty);
346 g_mutex_unlock (queue->qlock);
349 GList *pads = GST_ELEMENT_PADS (element);
352 GstRealPad *peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
353 GstElement *peerelement;
355 pads = g_list_next (pads);
360 peerelement = GST_PAD_PARENT (peer);
362 continue; /* deal with case where there's no peer */
364 if (!GST_FLAG_IS_SET (peerelement, GST_ELEMENT_DECOUPLED)) {
365 GST_DEBUG (GST_CAT_THREAD, "peer element isn't DECOUPLED");
369 /* FIXME this needs to go away eventually */
370 if (!GST_IS_QUEUE (peerelement)) {
371 GST_DEBUG (GST_CAT_THREAD, "peer element isn't a Queue");
375 if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
376 GstQueue *queue = GST_QUEUE (peerelement);
378 THR_DEBUG (" element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element));
380 g_mutex_lock (queue->qlock);
381 g_cond_signal (queue->not_full);
382 g_cond_signal (queue->not_empty);
383 g_mutex_unlock (queue->qlock);
388 THR_DEBUG ("telling thread to pause, signaling");
389 g_cond_signal (thread->cond);
390 THR_DEBUG ("waiting for ack");
391 g_cond_wait (thread->cond, thread->lock);
392 THR_DEBUG ("got ack");
393 g_mutex_unlock (thread->lock);
396 case GST_STATE_READY_TO_NULL:
397 THR_DEBUG ("telling thread to pause (null) - and joining");
398 /* MattH FIXME revisit */
399 g_mutex_lock (thread->lock);
400 THR_DEBUG ("signaling");
401 g_cond_signal (thread->cond);
402 THR_DEBUG ("waiting for ack");
403 g_cond_wait (thread->cond, thread->lock);
404 THR_DEBUG ("got ack");
405 pthread_join (thread->thread_id, NULL);
406 thread->thread_id = -1;
407 g_mutex_unlock (thread->lock);
409 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
410 GST_FLAG_UNSET (thread, GST_THREAD_STATE_STARTED);
411 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
414 case GST_STATE_PAUSED_TO_READY:
415 THR_DEBUG ("telling thread to stop spinning");
416 g_mutex_lock (thread->lock);
417 THR_DEBUG ("signaling");
418 g_cond_signal (thread->cond);
419 THR_DEBUG ("waiting for ack");
420 g_cond_wait (thread->cond, thread->lock);
421 THR_DEBUG ("got ack");
422 g_mutex_unlock (thread->lock);
426 GST_DEBUG_ELEMENT (GST_CAT_THREAD, element, "UNHANDLED STATE CHANGE! %x", transition);
434 * gst_thread_main_loop:
435 * @arg: the thread to start
437 * The main loop of the thread. The thread will iterate
438 * while the state is GST_THREAD_STATE_SPINNING.
441 gst_thread_main_loop (void *arg)
443 GstThread *thread = GST_THREAD (arg);
446 g_mutex_lock (thread->lock);
448 gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
449 GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
451 thread->pid = getpid();
452 THR_INFO_MAIN("thread is running");
454 /* first we need to change the state of all the children */
455 if (GST_ELEMENT_CLASS (parent_class)->change_state) {
456 stateset = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));
458 if (stateset != GST_STATE_SUCCESS) {
459 THR_DEBUG_MAIN ("state change of children failed");
464 THR_DEBUG_MAIN ("indicating spinup");
465 g_cond_signal (thread->cond);
466 /* don't unlock the mutex because we hold it into the top of the while loop */
467 THR_DEBUG_MAIN ("thread has indicated spinup to parent process");
469 /***** THREAD IS NOW IN READY STATE *****/
471 /* CR1: most of this code is handshaking */
472 /* do this while the thread lives */
473 while (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) {
474 /* NOTE we hold the thread lock at this point */
475 /* what we do depends on what state we're in */
476 switch (GST_STATE (thread)) {
477 /* NOTE: cannot be in NULL, we're not running in that state at all */
478 case GST_STATE_READY:
479 /* wait to be set to either the NULL or PAUSED states */
480 THR_DEBUG_MAIN ("thread in %s state, waiting for either %s or %s",
481 gst_element_statename (GST_STATE_READY),
482 gst_element_statename (GST_STATE_NULL),
483 gst_element_statename (GST_STATE_PAUSED));
484 g_cond_wait (thread->cond,thread->lock);
486 /* this must have happened by a state change in the thread context */
487 if (GST_STATE_PENDING (thread) != GST_STATE_NULL &&
488 GST_STATE_PENDING (thread) != GST_STATE_PAUSED) {
489 g_cond_signal (thread->cond);
493 /* been signaled, we need to state transition now and signal back */
494 gst_thread_update_state (thread);
495 THR_DEBUG_MAIN ("done with state transition, signaling back to parent process");
496 g_cond_signal (thread->cond);
497 /* now we decide what to do next */
498 if (GST_STATE (thread) == GST_STATE_NULL) {
499 /* REAPING must be set, we can simply break this iteration */
500 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
503 case GST_STATE_PAUSED:
504 /* wait to be set to either the READY or PLAYING states */
505 THR_DEBUG_MAIN("thread in %s state, waiting for either %s or %s",
506 gst_element_statename (GST_STATE_PAUSED),
507 gst_element_statename (GST_STATE_READY),
508 gst_element_statename (GST_STATE_PLAYING));
509 g_cond_wait (thread->cond, thread->lock);
511 /* this must have happened by a state change in the thread context */
512 if (GST_STATE_PENDING (thread) != GST_STATE_READY &&
513 GST_STATE_PENDING (thread) != GST_STATE_PLAYING) {
514 g_cond_signal (thread->cond);
518 /* been signaled, we need to state transition now and signal back */
519 gst_thread_update_state (thread);
520 /* now we decide what to do next */
521 if (GST_STATE (thread) != GST_STATE_PLAYING) {
522 /* either READY or the state change failed for some reason */
523 g_cond_signal (thread->cond);
527 GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
528 /* PLAYING is coming up, so we can now start spinning */
529 while (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
532 g_cond_signal (thread->cond);
533 g_mutex_unlock (thread->lock);
534 status = gst_bin_iterate (GST_BIN (thread));
535 g_mutex_lock (thread->lock);
536 /* g_cond_signal(thread->cond); */
538 if (!status || GST_STATE_PENDING (thread) != GST_STATE_VOID_PENDING)
539 GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
541 /* looks like we were stopped because of a statechange */
542 if (GST_STATE_PENDING (thread)) {
543 gst_thread_update_state (thread);
545 /* once we're here, SPINNING has stopped, we should signal that we're done */
546 THR_DEBUG_MAIN ("SPINNING stopped, signaling back to parent process");
547 g_cond_signal (thread->cond);
548 /* now we can wait for PAUSED */
551 case GST_STATE_PLAYING:
552 /* wait to be set to PAUSED */
553 THR_DEBUG_MAIN ("thread in %s state, waiting for %s",
554 gst_element_statename(GST_STATE_PLAYING),
555 gst_element_statename(GST_STATE_PAUSED));
556 g_cond_wait (thread->cond,thread->lock);
558 /* been signaled, we need to state transition now and signal back */
559 gst_thread_update_state (thread);
560 g_cond_signal (thread->cond);
561 /* now we decide what to do next */
562 /* there's only PAUSED, we we just wait for it */
565 THR_DEBUG_MAIN ("thread in %s state, preparing to die",
566 gst_element_statename(GST_STATE_NULL));
567 GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
570 g_assert_not_reached ();
574 /* we need to destroy the scheduler here because it has mapped it's
575 * stack into the threads stack space */
576 gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
578 /* since we don't unlock at the end of the while loop, do it here */
579 g_mutex_unlock (thread->lock);
581 GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
582 GST_ELEMENT_NAME (thread));
586 #ifndef GST_DISABLE_LOADSAVE
588 gst_thread_save_thyself (GstObject *object,
591 if (GST_OBJECT_CLASS (parent_class)->save_thyself)
592 GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
597 gst_thread_restore_thyself (GstObject *object,
600 GST_DEBUG (GST_CAT_THREAD,"gstthread: restore");
602 if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
603 GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
605 #endif /* GST_DISABLE_LOADSAVE */