Update theme submodule
[platform/upstream/gstreamer.git] / markdown / manual / advanced / threads.md
1 ---
2 title: Threads
3 ...
4
5 # Threads
6
7 GStreamer is inherently multi-threaded, and is fully thread-safe. Most
8 threading internals are hidden from the application, which should make
9 application development easier. However, in some cases, applications may
10 want to have influence on some parts of those. GStreamer allows
11 applications to force the use of multiple threads over some parts of a
12 pipeline. See [When would you want to force a
13 thread?](#when-would-you-want-to-force-a-thread).
14
15 GStreamer can also notify you when threads are created so that you can
16 configure things such as the thread priority or the threadpool to use.
17 See [Configuring Threads in
18 GStreamer](#configuring-threads-in-gstreamer).
19
20 ## Scheduling in GStreamer
21
22 Each element in the GStreamer pipeline decides how it is going to be
23 scheduled. Elements can choose if their pads are to be scheduled
24 push-based or pull-based. An element can, for example, choose to start a
25 thread to start pulling from the sink pad or/and start pushing on the
26 source pad. An element can also choose to use the upstream or downstream
27 thread for its data processing in push and pull mode respectively.
28 GStreamer does not pose any restrictions on how the element chooses to
29 be scheduled. See the Plugin Writer Guide for more details.
30
31 What will happen in any case is that some elements will start a thread
32 for their data processing, called the “streaming threads”. The streaming
33 threads, or `GstTask` objects, are created from a `GstTaskPool` when the
34 element needs to make a streaming thread. In the next section we see how
35 we can receive notifications of the tasks and pools.
36
37 ## Configuring Threads in GStreamer
38
39 A STREAM\_STATUS message is posted on the bus to inform you about the
40 status of the streaming threads. You will get the following information
41 from the message:
42
43   - When a new thread is about to be created, you will be notified of
44     this with a GST\_STREAM\_STATUS\_TYPE\_CREATE type. It is then
45     possible to configure a `GstTaskPool` in the `GstTask`. The custom
46     taskpool will provide custom threads for the task to implement the
47     streaming threads.
48
49     This message needs to be handled synchronously if you want to
50     configure a custom taskpool. If you don't configure the taskpool on
51     the task when this message returns, the task will use its default
52     pool.
53
54   - When a thread is entered or left. This is the moment where you could
55     configure thread priorities. You also get a notification when a
56     thread is destroyed.
57
58   - You get messages when the thread starts, pauses and stops. This
59     could be used to visualize the status of streaming threads in a gui
60     application.
61
62 We will now look at some examples in the next sections.
63
64 ### Boost priority of a thread
65
66 ```
67         .----------.    .----------.
68         | faksesrc |    | fakesink |
69         |         src->sink        |
70         '----------'    '----------'
71
72 ```
73
74 Let's look at the simple pipeline above. We would like to boost the
75 priority of the streaming thread. It will be the fakesrc element that
76 starts the streaming thread for generating the fake data pushing them to
77 the peer fakesink. The flow for changing the priority would go like
78 this:
79
80   - When going from READY to PAUSED state, fakesrc will require a
81     streaming thread for pushing data into the fakesink. It will post a
82     STREAM\_STATUS message indicating its requirement for a streaming
83     thread.
84
85   - The application will react to the STREAM\_STATUS messages with a
86     sync bus handler. It will then configure a custom `GstTaskPool` on
87     the `GstTask` inside the message. The custom taskpool is responsible
88     for creating the threads. In this example we will make a thread with
89     a higher priority.
90
91   - Alternatively, since the sync message is called in the thread
92     context, you can use thread ENTER/LEAVE notifications to change the
93     priority or scheduling pollicy of the current thread.
94
95 In a first step we need to implement a custom `GstTaskPool` that we can
96 configure on the task. Below is the implementation of a `GstTaskPool`
97 subclass that uses pthreads to create a SCHED\_RR real-time thread. Note
98 that creating real-time threads might require extra priveleges.
99
100 ``` c
101 #include <pthread.h>
102
103 typedef struct
104 {
105   pthread_t thread;
106 } TestRTId;
107
108 G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
109
110 static void
111 default_prepare (GstTaskPool * pool, GError ** error)
112 {
113   /* we don't do anything here. We could construct a pool of threads here that
114    * we could reuse later but we don't */
115 }
116
117 static void
118 default_cleanup (GstTaskPool * pool)
119 {
120 }
121
122 static gpointer
123 default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
124     GError ** error)
125 {
126   TestRTId *tid;
127   gint res;
128   pthread_attr_t attr;
129   struct sched_param param;
130
131   tid = g_slice_new0 (TestRTId);
132
133   pthread_attr_init (&attr);
134   if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
135     g_warning ("setschedpolicy: failure: %p", g_strerror (res));
136
137   param.sched_priority = 50;
138   if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)
139     g_warning ("setschedparam: failure: %p", g_strerror (res));
140
141   if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
142     g_warning ("setinheritsched: failure: %p", g_strerror (res));
143
144   res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
145
146   if (res != 0) {
147     g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
148         "Error creating thread: %s", g_strerror (res));
149     g_slice_free (TestRTId, tid);
150     tid = NULL;
151   }
152
153   return tid;
154 }
155
156 static void
157 default_join (GstTaskPool * pool, gpointer id)
158 {
159   TestRTId *tid = (TestRTId *) id;
160
161   pthread_join (tid->thread, NULL);
162
163   g_slice_free (TestRTId, tid);
164 }
165
166 static void
167 test_rt_pool_class_init (TestRTPoolClass * klass)
168 {
169   GstTaskPoolClass *gsttaskpool_class;
170
171   gsttaskpool_class = (GstTaskPoolClass *) klass;
172
173   gsttaskpool_class->prepare = default_prepare;
174   gsttaskpool_class->cleanup = default_cleanup;
175   gsttaskpool_class->push = default_push;
176   gsttaskpool_class->join = default_join;
177 }
178
179 static void
180 test_rt_pool_init (TestRTPool * pool)
181 {
182 }
183
184 GstTaskPool *
185 test_rt_pool_new (void)
186 {
187   GstTaskPool *pool;
188
189   pool = g_object_new (TEST_TYPE_RT_POOL, NULL);
190
191   return pool;
192 }
193
194
195
196 ```
197
198 The important function to implement when writing an taskpool is the
199 “push” function. The implementation should start a thread that calls
200 the given function. More involved implementations might want to keep
201 some threads around in a pool because creating and destroying threads is
202 not always the fastest operation.
203
204 In a next step we need to actually configure the custom taskpool when
205 the fakesrc needs it. For this we intercept the STREAM\_STATUS messages
206 with a sync handler.
207
208 ``` c
209
210
211 static GMainLoop* loop;
212
213 static void
214 on_stream_status (GstBus     *bus,
215                   GstMessage *message,
216                   gpointer    user_data)
217 {
218   GstStreamStatusType type;
219   GstElement *owner;
220   const GValue *val;
221   GstTask *task = NULL;
222
223   gst_message_parse_stream_status (message, &type, &owner);
224
225   val = gst_message_get_stream_status_object (message);
226
227   /* see if we know how to deal with this object */
228   if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
229     task = g_value_get_object (val);
230   }
231
232   switch (type) {
233     case GST_STREAM_STATUS_TYPE_CREATE:
234       if (task) {
235         GstTaskPool *pool;
236
237         pool = test_rt_pool_new();
238
239         gst_task_set_pool (task, pool);
240       }
241       break;
242     default:
243       break;
244   }
245 }
246
247 static void
248 on_error (GstBus     *bus,
249           GstMessage *message,
250           gpointer    user_data)
251 {
252   g_message ("received ERROR");
253   g_main_loop_quit (loop);
254 }
255
256 static void
257 on_eos (GstBus     *bus,
258         GstMessage *message,
259         gpointer    user_data)
260 {
261   g_main_loop_quit (loop);
262 }
263
264 int
265 main (int argc, char *argv[])
266 {
267   GstElement *bin, *fakesrc, *fakesink;
268   GstBus *bus;
269   GstStateChangeReturn ret;
270
271   gst_init (&argc, &argv);
272
273   /* create a new bin to hold the elements */
274   bin = gst_pipeline_new ("pipeline");
275   g_assert (bin);
276
277   /* create a source */
278   fakesrc = gst_element_factory_make ("fakesrc", "fakesrc");
279   g_assert (fakesrc);
280   g_object_set (fakesrc, "num-buffers", 50, NULL);
281
282   /* and a sink */
283   fakesink = gst_element_factory_make ("fakesink", "fakesink");
284   g_assert (fakesink);
285
286   /* add objects to the main pipeline */
287   gst_bin_add_many (GST_BIN (bin), fakesrc, fakesink, NULL);
288
289   /* link the elements */
290   gst_element_link (fakesrc, fakesink);
291
292   loop = g_main_loop_new (NULL, FALSE);
293
294   /* get the bus, we need to install a sync handler */
295   bus = gst_pipeline_get_bus (GST_PIPELINE (bin));
296   gst_bus_enable_sync_message_emission (bus);
297   gst_bus_add_signal_watch (bus);
298
299   g_signal_connect (bus, "sync-message::stream-status",
300       (GCallback) on_stream_status, NULL);
301   g_signal_connect (bus, "message::error",
302       (GCallback) on_error, NULL);
303   g_signal_connect (bus, "message::eos",
304       (GCallback) on_eos, NULL);
305
306   /* start playing */
307   ret = gst_element_set_state (bin, GST_STATE_PLAYING);
308   if (ret != GST_STATE_CHANGE_SUCCESS) {
309     g_message ("failed to change state");
310     return -1;
311   }
312
313   /* Run event loop listening for bus messages until EOS or ERROR */
314   g_main_loop_run (loop);
315
316   /* stop the bin */
317   gst_element_set_state (bin, GST_STATE_NULL);
318   gst_object_unref (bus);
319   g_main_loop_unref (loop);
320
321   return 0;
322 }
323
324
325
326 ``` c
327
328 Note that this program likely needs root permissions in order to create
329 real-time threads. When the thread can't be created, the state change
330 function will fail, which we catch in the application above.
331
332 When there are multiple threads in the pipeline, you will receive
333 multiple STREAM\_STATUS messages. You should use the owner of the
334 message, which is likely the pad or the element that starts the thread,
335 to figure out what the function of this thread is in the context of the
336 application.
337
338 ## When would you want to force a thread?
339
340 We have seen that threads are created by elements but it is also
341 possible to insert elements in the pipeline for the sole purpose of
342 forcing a new thread in the pipeline.
343
344 There are several reasons to force the use of threads. However, for
345 performance reasons, you never want to use one thread for every element
346 out there, since that will create some overhead. Let's now list some
347 situations where threads can be particularly useful:
348
349   - Data buffering, for example when dealing with network streams or
350     when recording data from a live stream such as a video or audio
351     card. Short hickups elsewhere in the pipeline will not cause data
352     loss. See also [Stream
353     buffering](manual/advanced/buffering.md#stream-buffering) about network
354     buffering with queue2.
355
356     ![Data buffering, from a networked
357     source](images/thread-buffering.png "fig:")
358
359   - Synchronizing output devices, e.g. when playing a stream containing
360     both video and audio data. By using threads for both outputs, they
361     will run independently and their synchronization will be better.
362
363     ![Synchronizing audio and video
364     sinks](images/thread-synchronizing.png "fig:")
365
366 Above, we've mentioned the “queue” element several times now. A queue is
367 the thread boundary element through which you can force the use of
368 threads. It does so by using a classic provider/consumer model as
369 learned in threading classes at universities all around the world. By
370 doing this, it acts both as a means to make data throughput between
371 threads threadsafe, and it can also act as a buffer. Queues have several
372 `GObject` properties to be configured for specific uses. For example,
373 you can set lower and upper thresholds for the element. If there's less
374 data than the lower threshold (default: disabled), it will block output.
375 If there's more data than the upper threshold, it will block input or
376 (if configured to do so) drop data.
377
378 To use a queue (and therefore force the use of two distinct threads in
379 the pipeline), one can simply create a “queue” element and put this in
380 as part of the pipeline. GStreamer will take care of all threading
381 details internally.