Merge branch 'master' into gdbus-codegen
[platform/upstream/glib.git] / gio / gioscheduler.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include "gioscheduler.h"
26 #include "gcancellable.h"
27
28
29 /**
30  * SECTION:gioscheduler
31  * @short_description: I/O Scheduler
32  * @include: gio/gio.h
33  * 
34  * Schedules asynchronous I/O operations. #GIOScheduler integrates 
35  * into the main event loop (#GMainLoop) and may use threads if they 
36  * are available.
37  * 
38  * <para id="io-priority"><indexterm><primary>I/O priority</primary></indexterm>
39  * Each I/O operation has a priority, and the scheduler uses the priorities
40  * to determine the order in which operations are executed. They are 
41  * <emphasis>not</emphasis> used to determine system-wide I/O scheduling.
42  * Priorities are integers, with lower numbers indicating higher priority. 
43  * It is recommended to choose priorities between %G_PRIORITY_LOW and 
44  * %G_PRIORITY_HIGH, with %G_PRIORITY_DEFAULT as a default.
45  * </para>
46  **/
47
48 struct _GIOSchedulerJob {
49   GSList *active_link;
50   GIOSchedulerJobFunc job_func;
51   GSourceFunc cancel_func; /* Runs under job map lock */
52   gpointer data;
53   GDestroyNotify destroy_notify;
54   GCancellable *cancellable;
55   GMainContext *context;
56
57   gint io_priority;
58   guint idle_tag;
59 };
60
61 G_LOCK_DEFINE_STATIC(active_jobs);
62 static GSList *active_jobs = NULL;
63
64 static GThreadPool *job_thread_pool = NULL;
65
66 static void io_job_thread (gpointer data,
67                            gpointer user_data);
68
69 static void
70 g_io_job_free (GIOSchedulerJob *job)
71 {
72   if (job->cancellable)
73     g_object_unref (job->cancellable);
74   if (job->context)
75     g_main_context_unref (job->context);
76   g_free (job);
77 }
78
79 static gint
80 g_io_job_compare (gconstpointer a,
81                   gconstpointer b,
82                   gpointer      user_data)
83 {
84   const GIOSchedulerJob *aa = a;
85   const GIOSchedulerJob *bb = b;
86
87   /* Cancelled jobs are set prio == -1, so that
88      they are executed as quickly as possible */
89   
90   /* Lower value => higher priority */
91   if (aa->io_priority < bb->io_priority)
92     return -1;
93   if (aa->io_priority == bb->io_priority)
94     return 0;
95   return 1;
96 }
97
98 static gpointer
99 init_scheduler (gpointer arg)
100 {
101   if (job_thread_pool == NULL)
102     {
103       /* TODO: thread_pool_new can fail */
104       job_thread_pool = g_thread_pool_new (io_job_thread,
105                                            NULL,
106                                            10,
107                                            FALSE,
108                                            NULL);
109       if (job_thread_pool != NULL)
110         {
111           g_thread_pool_set_sort_function (job_thread_pool,
112                                            g_io_job_compare,
113                                            NULL);
114           /* It's kinda weird that this is a global setting
115            * instead of per threadpool. However, we really
116            * want to cache some threads, but not keep around
117            * those threads forever. */
118           g_thread_pool_set_max_idle_time (15 * 1000);
119           g_thread_pool_set_max_unused_threads (2);
120         }
121     }
122   return NULL;
123 }
124
125 static void
126 remove_active_job (GIOSchedulerJob *job)
127 {
128   GIOSchedulerJob *other_job;
129   GSList *l;
130   gboolean resort_jobs;
131   
132   G_LOCK (active_jobs);
133   active_jobs = g_slist_delete_link (active_jobs, job->active_link);
134   
135   resort_jobs = FALSE;
136   for (l = active_jobs; l != NULL; l = l->next)
137     {
138       other_job = l->data;
139       if (other_job->io_priority >= 0 &&
140           g_cancellable_is_cancelled (other_job->cancellable))
141         {
142           other_job->io_priority = -1;
143           resort_jobs = TRUE;
144         }
145     }
146   G_UNLOCK (active_jobs);
147   
148   if (resort_jobs &&
149       job_thread_pool != NULL)
150     g_thread_pool_set_sort_function (job_thread_pool,
151                                      g_io_job_compare,
152                                      NULL);
153
154 }
155
156 static void
157 job_destroy (gpointer data)
158 {
159   GIOSchedulerJob *job = data;
160
161   if (job->destroy_notify)
162     job->destroy_notify (job->data);
163
164   remove_active_job (job);
165   g_io_job_free (job);
166 }
167
168 static void
169 io_job_thread (gpointer data,
170                gpointer user_data)
171 {
172   GIOSchedulerJob *job = data;
173   gboolean result;
174
175   if (job->cancellable)
176     g_cancellable_push_current (job->cancellable);
177
178   do 
179     {
180       result = job->job_func (job, job->cancellable, job->data);
181     }
182   while (result);
183
184   if (job->cancellable)
185     g_cancellable_pop_current (job->cancellable);
186
187   job_destroy (job);
188 }
189
190 static gboolean
191 run_job_at_idle (gpointer data)
192 {
193   GIOSchedulerJob *job = data;
194   gboolean result;
195
196   if (job->cancellable)
197     g_cancellable_push_current (job->cancellable);
198   
199   result = job->job_func (job, job->cancellable, job->data);
200   
201   if (job->cancellable)
202     g_cancellable_pop_current (job->cancellable);
203
204   return result;
205 }
206
207 /**
208  * g_io_scheduler_push_job:
209  * @job_func: a #GIOSchedulerJobFunc.
210  * @user_data: data to pass to @job_func
211  * @notify: a #GDestroyNotify for @user_data, or %NULL
212  * @io_priority: the <link linkend="gioscheduler">I/O priority</link> 
213  * of the request.
214  * @cancellable: optional #GCancellable object, %NULL to ignore.
215  *
216  * Schedules the I/O job to run. 
217  *
218  * @notify will be called on @user_data after @job_func has returned,
219  * regardless whether the job was cancelled or has run to completion.
220  * 
221  * If @cancellable is not %NULL, it can be used to cancel the I/O job
222  * by calling g_cancellable_cancel() or by calling 
223  * g_io_scheduler_cancel_all_jobs().
224  **/
225 void
226 g_io_scheduler_push_job (GIOSchedulerJobFunc  job_func,
227                          gpointer             user_data,
228                          GDestroyNotify       notify,
229                          gint                 io_priority,
230                          GCancellable        *cancellable)
231 {
232   static GOnce once_init = G_ONCE_INIT;
233   GIOSchedulerJob *job;
234
235   g_return_if_fail (job_func != NULL);
236
237   job = g_new0 (GIOSchedulerJob, 1);
238   job->job_func = job_func;
239   job->data = user_data;
240   job->destroy_notify = notify;
241   job->io_priority = io_priority;
242     
243   if (cancellable)
244     job->cancellable = g_object_ref (cancellable);
245
246   job->context = g_main_context_get_thread_default ();
247   if (job->context)
248     g_main_context_ref (job->context);
249
250   G_LOCK (active_jobs);
251   active_jobs = g_slist_prepend (active_jobs, job);
252   job->active_link = active_jobs;
253   G_UNLOCK (active_jobs);
254
255   if (g_thread_supported())
256     {
257       g_once (&once_init, init_scheduler, NULL);
258       g_thread_pool_push (job_thread_pool, job, NULL);
259     }
260   else
261     {
262       /* Threads not available, instead do the i/o sync inside a
263        * low prio idle handler
264        */
265       job->idle_tag = g_idle_add_full (io_priority,
266                                        run_job_at_idle,
267                                        job, job_destroy);
268     }
269 }
270
271 /**
272  * g_io_scheduler_cancel_all_jobs:
273  * 
274  * Cancels all cancellable I/O jobs. 
275  *
276  * A job is cancellable if a #GCancellable was passed into
277  * g_io_scheduler_push_job().
278  **/
279 void
280 g_io_scheduler_cancel_all_jobs (void)
281 {
282   GSList *cancellable_list, *l;
283   
284   G_LOCK (active_jobs);
285   cancellable_list = NULL;
286   for (l = active_jobs; l != NULL; l = l->next)
287     {
288       GIOSchedulerJob *job = l->data;
289       if (job->cancellable)
290         cancellable_list = g_slist_prepend (cancellable_list,
291                                             g_object_ref (job->cancellable));
292     }
293   G_UNLOCK (active_jobs);
294
295   for (l = cancellable_list; l != NULL; l = l->next)
296     {
297       GCancellable *c = l->data;
298       g_cancellable_cancel (c);
299       g_object_unref (c);
300     }
301   g_slist_free (cancellable_list);
302 }
303
304 typedef struct {
305   GSourceFunc func;
306   gboolean ret_val;
307   gpointer data;
308   GDestroyNotify notify;
309
310   GMutex *ack_lock;
311   GCond *ack_condition;
312 } MainLoopProxy;
313
314 static gboolean
315 mainloop_proxy_func (gpointer data)
316 {
317   MainLoopProxy *proxy = data;
318
319   proxy->ret_val = proxy->func (proxy->data);
320
321   if (proxy->notify)
322     proxy->notify (proxy->data);
323   
324   if (proxy->ack_lock)
325     {
326       g_mutex_lock (proxy->ack_lock);
327       g_cond_signal (proxy->ack_condition);
328       g_mutex_unlock (proxy->ack_lock);
329     }
330   
331   return FALSE;
332 }
333
334 static void
335 mainloop_proxy_free (MainLoopProxy *proxy)
336 {
337   if (proxy->ack_lock)
338     {
339       g_mutex_free (proxy->ack_lock);
340       g_cond_free (proxy->ack_condition);
341     }
342   
343   g_free (proxy);
344 }
345
346 /**
347  * g_io_scheduler_job_send_to_mainloop:
348  * @job: a #GIOSchedulerJob
349  * @func: a #GSourceFunc callback that will be called in the original thread
350  * @user_data: data to pass to @func
351  * @notify: a #GDestroyNotify for @user_data, or %NULL
352  * 
353  * Used from an I/O job to send a callback to be run in the thread
354  * that the job was started from, waiting for the result (and thus
355  * blocking the I/O job).
356  *
357  * Returns: The return value of @func
358  **/
359 gboolean
360 g_io_scheduler_job_send_to_mainloop (GIOSchedulerJob *job,
361                                      GSourceFunc      func,
362                                      gpointer         user_data,
363                                      GDestroyNotify   notify)
364 {
365   GSource *source;
366   MainLoopProxy *proxy;
367   gboolean ret_val;
368
369   g_return_val_if_fail (job != NULL, FALSE);
370   g_return_val_if_fail (func != NULL, FALSE);
371
372   if (job->idle_tag)
373     {
374       /* We just immediately re-enter in the case of idles (non-threads)
375        * Anything else would just deadlock. If you can't handle this, enable threads.
376        */
377       ret_val = func (user_data);
378       if (notify)
379         notify (user_data);
380       return ret_val;
381     }
382   
383   proxy = g_new0 (MainLoopProxy, 1);
384   proxy->func = func;
385   proxy->data = user_data;
386   proxy->notify = notify;
387   proxy->ack_lock = g_mutex_new ();
388   proxy->ack_condition = g_cond_new ();
389   g_mutex_lock (proxy->ack_lock);
390   
391   source = g_idle_source_new ();
392   g_source_set_priority (source, G_PRIORITY_DEFAULT);
393   g_source_set_callback (source, mainloop_proxy_func, proxy,
394                          NULL);
395
396   g_source_attach (source, job->context);
397   g_source_unref (source);
398
399   g_cond_wait (proxy->ack_condition, proxy->ack_lock);
400   g_mutex_unlock (proxy->ack_lock);
401
402   ret_val = proxy->ret_val;
403   mainloop_proxy_free (proxy);
404   
405   return ret_val;
406 }
407
408 /**
409  * g_io_scheduler_job_send_to_mainloop_async:
410  * @job: a #GIOSchedulerJob
411  * @func: a #GSourceFunc callback that will be called in the original thread
412  * @user_data: data to pass to @func
413  * @notify: a #GDestroyNotify for @user_data, or %NULL
414  * 
415  * Used from an I/O job to send a callback to be run asynchronously in
416  * the thread that the job was started from. The callback will be run
417  * when the main loop is available, but at that time the I/O job might
418  * have finished. The return value from the callback is ignored.
419  *
420  * Note that if you are passing the @user_data from g_io_scheduler_push_job()
421  * on to this function you have to ensure that it is not freed before
422  * @func is called, either by passing %NULL as @notify to 
423  * g_io_scheduler_push_job() or by using refcounting for @user_data.
424  **/
425 void
426 g_io_scheduler_job_send_to_mainloop_async (GIOSchedulerJob *job,
427                                            GSourceFunc      func,
428                                            gpointer         user_data,
429                                            GDestroyNotify   notify)
430 {
431   GSource *source;
432   MainLoopProxy *proxy;
433
434   g_return_if_fail (job != NULL);
435   g_return_if_fail (func != NULL);
436
437   if (job->idle_tag)
438     {
439       /* We just immediately re-enter in the case of idles (non-threads)
440        * Anything else would just deadlock. If you can't handle this, enable threads.
441        */
442       func (user_data);
443       if (notify)
444         notify (user_data);
445       return;
446     }
447   
448   proxy = g_new0 (MainLoopProxy, 1);
449   proxy->func = func;
450   proxy->data = user_data;
451   proxy->notify = notify;
452   
453   source = g_idle_source_new ();
454   g_source_set_priority (source, G_PRIORITY_DEFAULT);
455   g_source_set_callback (source, mainloop_proxy_func, proxy,
456                          (GDestroyNotify)mainloop_proxy_free);
457
458   g_source_attach (source, job->context);
459   g_source_unref (source);
460 }