more debug info
[platform/upstream/gstreamer.git] / gst / gstthread.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Benjamin Otte <in7y118@public.uni-hamburg.de>
5  *
6  * gstthread.c: Threaded container object
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 #include "gst_private.h"
25
26 #include "gstthread.h"
27 #include "gstmarshal.h"
28 #include "gstscheduler.h"
29 #include "gstinfo.h"
30
31 #define GST_CAT_DEFAULT GST_CAT_THREAD
32 #define STACK_SIZE 0x200000
33
34 static GstElementDetails gst_thread_details =
35 GST_ELEMENT_DETAILS ("Threaded container",
36     "Generic/Bin",
37     "Container that creates/manages a thread",
38     "Erik Walthinsen <omega@cse.ogi.edu>, "
39     "Benjamin Otte <in7y118@informatik.uni-hamburg.de");
40
41 /* Thread signals and args */
42 enum
43 {
44   SHUTDOWN,
45   /* FILL ME */
46   LAST_SIGNAL
47 };
48
49 enum
50 {
51   SPINUP = 0,
52   STATECHANGE,
53   STARTUP
54 };
55
56 enum
57 {
58   ARG_0,
59   ARG_PRIORITY
60 };
61
62
63 static void gst_thread_base_init (gpointer g_class);
64 static void gst_thread_class_init (gpointer g_class, gpointer class_data);
65 static void gst_thread_init (GTypeInstance * instance, gpointer g_class);
66
67 static void gst_thread_dispose (GObject * object);
68
69 static void gst_thread_set_property (GObject * object, guint prop_id,
70     const GValue * value, GParamSpec * pspec);
71 static void gst_thread_get_property (GObject * object, guint prop_id,
72     GValue * value, GParamSpec * pspec);
73 static GstElementStateReturn gst_thread_change_state (GstElement * element);
74 static void gst_thread_child_state_change (GstBin * bin,
75     GstElementState oldstate, GstElementState newstate, GstElement * element);
76
77 static void gst_thread_catch (GstThread * thread);
78 static void gst_thread_release (GstThread * thread);
79
80 #ifndef GST_DISABLE_LOADSAVE
81 static xmlNodePtr gst_thread_save_thyself (GstObject * object,
82     xmlNodePtr parent);
83 static void gst_thread_restore_thyself (GstObject * object, xmlNodePtr self);
84 #endif
85
86 static void *gst_thread_main_loop (void *arg);
87
88 #define GST_TYPE_THREAD_PRIORITY (gst_thread_priority_get_type())
89 static GType
90 gst_thread_priority_get_type (void)
91 {
92   static GType thread_priority_type = 0;
93   static GEnumValue thread_priority[] = {
94     {G_THREAD_PRIORITY_LOW, "LOW", "Low Priority Scheduling"},
95     {G_THREAD_PRIORITY_NORMAL, "NORMAL", "Normal Scheduling"},
96     {G_THREAD_PRIORITY_HIGH, "HIGH", "High Priority Scheduling"},
97     {G_THREAD_PRIORITY_URGENT, "URGENT", "Urgent Scheduling"},
98     {0, NULL, NULL},
99   };
100
101   if (!thread_priority_type) {
102     thread_priority_type =
103         g_enum_register_static ("GstThreadPriority", thread_priority);
104   }
105   return thread_priority_type;
106 }
107
108 static GstBinClass *parent_class = NULL;
109 static guint gst_thread_signals[LAST_SIGNAL] = { 0 };
110 GPrivate *gst_thread_current;
111
112 GType
113 gst_thread_get_type (void)
114 {
115   static GType thread_type = 0;
116
117   if (!thread_type) {
118     static const GTypeInfo thread_info = {
119       sizeof (GstThreadClass),
120       gst_thread_base_init,
121       NULL,
122       gst_thread_class_init,
123       NULL,
124       NULL,
125       sizeof (GstThread),
126       0,
127       gst_thread_init,
128       NULL
129     };
130
131     thread_type = g_type_register_static (GST_TYPE_BIN, "GstThread",
132         &thread_info, 0);
133   }
134   return thread_type;
135 }
136
137 static void
138 gst_thread_base_init (gpointer g_class)
139 {
140   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
141
142   gst_element_class_set_details (gstelement_class, &gst_thread_details);
143 }
144 static void
145 do_nothing (gpointer hi)
146 {
147 }
148 static void
149 gst_thread_class_init (gpointer g_class, gpointer class_data)
150 {
151   GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
152
153 #ifndef GST_DISABLE_LOADSAVE
154   GstObjectClass *gstobject_class = GST_OBJECT_CLASS (g_class);
155 #endif
156   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
157   GstBinClass *gstbin_class = GST_BIN_CLASS (g_class);
158   GstThreadClass *klass = GST_THREAD_CLASS (g_class);
159
160   /* setup gst_thread_current */
161   gst_thread_current = g_private_new (do_nothing);
162
163   parent_class = g_type_class_peek_parent (g_class);
164
165   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PRIORITY,
166       g_param_spec_enum ("priority", "Scheduling Policy",
167           "The scheduling priority of the thread", GST_TYPE_THREAD_PRIORITY,
168           G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE));
169
170   gst_thread_signals[SHUTDOWN] =
171       g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
172       G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL,
173       gst_marshal_VOID__VOID, G_TYPE_NONE, 0);
174
175   gobject_class->dispose = gst_thread_dispose;
176
177 #ifndef GST_DISABLE_LOADSAVE
178   gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
179   gstobject_class->restore_thyself =
180       GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
181 #endif
182
183   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state);
184
185   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
186   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
187
188   gstbin_class->child_state_change =
189       GST_DEBUG_FUNCPTR (gst_thread_child_state_change);
190 }
191
192 static void
193 gst_thread_init (GTypeInstance * instance, gpointer g_class)
194 {
195   GstScheduler *scheduler;
196   GstThread *thread = GST_THREAD (instance);
197
198   GST_DEBUG ("initializing thread");
199
200   /* threads are managing bins and iterate themselves */
201   /* CR1: the GstBin code checks these flags */
202   GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
203   GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
204
205   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
206   g_assert (scheduler);
207
208   thread->lock = g_mutex_new ();
209   thread->cond = g_cond_new ();
210
211   thread->thread_id = (GThread *) NULL; /* set in NULL -> READY */
212   thread->priority = G_THREAD_PRIORITY_NORMAL;
213 }
214
215 static void
216 gst_thread_dispose (GObject * object)
217 {
218   GstThread *thread = GST_THREAD (object);
219
220   GST_CAT_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose");
221
222   G_OBJECT_CLASS (parent_class)->dispose (object);
223
224   g_assert (GST_STATE (thread) == GST_STATE_NULL);
225
226   g_mutex_free (thread->lock);
227   g_cond_free (thread->cond);
228
229   gst_object_replace ((GstObject **) & GST_ELEMENT_SCHED (thread), NULL);
230 }
231
232 /**
233  * gst_thread_set_priority:
234  * @thread: the thread to change 
235  * @priority: the new priority for the thread
236  *
237  * change the thread's priority
238  */
239 void
240 gst_thread_set_priority (GstThread * thread, GThreadPriority priority)
241 {
242   g_return_if_fail (GST_IS_THREAD (thread));
243
244   thread->priority = priority;
245 }
246
247 static void
248 gst_thread_set_property (GObject * object, guint prop_id, const GValue * value,
249     GParamSpec * pspec)
250 {
251   GstThread *thread;
252
253   thread = GST_THREAD (object);
254
255   switch (prop_id) {
256     case ARG_PRIORITY:
257       thread->priority = g_value_get_enum (value);
258       break;
259     default:
260       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
261       break;
262   }
263 }
264
265 static void
266 gst_thread_get_property (GObject * object, guint prop_id, GValue * value,
267     GParamSpec * pspec)
268 {
269   GstThread *thread;
270
271   thread = GST_THREAD (object);
272
273   switch (prop_id) {
274     case ARG_PRIORITY:
275       g_value_set_enum (value, thread->priority);
276       break;
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280   }
281 }
282
283
284 /**
285  * gst_thread_new:
286  * @name: the name of the thread
287  *
288  * Create a new thread with the given name.
289  *
290  * Returns: The new thread
291  */
292 GstElement *
293 gst_thread_new (const gchar * name)
294 {
295   return gst_element_factory_make ("thread", name);
296 }
297
298 /**
299  * gst_thread_get_current:
300  *
301  * Gets the current GstThread.
302  *
303  * Returns: The current GstThread or NULL if you are not running inside a 
304  *          #GstThread.
305  */
306 GstThread *
307 gst_thread_get_current (void)
308 {
309   return (GstThread *) g_private_get (gst_thread_current);
310 }
311
312 static inline void
313 gst_thread_release_children_locks (GstThread * thread)
314 {
315   GstRealPad *peer = NULL;
316   GstElement *peerelement;
317   GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
318
319   while (elements) {
320     GstElement *element = GST_ELEMENT (elements->data);
321     GList *pads;
322
323     g_assert (element);
324     GST_DEBUG_OBJECT (thread, "waking element \"%s\"",
325         GST_ELEMENT_NAME (element));
326     elements = g_list_next (elements);
327
328     if (!gst_element_release_locks (element))
329       g_warning ("element %s could not release locks",
330           GST_ELEMENT_NAME (element));
331
332     pads = GST_ELEMENT_PADS (element);
333
334     while (pads) {
335       if (GST_PAD_PEER (pads->data)) {
336         peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
337         pads = g_list_next (pads);
338       } else {
339         pads = g_list_next (pads);
340         continue;
341       }
342
343       if (!peer)
344         continue;
345
346       peerelement = GST_PAD_PARENT (peer);
347       if (!peerelement)
348         continue;               /* FIXME: deal with case where there's no peer */
349
350       if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
351         GST_LOG_OBJECT (thread, "element \"%s\" has pad cross sched boundary",
352             GST_ELEMENT_NAME (element));
353         GST_LOG_OBJECT (thread, "waking element \"%s\"",
354             GST_ELEMENT_NAME (peerelement));
355         if (!gst_element_release_locks (peerelement))
356           g_warning ("element %s could not release locks",
357               GST_ELEMENT_NAME (peerelement));
358       }
359     }
360   }
361 }
362
363 /* stops the main thread, if there is one and grabs the thread's mutex */
364 static void
365 gst_thread_catch (GstThread * thread)
366 {
367   gboolean wait;
368
369   if (thread == gst_thread_get_current ()) {
370     /* we're trying to catch ourself */
371     if (!GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
372       g_mutex_lock (thread->lock);
373       GST_FLAG_SET (thread, GST_THREAD_MUTEX_LOCKED);
374     }
375     GST_DEBUG_OBJECT (thread, "catching itself");
376     GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
377   } else {
378     /* another thread is trying to catch us */
379     g_mutex_lock (thread->lock);
380     wait = !GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING);
381     while (!wait) {
382       GTimeVal tv;
383
384       GST_LOG_OBJECT (thread, "catching thread...");
385       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
386       g_cond_signal (thread->cond);
387       gst_thread_release_children_locks (thread);
388       g_get_current_time (&tv);
389       g_time_val_add (&tv, 1000);       /* wait a millisecond to catch the thread */
390       wait = g_cond_timed_wait (thread->cond, thread->lock, &tv);
391     }
392     GST_LOG_OBJECT (thread, "caught thread");
393   }
394   g_assert (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING));
395 }
396
397 static void
398 gst_thread_release (GstThread * thread)
399 {
400   if (thread != gst_thread_get_current ()) {
401     g_cond_signal (thread->cond);
402     g_mutex_unlock (thread->lock);
403   }
404 }
405
406 static GstElementStateReturn
407 gst_thread_change_state (GstElement * element)
408 {
409   GstThread *thread;
410   GstElementStateReturn ret;
411   gint transition;
412
413   g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
414   transition = GST_STATE_TRANSITION (element);
415
416   thread = GST_THREAD (element);
417
418   GST_DEBUG_OBJECT (element, "changing state from %s to %s",
419       gst_element_state_get_name (GST_STATE (element)),
420       gst_element_state_get_name (GST_STATE_PENDING (element)));
421
422   gst_thread_catch (thread);
423
424   /* FIXME: (or GStreamers ideas about "threading"): the element variables are
425      commonly accessed by multiple threads at the same time (see bug #111146
426      for an example) */
427   if (transition != GST_STATE_TRANSITION (element)) {
428     g_warning ("inconsistent state information, fix threading please");
429   }
430
431   switch (transition) {
432     case GST_STATE_NULL_TO_READY:
433       /* create the thread */
434       GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
435       thread->thread_id = g_thread_create_full (gst_thread_main_loop,
436           thread, STACK_SIZE, FALSE, TRUE, thread->priority, NULL);
437       if (!thread->thread_id) {
438         GST_ERROR_OBJECT (element, "g_thread_create_full failed");
439         goto error_out;
440       }
441       GST_LOG_OBJECT (element, "GThread created");
442
443       /* wait for it to 'spin up' */
444       g_cond_wait (thread->cond, thread->lock);
445       break;
446     case GST_STATE_READY_TO_PAUSED:
447       break;
448     case GST_STATE_PAUSED_TO_PLAYING:
449     {
450       /* FIXME: recurse into sub-bins */
451       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
452
453       while (elements) {
454         gst_element_enable_threadsafe_properties ((GstElement *) elements->
455             data);
456         elements = g_list_next (elements);
457       }
458       /* reset self to spinning */
459       if (thread == gst_thread_get_current ())
460         GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
461       break;
462     }
463     case GST_STATE_PLAYING_TO_PAUSED:
464     {
465       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
466
467       while (elements) {
468         gst_element_disable_threadsafe_properties ((GstElement *) elements->
469             data);
470         elements = g_list_next (elements);
471       }
472       break;
473     }
474     case GST_STATE_PAUSED_TO_READY:
475       break;
476     case GST_STATE_READY_TO_NULL:
477       /* we can't join the threads here, because this could have been triggered
478          by ourself (ouch) */
479       GST_LOG_OBJECT (thread, "destroying GThread %p", thread->thread_id);
480       GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
481       thread->thread_id = NULL;
482       if (thread == gst_thread_get_current ()) {
483         /* or should we continue? */
484         g_warning
485             ("Thread %s is destroying itself. Function call will not return!",
486             GST_ELEMENT_NAME (thread));
487         gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
488
489         /* unlock and signal - we are out */
490         gst_thread_release (thread);
491
492         GST_INFO_OBJECT (thread, "GThread %p is exiting", g_thread_self ());
493
494         g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
495
496         g_thread_exit (NULL);
497       }
498       /* now wait for the thread to destroy itself */
499       g_cond_signal (thread->cond);
500       g_cond_wait (thread->cond, thread->lock);
501       /* it should be dead now */
502       break;
503     default:
504       GST_ERROR_OBJECT (element, "unhandled state change! %x",
505           GST_STATE_TRANSITION (element));
506       g_warning ("thread %s: UNHANDLED STATE CHANGE! %x",
507           GST_STR_NULL (GST_OBJECT_NAME (element)),
508           GST_STATE_TRANSITION (element));
509       /* FIXME: not doable with current threading mess:
510          g_assert_not_reached ();
511        */
512       break;
513   }
514
515   if (GST_ELEMENT_CLASS (parent_class)->change_state) {
516     ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (thread));
517   } else {
518     ret = GST_STATE_SUCCESS;
519   }
520
521   gst_thread_release (thread);
522   return ret;
523
524 error_out:
525   GST_CAT_DEBUG (GST_CAT_STATES, "changing state from %s to %s failed for %s",
526       gst_element_state_get_name (GST_STATE (element)),
527       gst_element_state_get_name (GST_STATE_PENDING (element)),
528       GST_ELEMENT_NAME (element));
529   gst_thread_release (thread);
530   return GST_STATE_FAILURE;
531 }
532
533 /* state changes work this way: We grab the lock and stop the thread from 
534    spinning (via gst_thread_catch) - then we change the state. After that the
535    thread may spin on. */
536 static void
537 gst_thread_child_state_change (GstBin * bin, GstElementState oldstate,
538     GstElementState newstate, GstElement * element)
539 {
540   GST_LOG_OBJECT (bin, "(from thread %s) child %s changed state from %s to %s",
541       gst_thread_get_current ()? GST_ELEMENT_NAME (gst_thread_get_current ()) :
542       "(none)", GST_ELEMENT_NAME (element),
543       gst_element_state_get_name (oldstate),
544       gst_element_state_get_name (newstate));
545   if (parent_class->child_state_change)
546     parent_class->child_state_change (bin, oldstate, newstate, element);
547   /* We'll wake up the main thread now. Note that we can't lock the thread here, 
548      because we might be called from inside gst_thread_change_state when holding
549      the lock. But this doesn't cause any problems. */
550   if (newstate == GST_STATE_PLAYING)
551     g_cond_signal (GST_THREAD (bin)->cond);
552 }
553
554 /**
555  * gst_thread_main_loop:
556  * @arg: the thread to start
557  *
558  * The main loop of the thread. The thread will iterate
559  * while the state is GST_THREAD_STATE_SPINNING.
560  */
561 static void *
562 gst_thread_main_loop (void *arg)
563 {
564   GstThread *thread = NULL;
565   gboolean status;
566
567   thread = GST_THREAD (arg);
568   g_mutex_lock (thread->lock);
569   GST_LOG_OBJECT (thread, "Started main loop");
570
571   /* initialize gst_thread_current */
572   g_private_set (gst_thread_current, thread);
573
574   /* set up the element's scheduler */
575   gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
576   GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
577
578   g_cond_signal (thread->cond);
579   while (!(GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))) {
580     if (GST_STATE (thread) == GST_STATE_PLAYING) {
581       GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
582       status = TRUE;
583       GST_LOG_OBJECT (thread, "starting to iterate");
584       while (status && GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
585         g_mutex_unlock (thread->lock);
586         status = gst_bin_iterate (GST_BIN (thread));
587         if (!status)
588           GST_DEBUG_OBJECT (thread, "iterate returned false");
589         if (GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
590           GST_FLAG_UNSET (thread, GST_THREAD_MUTEX_LOCKED);
591         } else {
592           g_mutex_lock (thread->lock);
593         }
594       }
595       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
596     }
597     if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))
598       break;
599     GST_LOG_OBJECT (thread, "we're caught");
600     g_cond_signal (thread->cond);
601     g_cond_wait (thread->cond, thread->lock);
602   }
603
604   /* we need to destroy the scheduler here because it has mapped it's
605    * stack into the threads stack space */
606   gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
607
608   /* must do that before releasing the lock - we might get disposed before being done */
609   g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
610
611   /* unlock and signal - we are out */
612
613   GST_LOG_OBJECT (thread, "Thread %p exits main loop", g_thread_self ());
614   g_cond_signal (thread->cond);
615   g_mutex_unlock (thread->lock);
616   /* don't assume the GstThread object exists anymore now */
617
618   return NULL;
619 }
620
621 #ifndef GST_DISABLE_LOADSAVE
622 static xmlNodePtr
623 gst_thread_save_thyself (GstObject * object, xmlNodePtr self)
624 {
625   if (GST_OBJECT_CLASS (parent_class)->save_thyself)
626     GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
627   return NULL;
628 }
629
630 static void
631 gst_thread_restore_thyself (GstObject * object, xmlNodePtr self)
632 {
633   GST_LOG_OBJECT (object, "restoring");
634
635   if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
636     GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
637 }
638 #endif /* GST_DISABLE_LOADSAVE */