3252cfef80d93e6823d83503e43ffeea149fd7d4
[platform/upstream/gstreamer.git] / gst / gstthread.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Benjamin Otte <in7y118@public.uni-hamburg.de>
5  *
6  * gstthread.c: Threaded container object
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 #include "gst_private.h"
25
26 #include "gstthread.h"
27 #include "gstmarshal.h"
28 #include "gstscheduler.h"
29 #include "gstinfo.h"
30
31 #define GST_CAT_DEFAULT GST_CAT_THREAD
32 #define STACK_SIZE 0x200000
33
34 static GstElementDetails gst_thread_details =
35 GST_ELEMENT_DETAILS ("Threaded container",
36     "Generic/Bin",
37     "Container that creates/manages a thread",
38     "Erik Walthinsen <omega@cse.ogi.edu>, "
39     "Benjamin Otte <in7y118@informatik.uni-hamburg.de");
40
41 /* Thread signals and args */
42 enum
43 {
44   SHUTDOWN,
45   /* FILL ME */
46   LAST_SIGNAL
47 };
48
49 enum
50 {
51   SPINUP = 0,
52   STATECHANGE,
53   STARTUP
54 };
55
56 enum
57 {
58   ARG_0,
59   ARG_PRIORITY
60 };
61
62
63 static void gst_thread_base_init (gpointer g_class);
64 static void gst_thread_class_init (gpointer g_class, gpointer class_data);
65 static void gst_thread_init (GTypeInstance * instance, gpointer g_class);
66
67 static void gst_thread_dispose (GObject * object);
68
69 static void gst_thread_set_property (GObject * object, guint prop_id,
70     const GValue * value, GParamSpec * pspec);
71 static void gst_thread_get_property (GObject * object, guint prop_id,
72     GValue * value, GParamSpec * pspec);
73 static GstElementStateReturn gst_thread_change_state (GstElement * element);
74 static void gst_thread_child_state_change (GstBin * bin,
75     GstElementState oldstate, GstElementState newstate, GstElement * element);
76
77 static void gst_thread_catch (GstThread * thread);
78 static void gst_thread_release (GstThread * thread);
79
80 #ifndef GST_DISABLE_LOADSAVE
81 static xmlNodePtr gst_thread_save_thyself (GstObject * object,
82     xmlNodePtr parent);
83 static void gst_thread_restore_thyself (GstObject * object, xmlNodePtr self);
84 #endif
85
86 static void *gst_thread_main_loop (void *arg);
87
88 #define GST_TYPE_THREAD_PRIORITY (gst_thread_priority_get_type())
89 static GType
90 gst_thread_priority_get_type (void)
91 {
92   static GType thread_priority_type = 0;
93   static GEnumValue thread_priority[] = {
94     {G_THREAD_PRIORITY_LOW, "LOW", "Low Priority Scheduling"},
95     {G_THREAD_PRIORITY_NORMAL, "NORMAL", "Normal Scheduling"},
96     {G_THREAD_PRIORITY_HIGH, "HIGH", "High Priority Scheduling"},
97     {G_THREAD_PRIORITY_URGENT, "URGENT", "Urgent Scheduling"},
98     {0, NULL, NULL},
99   };
100
101   if (!thread_priority_type) {
102     thread_priority_type =
103         g_enum_register_static ("GstThreadPriority", thread_priority);
104   }
105   return thread_priority_type;
106 }
107
108 static GstBinClass *parent_class = NULL;
109 static guint gst_thread_signals[LAST_SIGNAL] = { 0 };
110 GPrivate *gst_thread_current;
111
112 GType
113 gst_thread_get_type (void)
114 {
115   static GType thread_type = 0;
116
117   if (!thread_type) {
118     static const GTypeInfo thread_info = {
119       sizeof (GstThreadClass),
120       gst_thread_base_init,
121       NULL,
122       gst_thread_class_init,
123       NULL,
124       NULL,
125       sizeof (GstThread),
126       0,
127       gst_thread_init,
128       NULL
129     };
130
131     thread_type = g_type_register_static (GST_TYPE_BIN, "GstThread",
132         &thread_info, 0);
133   }
134   return thread_type;
135 }
136
137 static void
138 gst_thread_base_init (gpointer g_class)
139 {
140   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
141
142   gst_element_class_set_details (gstelement_class, &gst_thread_details);
143 }
144 static void
145 do_nothing (gpointer hi)
146 {
147 }
148 static void
149 gst_thread_class_init (gpointer g_class, gpointer class_data)
150 {
151   GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
152
153 #ifndef GST_DISABLE_LOADSAVE
154   GstObjectClass *gstobject_class = GST_OBJECT_CLASS (g_class);
155 #endif
156   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
157   GstBinClass *gstbin_class = GST_BIN_CLASS (g_class);
158   GstThreadClass *klass = GST_THREAD_CLASS (g_class);
159
160   /* setup gst_thread_current */
161   gst_thread_current = g_private_new (do_nothing);
162
163   parent_class = g_type_class_peek_parent (g_class);
164
165   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PRIORITY,
166       g_param_spec_enum ("priority", "Scheduling Policy",
167           "The scheduling priority of the thread", GST_TYPE_THREAD_PRIORITY,
168           G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE));
169
170   gst_thread_signals[SHUTDOWN] =
171       g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
172       G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL,
173       gst_marshal_VOID__VOID, G_TYPE_NONE, 0);
174
175   gobject_class->dispose = gst_thread_dispose;
176
177 #ifndef GST_DISABLE_LOADSAVE
178   gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
179   gstobject_class->restore_thyself =
180       GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
181 #endif
182
183   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state);
184
185   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
186   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
187
188   gstbin_class->child_state_change =
189       GST_DEBUG_FUNCPTR (gst_thread_child_state_change);
190 }
191
192 static void
193 gst_thread_init (GTypeInstance * instance, gpointer g_class)
194 {
195   GstScheduler *scheduler;
196   GstThread *thread = GST_THREAD (instance);
197
198   GST_DEBUG ("initializing thread");
199
200   /* threads are managing bins and iterate themselves */
201   /* CR1: the GstBin code checks these flags */
202   GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
203   GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
204
205   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
206   g_assert (scheduler);
207
208   thread->lock = g_mutex_new ();
209   thread->cond = g_cond_new ();
210
211   thread->thread_id = (GThread *) NULL; /* set in NULL -> READY */
212   thread->priority = G_THREAD_PRIORITY_NORMAL;
213 }
214
215 static void
216 gst_thread_dispose (GObject * object)
217 {
218   GstThread *thread = GST_THREAD (object);
219
220   GST_CAT_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose");
221
222   G_OBJECT_CLASS (parent_class)->dispose (object);
223
224   g_assert (GST_STATE (thread) == GST_STATE_NULL);
225
226   GST_CAT_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose, freeing locks");
227
228   g_mutex_free (thread->lock);
229   g_cond_free (thread->cond);
230
231   gst_object_replace ((GstObject **) & GST_ELEMENT_SCHED (thread), NULL);
232 }
233
234 /**
235  * gst_thread_set_priority:
236  * @thread: the thread to change 
237  * @priority: the new priority for the thread
238  *
239  * change the thread's priority
240  */
241 void
242 gst_thread_set_priority (GstThread * thread, GThreadPriority priority)
243 {
244   g_return_if_fail (GST_IS_THREAD (thread));
245
246   thread->priority = priority;
247 }
248
249 static void
250 gst_thread_set_property (GObject * object, guint prop_id, const GValue * value,
251     GParamSpec * pspec)
252 {
253   GstThread *thread;
254
255   thread = GST_THREAD (object);
256
257   switch (prop_id) {
258     case ARG_PRIORITY:
259       thread->priority = g_value_get_enum (value);
260       break;
261     default:
262       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
263       break;
264   }
265 }
266
267 static void
268 gst_thread_get_property (GObject * object, guint prop_id, GValue * value,
269     GParamSpec * pspec)
270 {
271   GstThread *thread;
272
273   thread = GST_THREAD (object);
274
275   switch (prop_id) {
276     case ARG_PRIORITY:
277       g_value_set_enum (value, thread->priority);
278       break;
279     default:
280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
281       break;
282   }
283 }
284
285
286 /**
287  * gst_thread_new:
288  * @name: the name of the thread
289  *
290  * Create a new thread with the given name.
291  *
292  * Returns: The new thread
293  */
294 GstElement *
295 gst_thread_new (const gchar * name)
296 {
297   return gst_element_factory_make ("thread", name);
298 }
299
300 /**
301  * gst_thread_get_current:
302  *
303  * Gets the current GstThread.
304  *
305  * Returns: The current GstThread or NULL if you are not running inside a 
306  *          #GstThread.
307  */
308 GstThread *
309 gst_thread_get_current (void)
310 {
311   return (GstThread *) g_private_get (gst_thread_current);
312 }
313
314 static inline void
315 gst_thread_release_children_locks (GstThread * thread)
316 {
317   GstRealPad *peer = NULL;
318   GstElement *peerelement;
319   GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
320
321   while (elements) {
322     GstElement *element = GST_ELEMENT (elements->data);
323     GList *pads;
324
325     g_assert (element);
326     GST_DEBUG_OBJECT (thread, "waking element \"%s\"",
327         GST_ELEMENT_NAME (element));
328     elements = g_list_next (elements);
329
330     if (!gst_element_release_locks (element))
331       g_warning ("element %s could not release locks",
332           GST_ELEMENT_NAME (element));
333
334     pads = GST_ELEMENT_PADS (element);
335
336     while (pads) {
337       if (GST_PAD_PEER (pads->data)) {
338         peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
339         pads = g_list_next (pads);
340       } else {
341         pads = g_list_next (pads);
342         continue;
343       }
344
345       if (!peer)
346         continue;
347
348       peerelement = GST_PAD_PARENT (peer);
349       if (!peerelement)
350         continue;               /* FIXME: deal with case where there's no peer */
351
352       if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
353         GST_LOG_OBJECT (thread, "element \"%s\" has pad cross sched boundary",
354             GST_ELEMENT_NAME (element));
355         GST_LOG_OBJECT (thread, "waking element \"%s\"",
356             GST_ELEMENT_NAME (peerelement));
357         if (!gst_element_release_locks (peerelement))
358           g_warning ("element %s could not release locks",
359               GST_ELEMENT_NAME (peerelement));
360       }
361     }
362   }
363 }
364
365 /* stops the main thread, if there is one and grabs the thread's mutex */
366 static void
367 gst_thread_catch (GstThread * thread)
368 {
369   gboolean wait;
370
371   if (thread == gst_thread_get_current ()) {
372     /* we're trying to catch ourself */
373     if (!GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
374       GST_DEBUG_OBJECT (thread, "catching itself, grabbing lock");
375       g_mutex_lock (thread->lock);
376       GST_FLAG_SET (thread, GST_THREAD_MUTEX_LOCKED);
377     }
378     GST_DEBUG_OBJECT (thread, "catching itself");
379     GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
380   } else {
381     GST_DEBUG_OBJECT (thread, "catching thread, grabbing lock");
382     /* another thread is trying to catch us */
383     g_mutex_lock (thread->lock);
384     wait = !GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING);
385     while (!wait) {
386       GTimeVal tv;
387
388       GST_LOG_OBJECT (thread, "catching thread...");
389       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
390       g_cond_signal (thread->cond);
391       gst_thread_release_children_locks (thread);
392       g_get_current_time (&tv);
393       g_time_val_add (&tv, 1000);       /* wait a millisecond to catch the thread */
394       wait = g_cond_timed_wait (thread->cond, thread->lock, &tv);
395     }
396     GST_LOG_OBJECT (thread, "caught thread");
397   }
398   g_assert (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING));
399 }
400
401 static void
402 gst_thread_release (GstThread * thread)
403 {
404   if (thread != gst_thread_get_current ()) {
405     g_cond_signal (thread->cond);
406     g_mutex_unlock (thread->lock);
407   }
408 }
409
410 static GstElementStateReturn
411 gst_thread_change_state (GstElement * element)
412 {
413   GstThread *thread;
414   GstElementStateReturn ret;
415   gint transition;
416
417   g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
418   transition = GST_STATE_TRANSITION (element);
419
420   thread = GST_THREAD (element);
421
422   GST_DEBUG_OBJECT (element, "changing state from %s to %s",
423       gst_element_state_get_name (GST_STATE (element)),
424       gst_element_state_get_name (GST_STATE_PENDING (element)));
425
426   gst_thread_catch (thread);
427
428   /* FIXME: (or GStreamers ideas about "threading"): the element variables are
429      commonly accessed by multiple threads at the same time (see bug #111146
430      for an example) */
431   if (transition != GST_STATE_TRANSITION (element)) {
432     g_warning ("inconsistent state information, fix threading please");
433   }
434
435   switch (transition) {
436     case GST_STATE_NULL_TO_READY:
437       /* create the thread */
438       GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
439       thread->thread_id = g_thread_create_full (gst_thread_main_loop,
440           thread, STACK_SIZE, FALSE, TRUE, thread->priority, NULL);
441       if (!thread->thread_id) {
442         GST_ERROR_OBJECT (element, "g_thread_create_full failed");
443         goto error_out;
444       }
445       GST_LOG_OBJECT (element, "GThread created");
446
447       /* wait for it to 'spin up' */
448       g_cond_wait (thread->cond, thread->lock);
449       break;
450     case GST_STATE_READY_TO_PAUSED:
451       break;
452     case GST_STATE_PAUSED_TO_PLAYING:
453     {
454       /* FIXME: recurse into sub-bins */
455       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
456
457       while (elements) {
458         gst_element_enable_threadsafe_properties ((GstElement *) elements->
459             data);
460         elements = g_list_next (elements);
461       }
462       /* reset self to spinning */
463       if (thread == gst_thread_get_current ())
464         GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
465       break;
466     }
467     case GST_STATE_PLAYING_TO_PAUSED:
468     {
469       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
470
471       while (elements) {
472         gst_element_disable_threadsafe_properties ((GstElement *) elements->
473             data);
474         elements = g_list_next (elements);
475       }
476       break;
477     }
478     case GST_STATE_PAUSED_TO_READY:
479       break;
480     case GST_STATE_READY_TO_NULL:
481       /* we can't join the threads here, because this could have been triggered
482          by ourself (ouch) */
483       GST_LOG_OBJECT (thread, "destroying GThread %p", thread->thread_id);
484       GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
485       thread->thread_id = NULL;
486       if (thread == gst_thread_get_current ()) {
487         /* or should we continue? */
488         g_warning
489             ("Thread %s is destroying itself. Function call will not return!",
490             GST_ELEMENT_NAME (thread));
491         gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
492
493         /* unlock and signal - we are out */
494         gst_thread_release (thread);
495
496         GST_INFO_OBJECT (thread, "GThread %p is exiting", g_thread_self ());
497
498         g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
499
500         g_thread_exit (NULL);
501       }
502       /* now wait for the thread to destroy itself */
503       g_cond_signal (thread->cond);
504       g_cond_wait (thread->cond, thread->lock);
505       /* it should be dead now */
506       break;
507     default:
508       break;
509   }
510
511   if (GST_ELEMENT_CLASS (parent_class)->change_state) {
512     ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (thread));
513   } else {
514     ret = GST_STATE_SUCCESS;
515   }
516
517   gst_thread_release (thread);
518   return ret;
519
520 error_out:
521   GST_CAT_DEBUG (GST_CAT_STATES, "changing state from %s to %s failed for %s",
522       gst_element_state_get_name (GST_STATE (element)),
523       gst_element_state_get_name (GST_STATE_PENDING (element)),
524       GST_ELEMENT_NAME (element));
525   gst_thread_release (thread);
526   return GST_STATE_FAILURE;
527 }
528
529 /* state changes work this way: We grab the lock and stop the thread from 
530    spinning (via gst_thread_catch) - then we change the state. After that the
531    thread may spin on. */
532 static void
533 gst_thread_child_state_change (GstBin * bin, GstElementState oldstate,
534     GstElementState newstate, GstElement * element)
535 {
536   GST_LOG_OBJECT (bin, "(from thread %s) child %s changed state from %s to %s",
537       gst_thread_get_current ()? GST_ELEMENT_NAME (gst_thread_get_current ()) :
538       "(none)", GST_ELEMENT_NAME (element),
539       gst_element_state_get_name (oldstate),
540       gst_element_state_get_name (newstate));
541   if (parent_class->child_state_change)
542     parent_class->child_state_change (bin, oldstate, newstate, element);
543   /* We'll wake up the main thread now. Note that we can't lock the thread here, 
544      because we might be called from inside gst_thread_change_state when holding
545      the lock. But this doesn't cause any problems. */
546   if (newstate == GST_STATE_PLAYING)
547     g_cond_signal (GST_THREAD (bin)->cond);
548 }
549
550 /**
551  * gst_thread_main_loop:
552  * @arg: the thread to start
553  *
554  * The main loop of the thread. The thread will iterate
555  * while the state is GST_THREAD_STATE_SPINNING.
556  */
557 static void *
558 gst_thread_main_loop (void *arg)
559 {
560   GstThread *thread = NULL;
561   gboolean status;
562   GstScheduler *sched;
563
564   thread = GST_THREAD (arg);
565   g_mutex_lock (thread->lock);
566   GST_LOG_OBJECT (thread, "Started main loop");
567
568   /* initialize gst_thread_current */
569   g_private_set (gst_thread_current, thread);
570
571   /* set up the element's scheduler */
572   gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
573   GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
574
575   g_cond_signal (thread->cond);
576   while (!(GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))) {
577     if (GST_STATE (thread) == GST_STATE_PLAYING) {
578       GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
579       status = TRUE;
580       GST_LOG_OBJECT (thread, "starting to iterate");
581       while (status && GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
582         g_mutex_unlock (thread->lock);
583         status = gst_bin_iterate (GST_BIN (thread));
584         if (!status)
585           GST_DEBUG_OBJECT (thread, "iterate returned false");
586         if (GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
587           GST_FLAG_UNSET (thread, GST_THREAD_MUTEX_LOCKED);
588         } else {
589           g_mutex_lock (thread->lock);
590         }
591       }
592       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
593     }
594     if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))
595       break;
596     GST_LOG_OBJECT (thread, "we're caught");
597     g_cond_signal (thread->cond);
598     g_cond_wait (thread->cond, thread->lock);
599   }
600
601   /* we need to destroy the scheduler here because it has mapped it's
602    * stack into the threads stack space */
603   sched = GST_ELEMENT_SCHED (thread);
604   if (sched)
605     gst_scheduler_reset (sched);
606
607   /* must do that before releasing the lock - we might get disposed before being done */
608   g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
609
610   /* unlock and signal - we are out */
611
612   GST_LOG_OBJECT (thread, "Thread %p exits main loop", g_thread_self ());
613   g_cond_signal (thread->cond);
614   g_mutex_unlock (thread->lock);
615   /* don't assume the GstThread object exists anymore now */
616
617   return NULL;
618 }
619
620 #ifndef GST_DISABLE_LOADSAVE
621 static xmlNodePtr
622 gst_thread_save_thyself (GstObject * object, xmlNodePtr self)
623 {
624   if (GST_OBJECT_CLASS (parent_class)->save_thyself)
625     GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
626   return NULL;
627 }
628
629 static void
630 gst_thread_restore_thyself (GstObject * object, xmlNodePtr self)
631 {
632   GST_LOG_OBJECT (object, "restoring");
633
634   if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
635     GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
636 }
637 #endif /* GST_DISABLE_LOADSAVE */