GStreamer is inherently multi-threaded and fully thread-safe. Most
threading internals are hidden from the application, which should make
application development easier. In some cases, however, applications may
want to influence parts of that threading. GStreamer allows
applications to force the use of multiple threads over some parts of a
pipeline. See [When would you want to force a
thread?](#when-would-you-want-to-force-a-thread).

GStreamer can also notify you when threads are created so that you can
configure things such as the thread priority or the threadpool to use.
See [Configuring Threads in
GStreamer](#configuring-threads-in-gstreamer).

## Scheduling in GStreamer

Each element in the GStreamer pipeline decides how it is going to be
scheduled. Elements can choose whether their pads are scheduled
push-based or pull-based. An element can, for example, choose to start a
thread that pulls from the sink pad and/or pushes on the
source pad. An element can also choose to use the upstream or downstream
thread for its data processing in push and pull mode respectively.
GStreamer does not impose any restrictions on how the element chooses to
be scheduled. See the Plugin Writer's Guide for more details.

In any case, some elements will start a thread for their data
processing. These threads are called the “streaming threads”. The streaming
threads, or `GstTask` objects, are created from a `GstTaskPool` when the
element needs to make a streaming thread. In the next section we see how
we can receive notifications about the tasks and pools.

## Configuring Threads in GStreamer

A STREAM\_STATUS message is posted on the bus to inform you about the
status of the streaming threads. You will get the following information
from the message:

- When a new thread is about to be created, you will be notified of
  this with a GST\_STREAM\_STATUS\_TYPE\_CREATE type. It is then
  possible to configure a `GstTaskPool` in the `GstTask`. The custom
  taskpool will provide custom threads for the task to implement the
  streaming threads.

  This message needs to be handled synchronously if you want to
  configure a custom taskpool. If you don't configure the taskpool on
  the task when this message returns, the task will use its default
  pool.

- When a thread is entered or left. This is the moment where you could
  configure thread priorities. You also get a notification when a
  thread is destroyed.

- You get messages when the thread starts, pauses and stops. This
  could be used to visualize the status of streaming threads in a GUI
  application.

We will now look at some examples in the next sections.

### Boost priority of a thread

```
.----------.    .----------.
| fakesrc  |    | fakesink |
|         src->sink        |
'----------'    '----------'
```

Let's look at the simple pipeline above. We would like to boost the
priority of the streaming thread. It will be the fakesrc element that
starts the streaming thread, which generates the fake data and pushes it to
the peer fakesink. The flow for changing the priority would go like
this:

- When going from READY to PAUSED state, fakesrc will require a
  streaming thread for pushing data into the fakesink. It will post a
  STREAM\_STATUS message indicating its requirement for a streaming
  thread.

- The application will react to the STREAM\_STATUS messages with a
  sync bus handler. It will then configure a custom `GstTaskPool` on
  the `GstTask` inside the message. The custom taskpool is responsible
  for creating the threads. In this example we will make a thread with
  a higher priority.

- Alternatively, since the sync message is called in the thread
  context, you can use thread ENTER/LEAVE notifications to change the
  priority or scheduling policy of the current thread.

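The alternative in the last item can be sketched with plain POSIX calls. This is only an illustration, assuming a POSIX system: `set_current_thread_sched` is a hypothetical helper, not a GStreamer function, and the policy and priority values are up to the application. Because it acts on the calling thread, it only has the intended effect when it runs in the streaming thread itself, for example from a sync handler reacting to the ENTER notification.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <string.h>

/* Hypothetical helper, not part of GStreamer: change the scheduling policy
 * and priority of the calling thread. The priority is clamped to the range
 * that the policy supports. Returns 0 on success or an errno-style code,
 * for example EPERM when the process lacks the required privileges. */
static int
set_current_thread_sched (int policy, int priority)
{
  struct sched_param param;
  int min, max;

  min = sched_get_priority_min (policy);
  max = sched_get_priority_max (policy);
  if (min == -1 || max == -1)
    return EINVAL;              /* the policy is not known to the system */
  assert (min <= max);

  if (priority < min)
    priority = min;
  if (priority > max)
    priority = max;

  memset (&param, 0, sizeof (param));
  param.sched_priority = priority;

  return pthread_setschedparam (pthread_self (), policy, &param);
}
```

A handler could call `set_current_thread_sched (SCHED_RR, 50)` and merely log a non-zero result, so that streaming continues at the default priority when the boost is refused.
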
As a first step we need to implement a custom `GstTaskPool` that we can
configure on the task. Below is the implementation of a `GstTaskPool`
subclass that uses pthreads to create a SCHED\_RR real-time thread. Note
that creating real-time threads might require extra privileges.

``` c
#include <pthread.h>

/* TestRTPool, TestRTPoolClass and TEST_TYPE_RT_POOL are assumed to be
 * declared in the pool's header, which is not shown here. */

typedef struct
{
  pthread_t thread;
} TestRTId;

G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);

static void
default_prepare (GstTaskPool * pool, GError ** error)
{
  /* we don't do anything here. We could construct a pool of threads here that
   * we could reuse later but we don't */
}

static void
default_cleanup (GstTaskPool * pool)
{
}

static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
    GError ** error)
{
  TestRTId *tid;
  gint res;
  pthread_attr_t attr;
  struct sched_param param;

  tid = g_slice_new0 (TestRTId);

  pthread_attr_init (&attr);
  if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
    g_warning ("setschedpolicy: failure: %s", g_strerror (res));

  param.sched_priority = 50;
  if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)
    g_warning ("setschedparam: failure: %s", g_strerror (res));

  if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
    g_warning ("setinheritsched: failure: %s", g_strerror (res));

  res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);

  if (res != 0) {
    g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
        "Error creating thread: %s", g_strerror (res));
    g_slice_free (TestRTId, tid);
    tid = NULL;
  }

  return tid;
}

static void
default_join (GstTaskPool * pool, gpointer id)
{
  TestRTId *tid = (TestRTId *) id;

  pthread_join (tid->thread, NULL);

  g_slice_free (TestRTId, tid);
}

static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
  GstTaskPoolClass *gsttaskpool_class;

  gsttaskpool_class = (GstTaskPoolClass *) klass;

  gsttaskpool_class->prepare = default_prepare;
  gsttaskpool_class->cleanup = default_cleanup;
  gsttaskpool_class->push = default_push;
  gsttaskpool_class->join = default_join;
}

static void
test_rt_pool_init (TestRTPool * pool)
{
}

GstTaskPool *
test_rt_pool_new (void)
{
  GstTaskPool *pool;

  pool = g_object_new (TEST_TYPE_RT_POOL, NULL);

  return pool;
}
```

The important function to implement when writing a taskpool is the
“push” function. The implementation should start a thread that calls
the given function. More involved implementations might want to keep
some threads around in a pool because creating and destroying threads is
not always the fastest operation.

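The idea of keeping threads around can be illustrated without any GStreamer types. The sketch below is not a `GstTaskPool` implementation: `WorkerPool` and its functions are hypothetical names, and it keeps a single worker for brevity. It only shows the principle that “push” hands work to an already running thread instead of creating a new one. A pool for streaming threads would need one worker per concurrently running task, since task functions typically run for a long time.

```c
#include <assert.h>
#include <pthread.h>

typedef void (*WorkFunc) (void *data);
/* Hypothetical pool: one worker thread, created once and then reused. */
typedef struct
{
  pthread_t thread;
  pthread_mutex_t lock;
  pthread_cond_t cond;
  WorkFunc func;                /* pending work, NULL when there is none */
  void *data;
  int busy;                     /* work is pending or still running */
  int quit;                     /* set by cleanup */
} WorkerPool;

static void *
worker_main (void *arg)
{
  WorkerPool *pool = arg;
  WorkFunc func;
  void *data;

  pthread_mutex_lock (&pool->lock);
  for (;;) {
    while (pool->func == NULL && !pool->quit)
      pthread_cond_wait (&pool->cond, &pool->lock);
    if (pool->func == NULL)
      break;                    /* asked to quit and nothing is pending */
    func = pool->func;
    data = pool->data;
    pool->func = NULL;
    pthread_mutex_unlock (&pool->lock);
    func (data);                /* run the work without holding the lock */
    pthread_mutex_lock (&pool->lock);
    pool->busy = 0;
    pthread_cond_broadcast (&pool->cond);
  }
  pthread_mutex_unlock (&pool->lock);
  return NULL;
}

/* prepare: create the worker up front. Returns 0 on success. */
static int
worker_pool_prepare (WorkerPool * pool)
{
  pool->func = NULL;
  pool->busy = pool->quit = 0;
  pthread_mutex_init (&pool->lock, NULL);
  pthread_cond_init (&pool->cond, NULL);
  return pthread_create (&pool->thread, NULL, worker_main, pool);
}

/* push: hand the work to the existing worker; waits while it is busy. */
static void
worker_pool_push (WorkerPool * pool, WorkFunc func, void *data)
{
  assert (func != NULL);
  pthread_mutex_lock (&pool->lock);
  while (pool->busy)
    pthread_cond_wait (&pool->cond, &pool->lock);
  pool->func = func;
  pool->data = data;
  pool->busy = 1;
  pthread_cond_broadcast (&pool->cond);
  pthread_mutex_unlock (&pool->lock);
}

/* cleanup: let pending work finish, then stop and join the worker. */
static void
worker_pool_cleanup (WorkerPool * pool)
{
  pthread_mutex_lock (&pool->lock);
  pool->quit = 1;
  pthread_cond_broadcast (&pool->cond);
  pthread_mutex_unlock (&pool->lock);
  pthread_join (pool->thread, NULL);
  pthread_mutex_destroy (&pool->lock);
  pthread_cond_destroy (&pool->cond);
}

static void
add_one (void *data)
{
  ++*(int *) data;
}

/* Usage example: three pushes served by one thread. Returns the counter. */
static int
worker_pool_demo (void)
{
  WorkerPool pool;
  int i, counter = 0;
  if (worker_pool_prepare (&pool) != 0)
    return -1;
  for (i = 0; i < 3; i++)
    worker_pool_push (&pool, add_one, &counter);
  worker_pool_cleanup (&pool);
  return counter;
}
```

The function names loosely mirror the prepare, push and cleanup roles of the taskpool class shown earlier, but the signatures are simplified for the sake of the example.
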
As a next step we need to actually configure the custom taskpool when
the fakesrc needs it. For this we intercept the STREAM\_STATUS messages
with a sync handler.

``` c
#include <gst/gst.h>

static GMainLoop* loop;

static void
on_stream_status (GstBus     *bus,
                  GstMessage *message,
                  gpointer    user_data)
{
  GstStreamStatusType type;
  GstElement *owner;
  const GValue *val;
  GstTask *task = NULL;

  gst_message_parse_stream_status (message, &type, &owner);

  val = gst_message_get_stream_status_object (message);

  /* see if we know how to deal with this object */
  if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
    task = g_value_get_object (val);
  }

  switch (type) {
    case GST_STREAM_STATUS_TYPE_CREATE:
      if (task) {
        GstTaskPool *pool;

        pool = test_rt_pool_new();

        gst_task_set_pool (task, pool);
      }
      break;
    default:
      break;
  }
}

static void
on_error (GstBus     *bus,
          GstMessage *message,
          gpointer    user_data)
{
  g_message ("received ERROR");
  g_main_loop_quit (loop);
}

static void
on_eos (GstBus     *bus,
        GstMessage *message,
        gpointer    user_data)
{
  g_main_loop_quit (loop);
}

int
main (int argc, char *argv[])
{
  GstElement *bin, *fakesrc, *fakesink;
  GstBus *bus;
  GstStateChangeReturn ret;

  gst_init (&argc, &argv);

  /* create a new bin to hold the elements */
  bin = gst_pipeline_new ("pipeline");
  g_assert (bin);

  /* create a source */
  fakesrc = gst_element_factory_make ("fakesrc", "fakesrc");
  g_assert (fakesrc);
  g_object_set (fakesrc, "num-buffers", 50, NULL);

  /* and a sink */
  fakesink = gst_element_factory_make ("fakesink", "fakesink");
  g_assert (fakesink);

  /* add objects to the main pipeline */
  gst_bin_add_many (GST_BIN (bin), fakesrc, fakesink, NULL);

  /* link the elements */
  gst_element_link (fakesrc, fakesink);

  loop = g_main_loop_new (NULL, FALSE);

  /* get the bus, we need to install a sync handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (bin));
  gst_bus_enable_sync_message_emission (bus);
  gst_bus_add_signal_watch (bus);

  g_signal_connect (bus, "sync-message::stream-status",
      (GCallback) on_stream_status, NULL);
  g_signal_connect (bus, "message::error",
      (GCallback) on_error, NULL);
  g_signal_connect (bus, "message::eos",
      (GCallback) on_eos, NULL);

  /* start playing; an ASYNC return is normal here, only FAILURE is fatal */
  ret = gst_element_set_state (bin, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    g_message ("failed to change state");
    return -1;
  }

  /* Run event loop listening for bus messages until EOS or ERROR */
  g_main_loop_run (loop);

  /* stop the bin */
  gst_element_set_state (bin, GST_STATE_NULL);
  gst_object_unref (bus);
  g_main_loop_unref (loop);

  return 0;
}
```

Note that this program likely needs root permissions in order to create
real-time threads. When the thread can't be created, the state change
function will fail, which we catch in the application above.

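If failing the state change is not acceptable, a pool could instead retry without real-time scheduling. The following is a minimal sketch of that variation using plain pthreads, assuming a POSIX system. `create_thread_prefer_rt` is a hypothetical helper and not something GStreamer provides, and whether a silent fallback is appropriate depends on the application.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <string.h>

/* Hypothetical helper: try to start func (data) in a SCHED_RR thread and
 * fall back to the default attributes when that is refused. *used_rt is
 * set to 1 only when the real-time thread was created. Returns 0 on
 * success or the error code of the last pthread_create() call. */
static int
create_thread_prefer_rt (pthread_t * thread, void *(*func) (void *),
    void *data, int rt_priority, int *used_rt)
{
  pthread_attr_t attr;
  struct sched_param param;

  assert (thread != NULL && func != NULL && used_rt != NULL);
  *used_rt = 0;

  if (pthread_attr_init (&attr) == 0) {
    memset (&param, 0, sizeof (param));
    param.sched_priority = rt_priority;

    if (pthread_attr_setschedpolicy (&attr, SCHED_RR) == 0 &&
        pthread_attr_setschedparam (&attr, &param) == 0 &&
        pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED) == 0 &&
        pthread_create (thread, &attr, func, data) == 0)
      *used_rt = 1;

    pthread_attr_destroy (&attr);
  }

  if (*used_rt)
    return 0;

  /* The real-time attempt was refused (typically with EPERM); retry with
   * the default attributes so that the work can still start. */
  return pthread_create (thread, NULL, func, data);
}

static void *
set_flag (void *data)
{
  *(int *) data = 1;
  return NULL;
}

/* Usage example: returns 1 when the thread ran, real-time or not. */
static int
prefer_rt_demo (void)
{
  pthread_t thread;
  int flag = 0, used_rt = 0;

  if (create_thread_prefer_rt (&thread, set_flag, &flag, 50, &used_rt) != 0)
    return 0;
  pthread_join (thread, NULL);
  return flag;
}
```

The `used_rt` output lets the caller report which path was taken, which is useful because a pipeline that silently runs without the expected priority can be hard to diagnose.
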
When there are multiple threads in the pipeline, you will receive
multiple STREAM\_STATUS messages. You should use the owner of the
message, which is likely the pad or the element that starts the thread,
to figure out what the function of this thread is in the context of the
application.

## When would you want to force a thread?

We have seen that threads are created by elements, but it is also
possible to insert elements in the pipeline for the sole purpose of
forcing a new thread in the pipeline.

There are several reasons to force the use of threads. However, for
performance reasons, you never want to use one thread for every element
out there, since that will create some overhead. Let's now list some
situations where threads can be particularly useful:

- Data buffering, for example when dealing with network streams or
  when recording data from a live stream such as a video or audio
  card. Short hiccups elsewhere in the pipeline will not cause data
  loss. See also [Stream
  buffering](manual-buffering.md#stream-buffering) about network
  buffering with queue2.

  ![Data buffering, from a networked
  source](images/thread-buffering.png "fig:")

- Synchronizing output devices, e.g. when playing a stream containing
  both video and audio data. By using threads for both outputs, they
  will run independently and their synchronization will be better.

  ![Synchronizing audio and video
  sinks](images/thread-synchronizing.png "fig:")

Above, we've mentioned the “queue” element several times now. A queue is
the thread boundary element through which you can force the use of
threads. It does so by using a classic producer/consumer model as
learned in threading classes at universities all around the world. By
doing this, it both makes the transfer of data between threads
thread-safe and can act as a buffer. Queues have several
`GObject` properties to be configured for specific uses. For example,
you can set lower and upper thresholds for the element. If there's less
data than the lower threshold (default: disabled), it will block output.
If there's more data than the upper threshold, it will block input or
(if configured to do so) drop data.

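The producer/consumer model and the upper threshold can be sketched in a few lines of plain C. This is not how the queue element is implemented. `BoundedQueue` and its functions are hypothetical names, the items are plain integers instead of buffers, and the lower threshold is left out. The sketch only shows the two behaviours described above when the upper threshold is reached: block the input, or drop data.

```c
#include <pthread.h>

#define QUEUE_CAPACITY 4        /* upper threshold in items (illustrative) */

typedef struct
{
  int items[QUEUE_CAPACITY];
  int head;                     /* index of the oldest item */
  int count;                    /* number of queued items */
  int leaky;                    /* non-zero: drop when full, don't block */
  pthread_mutex_t lock;
  pthread_cond_t not_full;
  pthread_cond_t not_empty;
} BoundedQueue;

static void
bounded_queue_init (BoundedQueue * q, int leaky)
{
  q->head = 0;
  q->count = 0;
  q->leaky = leaky;
  pthread_mutex_init (&q->lock, NULL);
  pthread_cond_init (&q->not_full, NULL);
  pthread_cond_init (&q->not_empty, NULL);
}

/* Producer side. Returns 1 if the item was queued, 0 if it was dropped. */
static int
bounded_queue_push (BoundedQueue * q, int item)
{
  int queued = 0;

  pthread_mutex_lock (&q->lock);
  /* Upper threshold reached: block the input unless the queue is leaky. */
  while (q->count == QUEUE_CAPACITY && !q->leaky)
    pthread_cond_wait (&q->not_full, &q->lock);
  if (q->count < QUEUE_CAPACITY) {
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
    q->count++;
    queued = 1;
    pthread_cond_signal (&q->not_empty);
  }
  pthread_mutex_unlock (&q->lock);

  return queued;
}

/* Consumer side. Blocks while the queue is empty. */
static int
bounded_queue_pop (BoundedQueue * q)
{
  int item;

  pthread_mutex_lock (&q->lock);
  while (q->count == 0)
    pthread_cond_wait (&q->not_empty, &q->lock);
  item = q->items[q->head];
  q->head = (q->head + 1) % QUEUE_CAPACITY;
  q->count--;
  pthread_cond_signal (&q->not_full);
  pthread_mutex_unlock (&q->lock);

  return item;
}

/* Upstream thread: pushes the values 1..10 into the queue it is given. */
static void *
producer_main (void *arg)
{
  int i;

  for (i = 1; i <= 10; i++)
    bounded_queue_push (arg, i);
  return NULL;
}

/* Usage example: the calling thread consumes what the producer thread
 * pushes through a blocking queue. Returns the sum of the items. */
static int
bounded_queue_demo (void)
{
  BoundedQueue q;
  pthread_t producer;
  int i, sum = 0;

  bounded_queue_init (&q, 0);
  if (pthread_create (&producer, NULL, producer_main, &q) != 0)
    return -1;
  for (i = 0; i < 10; i++)
    sum += bounded_queue_pop (&q);
  pthread_join (producer, NULL);
  return sum;
}
```

With `leaky` set to 0 the producer is throttled to the pace of the consumer and nothing is lost. With `leaky` set to a non-zero value the producer never waits and `bounded_queue_push` returns 0 for every item that did not fit.
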
To use a queue (and therefore force the use of two distinct threads in
the pipeline), one can simply create a “queue” element and put this in
as part of the pipeline. GStreamer will take care of all threading
details internally.
