52e2ca626377b8e048f5cd18e964cea5676dc000
[platform/upstream/glib.git] / gio / gioscheduler.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include "gioscheduler.h"
26
27 #include "gioalias.h"
28
29 /**
30  * SECTION:gioscheduler
31  * @short_description: I/O Scheduler
32  * 
33  * Schedules asynchronous I/O operations for integration into the main 
34  * event loop (#GMainLoop).
35  * 
36  * Each I/O operation has a priority, and the scheduler uses the priorities
37  * to determine the order in which operations are executed. They are 
38  * <emphasis>not</emphasis> used to determine system-wide I/O scheduling.
39  * Priorities are integers, with lower numbers indicating higher priority. 
40  * It is recommended to choose priorities between %G_PRIORITY_LOW and 
41  * %G_PRIORITY_HIGH, with %G_PRIORITY_DEFAULT as a default.
42  **/
43
44 struct _GIOJob {
45   GSList *active_link;
46   GIOJobFunc job_func;
47   GIODataFunc cancel_func; /* Runs under job map lock */
48   gpointer data;
49   GDestroyNotify destroy_notify;
50
51   gint io_priority;
52   GCancellable *cancellable;
53
54   guint idle_tag;
55 };
56
57 G_LOCK_DEFINE_STATIC(active_jobs);
58 static GSList *active_jobs = NULL;
59
60 static GThreadPool *job_thread_pool = NULL;
61
62 static void io_job_thread (gpointer data,
63                            gpointer user_data);
64
65 static void
66 g_io_job_free (GIOJob *job)
67 {
68   if (job->cancellable)
69     g_object_unref (job->cancellable);
70   g_free (job);
71 }
72
73 static gint
74 g_io_job_compare (gconstpointer a,
75                   gconstpointer b,
76                   gpointer      user_data)
77 {
78   const GIOJob *aa = a;
79   const GIOJob *bb = b;
80
81   /* Cancelled jobs are set prio == -1, so that
82      they are executed as quickly as possible */
83   
84   /* Lower value => higher priority */
85   if (aa->io_priority < bb->io_priority)
86     return -1;
87   if (aa->io_priority == bb->io_priority)
88     return 0;
89   return 1;
90 }
91
92 static gpointer
93 init_scheduler (gpointer arg)
94 {
95   if (job_thread_pool == NULL)
96     {
97       /* TODO: thread_pool_new can fail */
98       job_thread_pool = g_thread_pool_new (io_job_thread,
99                                            NULL,
100                                            10,
101                                            FALSE,
102                                            NULL);
103       if (job_thread_pool != NULL)
104         {
105           g_thread_pool_set_sort_function (job_thread_pool,
106                                            g_io_job_compare,
107                                            NULL);
108           /* Its kinda weird that this is a global setting
109            * instead of per threadpool. However, we really
110            * want to cache some threads, but not keep around
111            * those threads forever. */
112           g_thread_pool_set_max_idle_time (15 * 1000);
113           g_thread_pool_set_max_unused_threads (2);
114         }
115     }
116   return NULL;
117 }
118
119 static void
120 remove_active_job (GIOJob *job)
121 {
122   GIOJob *other_job;
123   GSList *l;
124   gboolean resort_jobs;
125   
126   G_LOCK (active_jobs);
127   active_jobs = g_slist_delete_link (active_jobs, job->active_link);
128   
129   resort_jobs = FALSE;
130   for (l = active_jobs; l != NULL; l = l->next)
131     {
132       other_job = l->data;
133       if (other_job->io_priority >= 0 &&
134           g_cancellable_is_cancelled (other_job->cancellable))
135         {
136           other_job->io_priority = -1;
137           resort_jobs = TRUE;
138         }
139     }
140   G_UNLOCK (active_jobs);
141   
142   if (resort_jobs &&
143       job_thread_pool != NULL)
144     g_thread_pool_set_sort_function (job_thread_pool,
145                                      g_io_job_compare,
146                                      NULL);
147
148 }
149
150 static void
151 io_job_thread (gpointer data,
152                gpointer user_data)
153 {
154   GIOJob *job = data;
155
156   if (job->cancellable)
157     g_push_current_cancellable (job->cancellable);
158   job->job_func (job, job->cancellable, job->data);
159   if (job->cancellable)
160     g_pop_current_cancellable (job->cancellable);
161
162   if (job->destroy_notify)
163     job->destroy_notify (job->data);
164
165   remove_active_job (job);
166   g_io_job_free (job);
167
168 }
169
170 static gboolean
171 run_job_at_idle (gpointer data)
172 {
173   GIOJob *job = data;
174
175   if (job->cancellable)
176     g_push_current_cancellable (job->cancellable);
177   
178   job->job_func (job, job->cancellable, job->data);
179   
180   if (job->cancellable)
181     g_pop_current_cancellable (job->cancellable);
182
183   if (job->destroy_notify)
184     job->destroy_notify (job->data);
185
186   remove_active_job (job);
187   g_io_job_free (job);
188
189   return FALSE;
190 }
191
192 /**
193  * g_schedule_io_job:
194  * @job_func: a #GIOJobFunc.
195  * @user_data: a #gpointer.
196  * @notify: a #GDestroyNotify.
197  * @io_priority: the <link linkend="gioscheduler">I/O priority</link> 
198  * of the request.
199  * @cancellable: optional #GCancellable object, %NULL to ignore.
200  *
201  * Schedules the I/O Job.
202  * 
203  **/
204 void
205 g_schedule_io_job (GIOJobFunc      job_func,
206                    gpointer        user_data,
207                    GDestroyNotify  notify,
208                    gint            io_priority,
209                    GCancellable   *cancellable)
210 {
211   static GOnce once_init = G_ONCE_INIT;
212   GIOJob *job;
213
214   g_return_if_fail (job_func != NULL);
215
216   job = g_new0 (GIOJob, 1);
217   job->job_func = job_func;
218   job->data = user_data;
219   job->destroy_notify = notify;
220   job->io_priority = io_priority;
221     
222   if (cancellable)
223     job->cancellable = g_object_ref (cancellable);
224
225   G_LOCK (active_jobs);
226   active_jobs = g_slist_prepend (active_jobs, job);
227   job->active_link = active_jobs;
228   G_UNLOCK (active_jobs);
229
230   if (g_thread_supported())
231     {
232       g_once (&once_init, init_scheduler, NULL);
233       g_thread_pool_push (job_thread_pool, job, NULL);
234     }
235   else
236     {
237       /* Threads not available, instead do the i/o sync inside a
238        * low prio idle handler
239        */
240       job->idle_tag = g_idle_add_full (G_PRIORITY_DEFAULT_IDLE + 1 + io_priority / 10,
241                                        run_job_at_idle,
242                                        job, NULL);
243     }
244 }
245
246 /**
247  * g_cancel_all_io_jobs:
248  * 
249  * Cancels all cancellable I/O Jobs. 
250  **/
251 void
252 g_cancel_all_io_jobs (void)
253 {
254   GSList *cancellable_list, *l;
255   
256   G_LOCK (active_jobs);
257   cancellable_list = NULL;
258   for (l = active_jobs; l != NULL; l = l->next)
259     {
260       GIOJob *job = l->data;
261       if (job->cancellable)
262         cancellable_list = g_slist_prepend (cancellable_list,
263                                             g_object_ref (job->cancellable));
264     }
265   G_UNLOCK (active_jobs);
266
267   for (l = cancellable_list; l != NULL; l = l->next)
268     {
269       GCancellable *c = l->data;
270       g_cancellable_cancel (c);
271       g_object_unref (c);
272     }
273   g_slist_free (cancellable_list);
274 }
275
276 typedef struct {
277   GIODataFunc func;
278   gpointer    data;
279   GDestroyNotify notify;
280
281   GMutex *ack_lock;
282   GCond *ack_condition;
283 } MainLoopProxy;
284
285 static gboolean
286 mainloop_proxy_func (gpointer data)
287 {
288   MainLoopProxy *proxy = data;
289
290   proxy->func (proxy->data);
291
292   if (proxy->ack_lock)
293     {
294       g_mutex_lock (proxy->ack_lock);
295       g_cond_signal (proxy->ack_condition);
296       g_mutex_unlock (proxy->ack_lock);
297     }
298   
299   return FALSE;
300 }
301
302 static void
303 mainloop_proxy_free (MainLoopProxy *proxy)
304 {
305   if (proxy->ack_lock)
306     {
307       g_mutex_free (proxy->ack_lock);
308       g_cond_free (proxy->ack_condition);
309     }
310   
311   g_free (proxy);
312 }
313
314 static void
315 mainloop_proxy_notify (gpointer data)
316 {
317   MainLoopProxy *proxy = data;
318
319   if (proxy->notify)
320     proxy->notify (proxy->data);
321
322   /* If nonblocking we free here, otherwise we free in io thread */
323   if (proxy->ack_lock == NULL)
324     mainloop_proxy_free (proxy);
325 }
326
327 /**
328  * g_io_job_send_to_mainloop:
329  * @job: a #GIOJob.
330  * @func: a #GIODataFunc.
331  * @user_data: a #gpointer.
332  * @notify: a #GDestroyNotify.
333  * @block: boolean flag indicating whether or not this job should block.
334  * 
335  * Sends an I/O job to the application's main loop for processing.
336  **/
337 void
338 g_io_job_send_to_mainloop (GIOJob         *job,
339                            GIODataFunc     func,
340                            gpointer        user_data,
341                            GDestroyNotify  notify,
342                            gboolean        block)
343 {
344   GSource *source;
345   MainLoopProxy *proxy;
346   guint id;
347
348   g_return_if_fail (job != NULL);
349   g_return_if_fail (func != NULL);
350
351   if (job->idle_tag)
352     {
353       /* We just immediately re-enter in the case of idles (non-threads)
354        * Anything else would just deadlock. If you can't handle this, enable threads.
355        */
356       func (user_data); 
357       return;
358     }
359   
360   proxy = g_new0 (MainLoopProxy, 1);
361   proxy->func = func;
362   proxy->data = user_data;
363   proxy->notify = notify;
364   
365   if (block)
366     {
367       proxy->ack_lock = g_mutex_new ();
368       proxy->ack_condition = g_cond_new ();
369     }
370   
371   source = g_idle_source_new ();
372   g_source_set_priority (source, G_PRIORITY_DEFAULT);
373
374   g_source_set_callback (source, mainloop_proxy_func, proxy, mainloop_proxy_notify);
375
376   if (block)
377     g_mutex_lock (proxy->ack_lock);
378                   
379   id = g_source_attach (source, NULL);
380   g_source_unref (source);
381
382   if (block)
383     {
384       g_cond_wait (proxy->ack_condition, proxy->ack_lock);
385       g_mutex_unlock (proxy->ack_lock);
386       
387       /* destroy notify didn't free proxy */
388       mainloop_proxy_free (proxy);
389     }
390 }
391
392 #define __G_IO_SCHEDULER_C__
393 #include "gioaliasdef.c"