offsets may be negative (nobody ever seeked beackwards, hu?)
[platform/upstream/gstreamer.git] / gst / gstthread.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Benjamin Otte <in7y118@public.uni-hamburg.de>
5  *
6  * gstthread.c: Threaded container object
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 #include "gstthread.h"
25 #include "gstscheduler.h"
26 #include "gstinfo.h"
27 #include "gstlog.h"
28
29 #define STACK_SIZE 0x200000
30
31 GstElementDetails gst_thread_details = {
32   "Threaded container",
33   "Generic/Bin",
34   "LGPL",
35   "Container that creates/manages a thread",
36   VERSION,
37   "Erik Walthinsen <omega@cse.ogi.edu>,"
38   "Benjamin Otte <in7y118@informatik.uni-hamburg.de",
39   "(C) 1999-2003",
40 };
41
42
43 /* Thread signals and args */
44 enum {
45   SHUTDOWN,
46   /* FILL ME */
47   LAST_SIGNAL
48 };
49
50 enum {
51   SPINUP=0,
52   STATECHANGE,
53   STARTUP
54 };
55
56 enum {
57   ARG_0,
58   ARG_PRIORITY,
59 };
60
61
62
63 static void             gst_thread_class_init           (GstThreadClass *klass);
64 static void             gst_thread_init                 (GstThread *thread);
65
66 static void             gst_thread_dispose              (GObject *object);
67
68 static void             gst_thread_set_property         (GObject *object, guint prop_id, 
69                                                          const GValue *value, GParamSpec *pspec);
70 static void             gst_thread_get_property         (GObject *object, guint prop_id,
71                                                          GValue *value, GParamSpec *pspec);
72 static GstElementStateReturn gst_thread_change_state    (GstElement *element);
73 static void             gst_thread_child_state_change   (GstBin *bin, GstElementState oldstate, 
74                                                          GstElementState newstate, GstElement *element);
75
76 static void             gst_thread_catch                (GstThread *thread);
77 static void             gst_thread_release              (GstThread *thread);
78
79 #ifndef GST_DISABLE_LOADSAVE
80 static xmlNodePtr       gst_thread_save_thyself         (GstObject *object,
81                                                          xmlNodePtr parent);
82 static void             gst_thread_restore_thyself      (GstObject *object,
83                                                          xmlNodePtr self);
84 #endif
85
86 static void *           gst_thread_main_loop            (void *arg);
87
88 #define GST_TYPE_THREAD_PRIORITY (gst_thread_priority_get_type())
89 static GType
90 gst_thread_priority_get_type(void) 
91 {
92   static GType thread_priority_type = 0;
93   static GEnumValue thread_priority[] = 
94   {
95     { G_THREAD_PRIORITY_LOW,    "LOW",     "Low Priority Scheduling" },
96     { G_THREAD_PRIORITY_NORMAL, "NORMAL",  "Normal Scheduling" },
97     { G_THREAD_PRIORITY_HIGH,   "HIGH",    "High Priority Scheduling" },
98     { G_THREAD_PRIORITY_URGENT, "URGENT",  "Urgent Scheduling" },
99     { 0, NULL, NULL },
100   };
101   if (!thread_priority_type) {
102     thread_priority_type = g_enum_register_static("GstThreadPriority", thread_priority);
103   }
104   return thread_priority_type;
105 }
106
107 static GstBinClass *parent_class = NULL;
108 static guint gst_thread_signals[LAST_SIGNAL] = { 0 };
109 GPrivate *gst_thread_current;
110
111 GType
112 gst_thread_get_type(void) {
113   static GType thread_type = 0;
114
115   if (!thread_type) {
116     static const GTypeInfo thread_info = {
117       sizeof (GstThreadClass), NULL, NULL,
118       (GClassInitFunc) gst_thread_class_init, NULL, NULL,
119       sizeof (GstThread),
120       4,
121       (GInstanceInitFunc) gst_thread_init,
122       NULL
123     };
124     thread_type = g_type_register_static (GST_TYPE_BIN, "GstThread",
125                                           &thread_info, 0);
126   }
127   return thread_type;
128 }
129
130 static void do_nothing (gpointer hi) {}
131 static void
132 gst_thread_class_init (GstThreadClass *klass)
133 {
134   GObjectClass *gobject_class;
135   GstObjectClass *gstobject_class;
136   GstElementClass *gstelement_class;
137   GstBinClass *gstbin_class;
138
139   /* setup gst_thread_current */
140   gst_thread_current = g_private_new (do_nothing);
141
142   gobject_class =       (GObjectClass*)klass;
143   gstobject_class =     (GstObjectClass*)klass;
144   gstelement_class =    (GstElementClass*)klass;
145   gstbin_class =        (GstBinClass*)klass;
146
147   parent_class = g_type_class_ref (GST_TYPE_BIN);
148
149   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PRIORITY,
150     g_param_spec_enum ("priority", "Scheduling Policy", "The scheduling priority of the thread",
151                        GST_TYPE_THREAD_PRIORITY, G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE));
152
153   gst_thread_signals[SHUTDOWN] =
154     g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
155                   G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL,
156                   gst_marshal_VOID__VOID, G_TYPE_NONE, 0);
157
158   gobject_class->dispose =              gst_thread_dispose;
159
160 #ifndef GST_DISABLE_LOADSAVE
161   gstobject_class->save_thyself =       GST_DEBUG_FUNCPTR (gst_thread_save_thyself);
162   gstobject_class->restore_thyself =    GST_DEBUG_FUNCPTR (gst_thread_restore_thyself);
163 #endif
164
165   gstelement_class->change_state =      GST_DEBUG_FUNCPTR (gst_thread_change_state);
166
167   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property);
168   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property);
169
170   gstbin_class->child_state_change =    GST_DEBUG_FUNCPTR (gst_thread_child_state_change);
171 }
172
173 static void
174 gst_thread_init (GstThread *thread)
175 {
176   GstScheduler *scheduler;
177
178   GST_DEBUG (GST_CAT_THREAD, "initializing thread");
179
180   /* threads are managing bins and iterate themselves */
181   /* CR1: the GstBin code checks these flags */
182   GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
183   GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE);
184
185   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread));
186   g_assert (scheduler);
187
188   thread->lock = g_mutex_new ();
189   thread->cond = g_cond_new ();
190
191   thread->thread_id = (GThread *) NULL; /* set in NULL -> READY */
192   thread->priority = G_THREAD_PRIORITY_NORMAL;
193 }
194
195 static void
196 gst_thread_dispose (GObject *object)
197 {
198   GstThread *thread = GST_THREAD (object);
199
200   GST_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose");
201
202   G_OBJECT_CLASS (parent_class)->dispose (object);
203
204   g_assert (GST_STATE (thread) == GST_STATE_NULL);
205
206   g_mutex_free (thread->lock);
207   g_cond_free (thread->cond);
208
209   gst_object_replace ((GstObject **)&GST_ELEMENT_SCHED (thread), NULL);
210 }
211
212 /**
213  * gst_thread_set_priority:
214  * @thread: the thread to change 
215  * @priority: the new priority for the thread
216  *
217  * change the thread's priority
218  */
219 void
220 gst_thread_set_priority (GstThread *thread, GThreadPriority priority)
221 {
222   g_return_if_fail (GST_IS_THREAD (thread));
223
224   thread->priority = priority;
225 }
226
227 static void
228 gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
229 {
230   GstThread *thread;
231
232   thread = GST_THREAD (object);
233
234   switch (prop_id) {
235     case ARG_PRIORITY:
236       thread->priority = g_value_get_enum (value);
237       break;
238     default:
239       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
240       break;
241   }
242 }
243
244 static void
245 gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
246 {
247   GstThread *thread;
248
249   thread = GST_THREAD (object);
250
251   switch (prop_id) {
252     case ARG_PRIORITY:
253       g_value_set_enum (value, thread->priority);
254       break;
255     default:
256       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
257       break;
258   }
259 }
260
261
262 /**
263  * gst_thread_new:
264  * @name: the name of the thread
265  *
266  * Create a new thread with the given name.
267  *
268  * Returns: The new thread
269  */
270 GstElement*
271 gst_thread_new (const gchar *name)
272 {
273   return gst_element_factory_make ("thread", name);
274 }
275
276 /**
277  * gst_thread_get_current:
278  *
279  * Create a new thread with the given name.
280  *
281  * Returns: The current GstThread or NULL if you are not running inside a 
282  *          #GstThread.
283  */
284 GstThread *
285 gst_thread_get_current (void)
286 {
287   return (GstThread *) g_private_get (gst_thread_current);
288 }
289
290 static inline void
291 gst_thread_release_children_locks (GstThread *thread)
292 {
293   GstRealPad *peer = NULL;
294   GstElement *peerelement;
295   GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
296
297   while (elements) {
298     GstElement *element = GST_ELEMENT (elements->data);
299     GList *pads;
300
301     g_assert (element);
302     GST_DEBUG (GST_CAT_THREAD, "waking element \"%s\"", GST_ELEMENT_NAME (element));
303     elements = g_list_next (elements);
304
305     if (!gst_element_release_locks (element))
306       g_warning ("element %s could not release locks", GST_ELEMENT_NAME (element));
307
308     pads = GST_ELEMENT_PADS (element);
309
310     while (pads) {
311       if (GST_PAD_PEER (pads->data)) {
312         peer = GST_REAL_PAD (GST_PAD_PEER (pads->data));
313         pads = g_list_next (pads);
314       } else {
315         pads = g_list_next (pads);
316         continue;
317       }
318
319       if (!peer)
320         continue;
321
322       peerelement = GST_PAD_PARENT (peer);
323       if (!peerelement)
324         continue; /* FIXME: deal with case where there's no peer */
325
326       if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) {
327         GST_DEBUG (GST_CAT_THREAD, "element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element));
328         GST_DEBUG (GST_CAT_THREAD, "waking element \"%s\"", GST_ELEMENT_NAME (peerelement));
329         if (!gst_element_release_locks (peerelement))
330           g_warning ("element %s could not release locks", GST_ELEMENT_NAME (peerelement));
331       }
332     }
333   }
334 }
335 /* stops the main thread, if there is one and grabs the thread's mutex */
336 static void
337 gst_thread_catch (GstThread *thread)
338 {
339   gboolean wait;
340   if (thread == gst_thread_get_current()) {
341     /* we're trying to catch ourself */
342     if (!GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
343       g_mutex_lock (thread->lock);
344       GST_FLAG_SET (thread, GST_THREAD_MUTEX_LOCKED);
345     }
346     GST_DEBUG (GST_CAT_THREAD, "%s is catching itself", GST_ELEMENT_NAME (thread));
347     GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
348   } else {
349     /* another thread is trying to catch us */
350     g_mutex_lock (thread->lock);
351     wait = !GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING);
352     while (!wait) {
353       GTimeVal tv;
354       GST_DEBUG (GST_CAT_THREAD, "catching %s...", GST_ELEMENT_NAME (thread));
355       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
356       g_cond_signal (thread->cond);
357       gst_thread_release_children_locks (thread);
358       g_get_current_time (&tv);
359       g_time_val_add (&tv, 1000); /* wait a millisecond to catch the thread */
360       wait = g_cond_timed_wait (thread->cond, thread->lock, &tv);
361     }
362     GST_DEBUG (GST_CAT_THREAD, "caught %s", GST_ELEMENT_NAME (thread));
363   }
364   g_assert (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING));
365 }
366
367 static void
368 gst_thread_release (GstThread *thread)
369 {
370   if (thread != gst_thread_get_current()) {
371     g_cond_signal (thread->cond);
372     g_mutex_unlock (thread->lock);
373   }
374 }
375
376 static GstElementStateReturn
377 gst_thread_change_state (GstElement *element)
378 {
379   GstThread *thread;
380   GstElementStateReturn ret;
381   gint transition;
382
383   g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
384   transition = GST_STATE_TRANSITION (element);
385
386   thread = GST_THREAD (element);
387
388   GST_DEBUG (GST_CAT_STATES, "%s is changing state from %s to %s",
389              GST_ELEMENT_NAME (element), gst_element_state_get_name (GST_STATE (element)),
390              gst_element_state_get_name (GST_STATE_PENDING (element)));
391
392   gst_thread_catch (thread);
393
394   /* FIXME: (or GStreamers ideas about "threading"): the element variables are
395      commonly accessed by multiple threads at the same time (see bug #111146
396      for an example) */
397   if (transition != GST_STATE_TRANSITION (element)) {
398     g_warning ("inconsistent state information, fix threading please");
399   }
400
401   switch (transition) {
402     case GST_STATE_NULL_TO_READY:
403       /* create the thread */
404       GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
405       thread->thread_id = g_thread_create_full(gst_thread_main_loop,
406           thread, STACK_SIZE, FALSE, TRUE, thread->priority,
407           NULL);
408       if (!thread->thread_id){
409         GST_DEBUG (GST_CAT_THREAD, "g_thread_create_full for %sfailed", GST_ELEMENT_NAME (element));
410         goto error_out;
411       }
412       GST_DEBUG (GST_CAT_THREAD, "GThread created");
413
414       /* wait for it to 'spin up' */
415       g_cond_wait (thread->cond, thread->lock);
416       break;
417     case GST_STATE_READY_TO_PAUSED:
418       break;
419     case GST_STATE_PAUSED_TO_PLAYING:
420     {
421       /* FIXME: recurse into sub-bins */
422       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
423       while (elements) {
424         gst_element_enable_threadsafe_properties ((GstElement*)elements->data);
425         elements = g_list_next (elements);
426       }
427       /* reset self to spinning */
428       if (thread == gst_thread_get_current()) 
429         GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
430       break;
431     }
432     case GST_STATE_PLAYING_TO_PAUSED:
433     {
434       GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
435       while (elements) {
436         gst_element_disable_threadsafe_properties ((GstElement*)elements->data);
437         elements = g_list_next (elements);
438       }
439       break;
440     }
441     case GST_STATE_PAUSED_TO_READY:
442       break;
443     case GST_STATE_READY_TO_NULL:
444       /* we can't join the threads here, because this could have been triggered
445          by ourself (ouch) */
446       GST_DEBUG (GST_CAT_THREAD, "destroying GThread %p", thread->thread_id);
447       GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING);
448       thread->thread_id = NULL;
449       if (thread == gst_thread_get_current()) {
450         /* or should we continue? */
451         g_warning ("Thread %s is destroying itself. Function call will not return!", GST_ELEMENT_NAME (thread));
452         gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
453         
454         /* unlock and signal - we are out */
455         gst_thread_release (thread);
456
457         GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
458                   GST_ELEMENT_NAME (thread));
459
460         g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
461
462         g_thread_exit (NULL);
463       }
464       /* now wait for the thread to destroy itself */
465       g_cond_signal (thread->cond);
466       g_cond_wait (thread->cond, thread->lock);
467       /* it should be dead now */
468       break;
469     default:
470       GST_DEBUG_ELEMENT (GST_CAT_THREAD, element, "UNHANDLED STATE CHANGE! %x", 
471                          GST_STATE_TRANSITION (element));
472       g_assert_not_reached ();
473       break;
474   }
475
476   if (GST_ELEMENT_CLASS (parent_class)->change_state) {
477     ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (thread));
478   } else {
479     ret = GST_STATE_SUCCESS;
480   }
481
482   gst_thread_release (thread);
483   return ret;
484   
485 error_out:
486   GST_DEBUG (GST_CAT_STATES, "changing state from %s to %s failed for %s",
487              gst_element_state_get_name (GST_STATE (element)),
488              gst_element_state_get_name (GST_STATE_PENDING (element)),
489              GST_ELEMENT_NAME (element));
490   gst_thread_release (thread);
491   return GST_STATE_FAILURE;
492 }
493
494 /* state changes work this way: We grab the lock and stop the thread from 
495    spinning (via gst_thread_catch) - then we change the state. After that the
496    thread may spin on. */
497 static void
498 gst_thread_child_state_change (GstBin *bin, GstElementState oldstate, 
499                                GstElementState newstate, GstElement *element)
500 {
501   GST_DEBUG (GST_CAT_THREAD, "%s (from thread %s) child %s changed state from %s to %s",
502               GST_ELEMENT_NAME (bin), 
503               gst_thread_get_current() ? GST_ELEMENT_NAME (gst_thread_get_current()) : "(none)", 
504               GST_ELEMENT_NAME (element), gst_element_state_get_name (oldstate),
505               gst_element_state_get_name (newstate));
506   if (parent_class->child_state_change)
507     parent_class->child_state_change (bin, oldstate, newstate, element);
508   /* We'll wake up the main thread now. Note that we can't lock the thread here, 
509      because we might be called from inside gst_thread_change_state when holding
510      the lock. But this doesn't cause any problems. */
511   if (newstate == GST_STATE_PLAYING)
512     g_cond_signal (GST_THREAD (bin)->cond);
513 }
514 /**
515  * gst_thread_main_loop:
516  * @arg: the thread to start
517  *
518  * The main loop of the thread. The thread will iterate
519  * while the state is GST_THREAD_STATE_SPINNING.
520  */
521 static void *
522 gst_thread_main_loop (void *arg)
523 {
524   GstThread *thread = NULL;
525   gboolean status;
526
527   thread = GST_THREAD (arg);
528   g_mutex_lock (thread->lock);
529   GST_DEBUG (GST_CAT_THREAD, "Thread %s started main loop", GST_ELEMENT_NAME (thread));
530
531   /* initialize gst_thread_current */
532   g_private_set (gst_thread_current, thread);
533
534   /* set up the element's scheduler */
535   gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
536   GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
537
538   g_cond_signal (thread->cond);
539   while (!(GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))) {
540     if (GST_STATE (thread) == GST_STATE_PLAYING) {
541       GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING);
542       status = TRUE;
543       GST_DEBUG (GST_CAT_THREAD, "%s starts iterating", GST_ELEMENT_NAME (thread));
544       while (status && GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
545         g_mutex_unlock (thread->lock);
546         status = gst_bin_iterate (GST_BIN (thread));
547         if (GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) {
548           GST_FLAG_UNSET (thread, GST_THREAD_MUTEX_LOCKED);
549         } else {
550           g_mutex_lock (thread->lock);
551         }
552       }
553       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
554     }
555     if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))
556       break;
557     GST_DEBUG (GST_CAT_THREAD, "%s was caught", GST_ELEMENT_NAME (thread));
558     g_cond_signal (thread->cond);
559     g_cond_wait (thread->cond, thread->lock);
560   }
561
562   /* we need to destroy the scheduler here because it has mapped it's
563    * stack into the threads stack space */
564   gst_scheduler_reset (GST_ELEMENT_SCHED (thread));
565
566   /* must do that before releasing the lock - we might get disposed before being done */
567   g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0);
568
569   /* unlock and signal - we are out */
570   g_cond_signal (thread->cond);
571   g_mutex_unlock (thread->lock);
572
573   GST_INFO (GST_CAT_THREAD, "gstthread: thread \"%s\" is stopped",
574             GST_ELEMENT_NAME (thread));
575
576   return NULL;
577 }
578
579 #ifndef GST_DISABLE_LOADSAVE
580 static xmlNodePtr
581 gst_thread_save_thyself (GstObject *object,
582                          xmlNodePtr self)
583 {
584   if (GST_OBJECT_CLASS (parent_class)->save_thyself)
585     GST_OBJECT_CLASS (parent_class)->save_thyself (object, self);
586   return NULL;
587 }
588
589 static void
590 gst_thread_restore_thyself (GstObject *object,
591                             xmlNodePtr self)
592 {
593   GST_DEBUG (GST_CAT_THREAD,"gstthread: restore");
594
595   if (GST_OBJECT_CLASS (parent_class)->restore_thyself)
596     GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self);
597 }
598 #endif /* GST_DISABLE_LOADSAVE */