fix debugging segfault
[platform/upstream/gstreamer.git] / gst / gstthread.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Benjamin Otte <in7y118@public.uni-hamburg.de>
5  *
6  * gstthread.c: Threaded container object
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 #include "gstthread.h"
25 #include "gstscheduler.h"
26 #include "gstinfo.h"
27 #include "gstlog.h"
28
29 #define STACK_SIZE 0x200000
30
31 GstElementDetails gst_thread_details = {
32   "Threaded container",
33   "Generic/Bin",
34   "LGPL",
35   "Container that creates/manages a thread",
36   VERSION,
37   "Erik Walthinsen <omega@cse.ogi.edu>,"
38   "Benjamin Otte <in7y118@informatik.uni-hamburg.de",
39   "(C) 1999-2003",
40 };
41
42
43 /* Thread signals and args */
44 enum {
45   SHUTDOWN,
46   /* FILL ME */
47   LAST_SIGNAL
48 };
49
50 enum {
51   SPINUP=0,
52   STATECHANGE,
53   STARTUP
54 };
55
56 enum {
57   ARG_0,
58   ARG_PRIORITY,
59 };
60
61
62
63 static void             gst_thread_class_init           (GstThreadClass *klass);
64 static void             gst_thread_init                 (GstThread *thread);
65
66 static void             gst_thread_dispose              (GObject *object);
67
68 static void             gst_thread_set_property         (GObject *object, guint prop_id, 
69                                                          const GValue *value, GParamSpec *pspec);
70 static void             gst_thread_get_property         (GObject *object, guint prop_id,
71                                                          GValue *value, GParamSpec *pspec);
72 static GstElementStateReturn gst_thread_change_state    (GstElement *element);
73 static void             gst_thread_child_state_change   (GstBin *bin, GstElementState oldstate, 
74                                                          GstElementState newstate, GstElement *element);
75
76 static void             gst_thread_catch                (GstThread *thread);
77 static void             gst_thread_release              (GstThread *thread);
78
79 #ifndef GST_DISABLE_LOADSAVE
80 static xmlNodePtr       gst_thread_save_thyself         (GstObject *object,
81                                                          xmlNodePtr parent);
82 static void             gst_thread_restore_thyself      (GstObject *object,
83                                                          xmlNodePtr self);
84 #endif
85
86 static void *           gst_thread_main_loop            (void *arg);
87
88 #define GST_TYPE_THREAD_PRIORITY (gst_thread_priority_get_type())
89 static GType
90 gst_thread_priority_get_type(void) 
91 {
92   static GType thread_priority_type = 0;
93   static GEnumValue thread_priority[] = 
94   {
95     { G_THREAD_PRIORITY_LOW,    "LOW",     "Low Priority Scheduling" },
96     { G_THREAD_PRIORITY_NORMAL, "NORMAL",  "Normal Scheduling" },
97     { G_THREAD_PRIORITY_HIGH,   "HIGH",    "High Priority Scheduling" },
98     { G_THREAD_PRIORITY_URGENT, "URGENT",  "Urgent Scheduling" },
99     { 0, NULL, NULL },
100   };
101   if (!thread_priority_type) {
102     thread_priority_type = g_enum_register_static("GstThreadPriority", thread_priority);
103   }
104   return thread_priority_type;
105 }
106
107 static GstBinClass *parent_class = NULL;
108 static guint gst_thread_signals[LAST_SIGNAL] = { 0 };
109 GPrivate *gst_thread_current;
110
111 GType
112 gst_thread_get_type(void) {
113   static GType thread_type = 0;
114
115   if (!thread_type) {
116     static const GTypeInfo thread_info = {
117       sizeof (GstThreadClass), NULL, NULL,
118       (GClassInitFunc) gst_thread_class_init, NULL, NULL,
119       sizeof (GstThread),
120       4,
121       (GInstanceInitFunc) gst_thread_init,
122       NULL
123     };
124     thread_type = g_type_register_static (GST_TYPE_BIN, "GstThread",
125                                           &thread_info, 0);
126   }
127   return thread_type;
128 }
129
130 static void do_nothing (gpointer hi) {}
131 static void
132 gst_thread_class_init (GstThreadClass *klass)
133 {
134   GObjectClass *gobject_class;
135   GstObjectClass *gstobject_class;
136   GstElementClass *gstelement_class;
137   GstBinClass *gstbin_class;
138
139   /* setup gst_thread_current */
140   gst_thread_current = g_private_new (do_nothing);
141
142   gobject_class =       (GObjectClass*)klass;
143   gstobject_class =     (GstObjectClass*)klass;
144   gstelement_class =    (GstElementClass*)klass;
145   gstbin_class =        (GstBinClass*)klass;
146
147   parent_class = g_type_class_ref (GST_TYPE_BIN);
148
149   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PRIORITY,
150     g_param_spec_enum ("priority", "Scheduling Policy", "The scheduling priority of the thread",
151                        GST_TYPE_THREAD_PRIORITY, G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE));
152
153   gst_thread_signals[SHUTDOWN] =
154     g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
155                   G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL,
156                   gst_marshal_VOID__VOID, G_TYPE_NONE, 0);
157
158   gobject_class->dispose =              gst_thread_dispose;
159
160 #ifndef GST_DISABLE_LOADSAVE
161   gstobject_class->save_thyself =       GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
162   gstobject_class->restore_thyself =    GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
163 #endif
164
165   gstelement_class->change_state =      GST_DEBUG_FUNCPTR (gst_thread_change_state);
166
167   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
168   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
169
170   gstbin_class->child_state_change =    GST_DEBUG_FUNCPTR (gst_thread_child_state_change);
171 }
172
173 static void
174 gst_thread_init (GstThread *thread)
175 {
176   GstScheduler *scheduler;
177
178   GST_DEBUG (GST_CAT_THREAD, "initializing thread");
179
180   /* threads are managing bins and iterate themselves */
181   /* CR1: the GstBin code checks these flags */
182   GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
183   GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
184
185   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
186   g_assert (scheduler);
187
188   thread->lock = g_mutex_new ();
189   thread->cond = g_cond_new ();
190
191   thread->thread_id = (GThread *) NULL; /* set in NULL -> READY */
192   thread->priority = G_THREAD_PRIORITY_NORMAL;
193 }
194
195 static void
196 gst_thread_dispose (GObject *object)
197 {
198   GstThread *thread = GST_THREAD (object);
199
200   GST_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose");
201
202   G_OBJECT_CLASS (parent_class)->dispose (object);
203
204   g_assert (GST_STATE (thread) == GST_STATE_NULL);
205
206   g_mutex_free (thread->lock);
207   g_cond_free (thread->cond);
208
209   gst_object_replace ((GstObject **)&GST_ELEMENT_SCHED (thread), NULL);
210 }
211
212 /**
213  * gst_thread_set_priority:
214  * @thread: the thread to change 
215  * @priority: the new priority for the thread
216  *
217  * change the thread's priority
218  */
219 void
220 gst_thread_set_priority (GstThread *thread, GThreadPriority priority)
221 {
222   g_return_if_fail (GST_IS_THREAD (thread));
223
224   thread->priority = priority;
225 }
226
227 static void
228 gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
229 {
230   GstThread *thread;
231
232   thread = GST_THREAD (object);
233
234   switch (prop_id) {
235     case ARG_PRIORITY:
236       thread->priority = g_value_get_enum (value);
237       break;
238     default:
239       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
240       break;
241   }
242 }
243
244 static void
245 gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
246 {
247   GstThread *thread;
248
249   thread = GST_THREAD (object);
250
251   switch (prop_id) {
252     case ARG_PRIORITY:
253       g_value_set_enum (value, thread->priority);
254       break;
255     default:
256       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
257       break;
258   }
259 }
260
261
262 /**
263  * gst_thread_new:
264  * @name: the name of the thread
265  *
266  * Create a new thread with the given name.
267  *
268  * Returns: The new thread
269  */
270 GstElement*
271 gst_thread_new (const gchar *name)
272 {
273   return gst_element_factory_make ("thread", name);
274 }
275
276 /**
277  * gst_thread_get_current:
278  *
279  * Create a new thread with the given name.
280  *
281  * Returns: The current GstThread or NULL if you are not running inside a 
282  *          #GstThread.
283  */
284 GstThread *
285 gst_thread_get_current (void)
286 {
287   return (GstThread *) g_private_get (gst_thread_current);
288 }
289
290 static inline void
291 gst_thread_release_children_locks (GstThread *thread)
292 {
293   GstRealPad *peer = NULL;
294   GstElement *peerelement;
295   GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
296
297   while (elements) {
298     GstElement *element = GST_ELEMENT (elements->data);
299     GList *pads;
300
301     g_assert (element);
302     GST_DEBUG (GST_CAT_THREAD, "waking element \"%s\"", GST_ELEMENT_NAME (element));
303     elements = g_list_next (elements);
304
305     if (!gst_element_release_locks (element))
306       g_warning ("element %s could not release locks", GST_ELEMENT_NAME (element));
307
308     pads = GST_ELEMENT_PADS (element);
309
310     while (pads) {
311       if (GST_PAD_PEER (pads->data)) {
312         peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
313         pads = g_list_next (pads);
314       } else {
315         pads = g_list_next (pads);
316         continue;
317       }
318
319       if (!peer)
320         continue;
321
322       peerelement = GST_PAD_PARENT (peer);
323       if (!peerelement)
324         continue; /* FIXME: deal with case where there's no peer */
325
326       if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
327         GST_DEBUG (GST_CAT_THREAD, "element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element));
328         GST_DEBUG (GST_CAT_THREAD, "waking element \"%s\"", GST_ELEMENT_NAME (peerelement));
329         if (!gst_element_release_locks (peerelement))
330           g_warning ("element %s could not release locks", GST_ELEMENT_NAME (peerelement));
331       }
332     }
333   }
334 }
335 /* stops the main thread, if there is one and grabs the thread's mutex */
336 static void
337 gst_thread_catch (GstThread *thread)
338 {
339   gboolean wait;
340   if (thread == gst_thread_get_current()) {
341     /* we're trying to catch ourself */
342     if (!GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
343       g_mutex_lock (thread->lock);
344       GST_FLAG_SET (thread, GST_THREAD_MUTEX_LOCKED);
345     }
346     GST_DEBUG (GST_CAT_THREAD, "%s is catching itself", GST_ELEMENT_NAME (thread));
347     GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
348   } else {
349     /* another thread is trying to catch us */
350     g_mutex_lock (thread->lock);
351     wait = !GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING);
352     while (!wait) {
353       GTimeVal tv;
354       GST_DEBUG (GST_CAT_THREAD, "catching %s...", GST_ELEMENT_NAME (thread));
355       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
356       g_cond_signal (thread->cond);
357       gst_thread_release_children_locks (thread);
358       g_get_current_time (&tv);
359       g_time_val_add (&tv, 1000); /* wait a millisecond to catch the thread */
360       wait = g_cond_timed_wait (thread->cond, thread->lock, &tv);
361     }
362     GST_DEBUG (GST_CAT_THREAD, "caught %s", GST_ELEMENT_NAME (thread));
363   }
364   g_assert (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING));
365 }
366 static void
367 gst_thread_release (GstThread *thread)
368 {
369   if (thread != gst_thread_get_current()) {
370     g_cond_signal (thread->cond);
371     g_mutex_unlock (thread->lock);
372   }
373 }
374 static GstElementStateReturn
375 gst_thread_change_state (GstElement *element)
376 {
377   GstThread *thread;
378   GstElementStateReturn ret;
379   gint transition;
380
381   g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
382   transition = GST_STATE_TRANSITION (element);
383
384   thread = GST_THREAD (element);
385
386   GST_DEBUG (GST_CAT_STATES, "%s is changing state from %s to %s",
387              GST_ELEMENT_NAME (element), gst_element_state_get_name (GST_STATE (element)),
388              gst_element_state_get_name (GST_STATE_PENDING (element)));
389
390   gst_thread_catch (thread);
391
392   /* FIXME: (or GStreamers ideas about "threading"): the element variables are
393      commonly accessed by multiple threads at the same time (see bug #111146
394      for an example) */
395   if (transition != GST_STATE_TRANSITION (element)) {
396     g_warning ("inconsistent state information, fix threading please");
397   }
398
399   switch (transition) {
400     case GST_STATE_NULL_TO_READY:
401       /* create the thread */
402       GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
403       thread->thread_id = g_thread_create_full(gst_thread_main_loop,
404           thread, STACK_SIZE, FALSE, TRUE, thread->priority,
405           NULL);
406       if (!thread->thread_id){
407         GST_DEBUG (GST_CAT_THREAD, "g_thread_create_full for %sfailed", GST_ELEMENT_NAME (element));
408         goto error_out;
409       }
410       GST_DEBUG (GST_CAT_THREAD, "GThread created");
411
412       /* wait for it to 'spin up' */
413       g_cond_wait (thread->cond, thread->lock);
414       break;
415     case GST_STATE_READY_TO_PAUSED:
416       break;
417     case GST_STATE_PAUSED_TO_PLAYING:
418     {
419       /* FIXME: recurse into sub-bins */
420       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
421       while (elements) {
422         gst_element_enable_threadsafe_properties ((GstElement*)elements->data);
423         elements = g_list_next (elements);
424       }
425       break;
426     }
427     case GST_STATE_PLAYING_TO_PAUSED:
428     {
429       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
430       while (elements) {
431         gst_element_disable_threadsafe_properties ((GstElement*)elements->data);
432         elements = g_list_next (elements);
433       }
434       break;
435     }
436     case GST_STATE_PAUSED_TO_READY:
437       break;
438     case GST_STATE_READY_TO_NULL:
439       /* we can't join the threads here, because this could have been triggered
440          by ourself (ouch) */
441       GST_DEBUG (GST_CAT_THREAD, "destroying GThread %p", thread->thread_id);
442       GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
443       thread->thread_id = NULL;
444       if (thread == gst_thread_get_current()) {
445         /* or should we continue? */
446         g_warning ("Thread %s is destroying itself. Function call will not return!", GST_ELEMENT_NAME (thread));
447         gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
448         
449         /* unlock and signal - we are out */
450         gst_thread_release (thread);
451
452         GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
453                   GST_ELEMENT_NAME (thread));
454
455         g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
456
457         g_thread_exit (NULL);
458       }
459       /* now wait for the thread to destroy itself */
460       g_cond_signal (thread->cond);
461       g_cond_wait (thread->cond, thread->lock);
462       /* it should be dead now */
463       break;
464     default:
465       GST_DEBUG_ELEMENT (GST_CAT_THREAD, element, "UNHANDLED STATE CHANGE! %x", 
466                          GST_STATE_TRANSITION (element));
467       g_assert_not_reached ();
468       break;
469   }
470
471   if (GST_ELEMENT_CLASS (parent_class)->change_state) {
472     ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (thread));
473   } else {
474     ret = GST_STATE_SUCCESS;
475   }
476
477   gst_thread_release (thread);
478   return ret;
479   
480 error_out:
481   GST_DEBUG (GST_CAT_STATES, "changing state from %s to %s failed for %s",
482              gst_element_state_get_name (GST_STATE (element)),
483              gst_element_state_get_name (GST_STATE_PENDING (element)),
484              GST_ELEMENT_NAME (element));
485   gst_thread_release (thread);
486   return GST_STATE_FAILURE;
487 }
488
489 /* state changes work this way: We grab the lock and stop the thread from 
490    spinning (via gst_thread_catch) - then we change the state. After that the
491    thread may spin on. */
492 static void
493 gst_thread_child_state_change (GstBin *bin, GstElementState oldstate, 
494                                GstElementState newstate, GstElement *element)
495 {
496   GST_DEBUG (GST_CAT_THREAD, "%s (from thread %s) child %s changed state from %s to %s\n",
497               GST_ELEMENT_NAME (bin), 
498               gst_thread_get_current() ? GST_ELEMENT_NAME (gst_thread_get_current()) : "(none)", 
499               GST_ELEMENT_NAME (element), gst_element_state_get_name (oldstate),
500               gst_element_state_get_name (newstate));
501   if (parent_class->child_state_change)
502     parent_class->child_state_change (bin, oldstate, newstate, element);
503   /* We'll wake up the main thread now. Note that we can't lock the thread here, 
504      because we might be called from inside gst_thread_change_state when holding
505      the lock. But this doesn't cause any problems. */
506   if (newstate == GST_STATE_PLAYING)
507     g_cond_signal (GST_THREAD (bin)->cond);
508 }
509 /**
510  * gst_thread_main_loop:
511  * @arg: the thread to start
512  *
513  * The main loop of the thread. The thread will iterate
514  * while the state is GST_THREAD_STATE_SPINNING.
515  */
516 static void *
517 gst_thread_main_loop (void *arg)
518 {
519   GstThread *thread = NULL;
520   gboolean status;
521
522   thread = GST_THREAD (arg);
523   g_mutex_lock (thread->lock);
524   GST_DEBUG (GST_CAT_THREAD, "Thread %s started main loop", GST_ELEMENT_NAME (thread));
525
526   /* initialize gst_thread_current */
527   g_private_set (gst_thread_current, thread);
528
529   /* set up the element's scheduler */
530   gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
531   GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
532
533   g_cond_signal (thread->cond);
534   while (!(GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))) {
535     if (GST_STATE (thread) == GST_STATE_PLAYING) {
536       GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
537       status = TRUE;
538       GST_DEBUG (GST_CAT_THREAD, "%s starts iterating\n", GST_ELEMENT_NAME (thread));
539       while (status && GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
540         g_mutex_unlock (thread->lock);
541         status = gst_bin_iterate (GST_BIN (thread));
542         if (GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
543           GST_FLAG_UNSET (thread, GST_THREAD_MUTEX_LOCKED);
544         } else {
545           g_mutex_lock (thread->lock);
546         }
547       }
548       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
549     }
550     if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))
551       break;
552     GST_DEBUG (GST_CAT_THREAD, "%s was caught\n", GST_ELEMENT_NAME (thread));
553     g_cond_signal (thread->cond);
554     g_cond_wait (thread->cond, thread->lock);
555   }
556
557   /* we need to destroy the scheduler here because it has mapped it's
558    * stack into the threads stack space */
559   gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
560
561   /* must do that before releasing the lock - we might get disposed before being done */
562   g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
563
564   /* unlock and signal - we are out */
565   g_cond_signal (thread->cond);
566   g_mutex_unlock (thread->lock);
567
568   GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
569             GST_ELEMENT_NAME (thread));
570
571   return NULL;
572 }
573
574 #ifndef GST_DISABLE_LOADSAVE
575 static xmlNodePtr
576 gst_thread_save_thyself (GstObject *object,
577                          xmlNodePtr self)
578 {
579   if (GST_OBJECT_CLASS (parent_class)->save_thyself)
580     GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
581   return NULL;
582 }
583
584 static void
585 gst_thread_restore_thyself (GstObject *object,
586                             xmlNodePtr self)
587 {
588   GST_DEBUG (GST_CAT_THREAD,"gstthread: restore");
589
590   if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
591     GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
592 }
593 #endif /* GST_DISABLE_LOADSAVE */