Bumps documentation to 93% symbol coverage, touching most
[platform/upstream/glib.git] / gio / gioscheduler.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include "gioscheduler.h"
26
27 /**
28  * SECTION:gioscheduler
29  * @short_description: I/O Scheduler
30  * 
31  * Schedules asynchronous I/O operations for integration into the main 
32  * event loop (#GMainLoop).
33  * 
34  **/
35
36 struct _GIOJob {
37   GSList *active_link;
38   GIOJobFunc job_func;
39   GIODataFunc cancel_func; /* Runs under job map lock */
40   gpointer data;
41   GDestroyNotify destroy_notify;
42
43   gint io_priority;
44   GCancellable *cancellable;
45
46   guint idle_tag;
47 };
48
49 G_LOCK_DEFINE_STATIC(active_jobs);
50 static GSList *active_jobs = NULL;
51
52 static GThreadPool *job_thread_pool = NULL;
53
54 static void io_job_thread (gpointer       data,
55                            gpointer       user_data);
56
57 static void
58 g_io_job_free (GIOJob *job)
59 {
60   if (job->cancellable)
61     g_object_unref (job->cancellable);
62   g_free (job);
63 }
64
65 static gint
66 g_io_job_compare (gconstpointer  a,
67                   gconstpointer  b,
68                   gpointer       user_data)
69 {
70   const GIOJob *aa = a;
71   const GIOJob *bb = b;
72
73   /* Cancelled jobs are set prio == -1, so that
74      they are executed as quickly as possible */
75   
76   /* Lower value => higher priority */
77   if (aa->io_priority < bb->io_priority)
78     return -1;
79   if (aa->io_priority == bb->io_priority)
80     return 0;
81   return 1;
82 }
83
84 static gpointer
85 init_scheduler (gpointer arg)
86 {
87   if (job_thread_pool == NULL)
88     {
89       /* TODO: thread_pool_new can fail */
90       job_thread_pool = g_thread_pool_new (io_job_thread,
91                                            NULL,
92                                            10,
93                                            FALSE,
94                                            NULL);
95       if (job_thread_pool != NULL)
96         {
97           g_thread_pool_set_sort_function (job_thread_pool,
98                                            g_io_job_compare,
99                                            NULL);
100           /* Its kinda weird that this is a global setting
101            * instead of per threadpool. However, we really
102            * want to cache some threads, but not keep around
103            * those threads forever. */
104           g_thread_pool_set_max_idle_time (15 * 1000);
105           g_thread_pool_set_max_unused_threads (2);
106         }
107     }
108   return NULL;
109 }
110
111 static void
112 remove_active_job (GIOJob *job)
113 {
114   GIOJob *other_job;
115   GSList *l;
116   gboolean resort_jobs;
117   
118   G_LOCK (active_jobs);
119   active_jobs = g_slist_delete_link (active_jobs, job->active_link);
120   
121   resort_jobs = FALSE;
122   for (l = active_jobs; l != NULL; l = l->next)
123     {
124       other_job = l->data;
125       if (other_job->io_priority >= 0 &&
126           g_cancellable_is_cancelled (other_job->cancellable))
127         {
128           other_job->io_priority = -1;
129           resort_jobs = TRUE;
130         }
131     }
132   G_UNLOCK (active_jobs);
133   
134   if (resort_jobs &&
135       job_thread_pool != NULL)
136     g_thread_pool_set_sort_function (job_thread_pool,
137                                      g_io_job_compare,
138                                      NULL);
139
140 }
141
142 static void
143 io_job_thread (gpointer       data,
144                gpointer       user_data)
145 {
146   GIOJob *job = data;
147
148   if (job->cancellable)
149     g_push_current_cancellable (job->cancellable);
150   job->job_func (job, job->cancellable, job->data);
151   if (job->cancellable)
152     g_pop_current_cancellable (job->cancellable);
153
154   if (job->destroy_notify)
155     job->destroy_notify (job->data);
156
157   remove_active_job (job);
158   g_io_job_free (job);
159
160 }
161
162 static gboolean
163 run_job_at_idle (gpointer data)
164 {
165   GIOJob *job = data;
166
167   if (job->cancellable)
168     g_push_current_cancellable (job->cancellable);
169   
170   job->job_func (job, job->cancellable, job->data);
171   
172   if (job->cancellable)
173     g_pop_current_cancellable (job->cancellable);
174
175   if (job->destroy_notify)
176     job->destroy_notify (job->data);
177
178   remove_active_job (job);
179   g_io_job_free (job);
180
181   return FALSE;
182 }
183
184 /**
185  * g_schedule_io_job:
186  * @job_func: a #GIOJobFunc.
187  * @user_data: a #gpointer.
188  * @notify: a #GDestroyNotify.
189  * @io_priority: the i/o priority of the request. a #gint.
190  * @cancellable: optional #GCancellable object, %NULL to ignore.
191  *
192  * Schedules the I/O Job.
193  * 
194  **/
195 void
196 g_schedule_io_job (GIOJobFunc     job_func,
197                    gpointer       user_data,
198                    GDestroyNotify notify,
199                    gint           io_priority,
200                    GCancellable  *cancellable)
201 {
202   static GOnce once_init = G_ONCE_INIT;
203   GIOJob *job;
204
205   g_return_if_fail (job_func != NULL);
206
207   job = g_new0 (GIOJob, 1);
208   job->job_func = job_func;
209   job->data = user_data;
210   job->destroy_notify = notify;
211   job->io_priority = io_priority;
212     
213   if (cancellable)
214     job->cancellable = g_object_ref (cancellable);
215
216   G_LOCK (active_jobs);
217   active_jobs = g_slist_prepend (active_jobs, job);
218   job->active_link = active_jobs;
219   G_UNLOCK (active_jobs);
220
221   if (g_thread_supported())
222     {
223       g_once (&once_init, init_scheduler, NULL);
224       g_thread_pool_push (job_thread_pool, job, NULL);
225     }
226   else
227     {
228       /* Threads not available, instead do the i/o sync inside a
229        * low prio idle handler
230        */
231       job->idle_tag = g_idle_add_full (G_PRIORITY_DEFAULT_IDLE + 1 + io_priority / 10,
232                                        run_job_at_idle,
233                                        job, NULL);
234     }
235 }
236
237 /**
238  * g_cancel_all_io_jobs:
239  * 
240  * Cancels all cancellable I/O Jobs. 
241  **/
242 void
243 g_cancel_all_io_jobs (void)
244 {
245   GSList *cancellable_list, *l;
246   
247   G_LOCK (active_jobs);
248   cancellable_list = NULL;
249   for (l = active_jobs; l != NULL; l = l->next)
250     {
251       GIOJob *job = l->data;
252       if (job->cancellable)
253         cancellable_list = g_slist_prepend (cancellable_list,
254                                             g_object_ref (job->cancellable));
255     }
256   G_UNLOCK (active_jobs);
257
258   for (l = cancellable_list; l != NULL; l = l->next)
259     {
260       GCancellable *c = l->data;
261       g_cancellable_cancel (c);
262       g_object_unref (c);
263     }
264   g_slist_free (cancellable_list);
265 }
266
267 typedef struct {
268   GIODataFunc func;
269   gpointer    data;
270   GDestroyNotify notify;
271
272   GMutex *ack_lock;
273   GCond *ack_condition;
274 } MainLoopProxy;
275
276 static gboolean
277 mainloop_proxy_func (gpointer data)
278 {
279   MainLoopProxy *proxy = data;
280
281   proxy->func (proxy->data);
282
283   if (proxy->ack_lock)
284     {
285       g_mutex_lock (proxy->ack_lock);
286       g_cond_signal (proxy->ack_condition);
287       g_mutex_unlock (proxy->ack_lock);
288     }
289   
290   return FALSE;
291 }
292
293 static void
294 mainloop_proxy_free (MainLoopProxy *proxy)
295 {
296   if (proxy->ack_lock)
297     {
298       g_mutex_free (proxy->ack_lock);
299       g_cond_free (proxy->ack_condition);
300     }
301   
302   g_free (proxy);
303 }
304
305 static void
306 mainloop_proxy_notify (gpointer data)
307 {
308   MainLoopProxy *proxy = data;
309
310   if (proxy->notify)
311     proxy->notify (proxy->data);
312
313   /* If nonblocking we free here, otherwise we free in io thread */
314   if (proxy->ack_lock == NULL)
315     mainloop_proxy_free (proxy);
316 }
317
318 /**
319  * g_io_job_send_to_mainloop:
320  * @job: a #GIOJob.
321  * @func: a #GIODataFunc.
322  * @user_data: a #gpointer.
323  * @notify: a #GDestroyNotify.
324  * @block: boolean flag indicating whether or not this job should block.
325  * 
326  * Sends an I/O job to the application's main loop for processing.
327  * 
328  **/
329 void
330 g_io_job_send_to_mainloop (GIOJob        *job,
331                            GIODataFunc    func,
332                            gpointer       user_data,
333                            GDestroyNotify notify,
334                            gboolean       block)
335 {
336   GSource *source;
337   MainLoopProxy *proxy;
338   guint id;
339
340   g_return_if_fail (job != NULL);
341   g_return_if_fail (func != NULL);
342
343   if (job->idle_tag)
344     {
345       /* We just immediately re-enter in the case of idles (non-threads)
346        * Anything else would just deadlock. If you can't handle this, enable threads.
347        */
348       func (user_data); 
349       return;
350     }
351   
352   proxy = g_new0 (MainLoopProxy, 1);
353   proxy->func = func;
354   proxy->data = user_data;
355   proxy->notify = notify;
356   
357   if (block)
358     {
359       proxy->ack_lock = g_mutex_new ();
360       proxy->ack_condition = g_cond_new ();
361     }
362   
363   source = g_idle_source_new ();
364   g_source_set_priority (source, G_PRIORITY_DEFAULT);
365
366   g_source_set_callback (source, mainloop_proxy_func, proxy, mainloop_proxy_notify);
367
368   if (block)
369     g_mutex_lock (proxy->ack_lock);
370                   
371   id = g_source_attach (source, NULL);
372   g_source_unref (source);
373
374   if (block)
375     {
376       g_cond_wait (proxy->ack_condition, proxy->ack_lock);
377       g_mutex_unlock (proxy->ack_lock);
378       
379       /* destroy notify didn't free proxy */
380       mainloop_proxy_free (proxy);
381     }
382 }