f3f9f062b61b2227c7ba7aab3c0ebb8d3b6b518c
[platform/upstream/glib.git] / gio / gioscheduler.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include "gioscheduler.h"
26
27 #include "gioalias.h"
28
29 /**
30  * SECTION:gioscheduler
31  * @short_description: I/O Scheduler
32  * 
33  * Schedules asynchronous I/O operations for integration into the main 
34  * event loop (#GMainLoop).
35  * 
36  **/
37
38 struct _GIOJob {
39   GSList *active_link;
40   GIOJobFunc job_func;
41   GIODataFunc cancel_func; /* Runs under job map lock */
42   gpointer data;
43   GDestroyNotify destroy_notify;
44
45   gint io_priority;
46   GCancellable *cancellable;
47
48   guint idle_tag;
49 };
50
51 G_LOCK_DEFINE_STATIC(active_jobs);
52 static GSList *active_jobs = NULL;
53
54 static GThreadPool *job_thread_pool = NULL;
55
56 static void io_job_thread (gpointer data,
57                            gpointer user_data);
58
59 static void
60 g_io_job_free (GIOJob *job)
61 {
62   if (job->cancellable)
63     g_object_unref (job->cancellable);
64   g_free (job);
65 }
66
67 static gint
68 g_io_job_compare (gconstpointer a,
69                   gconstpointer b,
70                   gpointer      user_data)
71 {
72   const GIOJob *aa = a;
73   const GIOJob *bb = b;
74
75   /* Cancelled jobs are set prio == -1, so that
76      they are executed as quickly as possible */
77   
78   /* Lower value => higher priority */
79   if (aa->io_priority < bb->io_priority)
80     return -1;
81   if (aa->io_priority == bb->io_priority)
82     return 0;
83   return 1;
84 }
85
86 static gpointer
87 init_scheduler (gpointer arg)
88 {
89   if (job_thread_pool == NULL)
90     {
91       /* TODO: thread_pool_new can fail */
92       job_thread_pool = g_thread_pool_new (io_job_thread,
93                                            NULL,
94                                            10,
95                                            FALSE,
96                                            NULL);
97       if (job_thread_pool != NULL)
98         {
99           g_thread_pool_set_sort_function (job_thread_pool,
100                                            g_io_job_compare,
101                                            NULL);
102           /* Its kinda weird that this is a global setting
103            * instead of per threadpool. However, we really
104            * want to cache some threads, but not keep around
105            * those threads forever. */
106           g_thread_pool_set_max_idle_time (15 * 1000);
107           g_thread_pool_set_max_unused_threads (2);
108         }
109     }
110   return NULL;
111 }
112
113 static void
114 remove_active_job (GIOJob *job)
115 {
116   GIOJob *other_job;
117   GSList *l;
118   gboolean resort_jobs;
119   
120   G_LOCK (active_jobs);
121   active_jobs = g_slist_delete_link (active_jobs, job->active_link);
122   
123   resort_jobs = FALSE;
124   for (l = active_jobs; l != NULL; l = l->next)
125     {
126       other_job = l->data;
127       if (other_job->io_priority >= 0 &&
128           g_cancellable_is_cancelled (other_job->cancellable))
129         {
130           other_job->io_priority = -1;
131           resort_jobs = TRUE;
132         }
133     }
134   G_UNLOCK (active_jobs);
135   
136   if (resort_jobs &&
137       job_thread_pool != NULL)
138     g_thread_pool_set_sort_function (job_thread_pool,
139                                      g_io_job_compare,
140                                      NULL);
141
142 }
143
144 static void
145 io_job_thread (gpointer data,
146                gpointer user_data)
147 {
148   GIOJob *job = data;
149
150   if (job->cancellable)
151     g_push_current_cancellable (job->cancellable);
152   job->job_func (job, job->cancellable, job->data);
153   if (job->cancellable)
154     g_pop_current_cancellable (job->cancellable);
155
156   if (job->destroy_notify)
157     job->destroy_notify (job->data);
158
159   remove_active_job (job);
160   g_io_job_free (job);
161
162 }
163
164 static gboolean
165 run_job_at_idle (gpointer data)
166 {
167   GIOJob *job = data;
168
169   if (job->cancellable)
170     g_push_current_cancellable (job->cancellable);
171   
172   job->job_func (job, job->cancellable, job->data);
173   
174   if (job->cancellable)
175     g_pop_current_cancellable (job->cancellable);
176
177   if (job->destroy_notify)
178     job->destroy_notify (job->data);
179
180   remove_active_job (job);
181   g_io_job_free (job);
182
183   return FALSE;
184 }
185
186 /**
187  * g_schedule_io_job:
188  * @job_func: a #GIOJobFunc.
189  * @user_data: a #gpointer.
190  * @notify: a #GDestroyNotify.
191  * @io_priority: the i/o priority of the request. a #gint.
192  * @cancellable: optional #GCancellable object, %NULL to ignore.
193  *
194  * Schedules the I/O Job.
195  * 
196  **/
197 void
198 g_schedule_io_job (GIOJobFunc      job_func,
199                    gpointer        user_data,
200                    GDestroyNotify  notify,
201                    gint            io_priority,
202                    GCancellable   *cancellable)
203 {
204   static GOnce once_init = G_ONCE_INIT;
205   GIOJob *job;
206
207   g_return_if_fail (job_func != NULL);
208
209   job = g_new0 (GIOJob, 1);
210   job->job_func = job_func;
211   job->data = user_data;
212   job->destroy_notify = notify;
213   job->io_priority = io_priority;
214     
215   if (cancellable)
216     job->cancellable = g_object_ref (cancellable);
217
218   G_LOCK (active_jobs);
219   active_jobs = g_slist_prepend (active_jobs, job);
220   job->active_link = active_jobs;
221   G_UNLOCK (active_jobs);
222
223   if (g_thread_supported())
224     {
225       g_once (&once_init, init_scheduler, NULL);
226       g_thread_pool_push (job_thread_pool, job, NULL);
227     }
228   else
229     {
230       /* Threads not available, instead do the i/o sync inside a
231        * low prio idle handler
232        */
233       job->idle_tag = g_idle_add_full (G_PRIORITY_DEFAULT_IDLE + 1 + io_priority / 10,
234                                        run_job_at_idle,
235                                        job, NULL);
236     }
237 }
238
239 /**
240  * g_cancel_all_io_jobs:
241  * 
242  * Cancels all cancellable I/O Jobs. 
243  **/
244 void
245 g_cancel_all_io_jobs (void)
246 {
247   GSList *cancellable_list, *l;
248   
249   G_LOCK (active_jobs);
250   cancellable_list = NULL;
251   for (l = active_jobs; l != NULL; l = l->next)
252     {
253       GIOJob *job = l->data;
254       if (job->cancellable)
255         cancellable_list = g_slist_prepend (cancellable_list,
256                                             g_object_ref (job->cancellable));
257     }
258   G_UNLOCK (active_jobs);
259
260   for (l = cancellable_list; l != NULL; l = l->next)
261     {
262       GCancellable *c = l->data;
263       g_cancellable_cancel (c);
264       g_object_unref (c);
265     }
266   g_slist_free (cancellable_list);
267 }
268
269 typedef struct {
270   GIODataFunc func;
271   gpointer    data;
272   GDestroyNotify notify;
273
274   GMutex *ack_lock;
275   GCond *ack_condition;
276 } MainLoopProxy;
277
278 static gboolean
279 mainloop_proxy_func (gpointer data)
280 {
281   MainLoopProxy *proxy = data;
282
283   proxy->func (proxy->data);
284
285   if (proxy->ack_lock)
286     {
287       g_mutex_lock (proxy->ack_lock);
288       g_cond_signal (proxy->ack_condition);
289       g_mutex_unlock (proxy->ack_lock);
290     }
291   
292   return FALSE;
293 }
294
295 static void
296 mainloop_proxy_free (MainLoopProxy *proxy)
297 {
298   if (proxy->ack_lock)
299     {
300       g_mutex_free (proxy->ack_lock);
301       g_cond_free (proxy->ack_condition);
302     }
303   
304   g_free (proxy);
305 }
306
307 static void
308 mainloop_proxy_notify (gpointer data)
309 {
310   MainLoopProxy *proxy = data;
311
312   if (proxy->notify)
313     proxy->notify (proxy->data);
314
315   /* If nonblocking we free here, otherwise we free in io thread */
316   if (proxy->ack_lock == NULL)
317     mainloop_proxy_free (proxy);
318 }
319
320 /**
321  * g_io_job_send_to_mainloop:
322  * @job: a #GIOJob.
323  * @func: a #GIODataFunc.
324  * @user_data: a #gpointer.
325  * @notify: a #GDestroyNotify.
326  * @block: boolean flag indicating whether or not this job should block.
327  * 
328  * Sends an I/O job to the application's main loop for processing.
329  * 
330  **/
331 void
332 g_io_job_send_to_mainloop (GIOJob         *job,
333                            GIODataFunc     func,
334                            gpointer        user_data,
335                            GDestroyNotify  notify,
336                            gboolean        block)
337 {
338   GSource *source;
339   MainLoopProxy *proxy;
340   guint id;
341
342   g_return_if_fail (job != NULL);
343   g_return_if_fail (func != NULL);
344
345   if (job->idle_tag)
346     {
347       /* We just immediately re-enter in the case of idles (non-threads)
348        * Anything else would just deadlock. If you can't handle this, enable threads.
349        */
350       func (user_data); 
351       return;
352     }
353   
354   proxy = g_new0 (MainLoopProxy, 1);
355   proxy->func = func;
356   proxy->data = user_data;
357   proxy->notify = notify;
358   
359   if (block)
360     {
361       proxy->ack_lock = g_mutex_new ();
362       proxy->ack_condition = g_cond_new ();
363     }
364   
365   source = g_idle_source_new ();
366   g_source_set_priority (source, G_PRIORITY_DEFAULT);
367
368   g_source_set_callback (source, mainloop_proxy_func, proxy, mainloop_proxy_notify);
369
370   if (block)
371     g_mutex_lock (proxy->ack_lock);
372                   
373   id = g_source_attach (source, NULL);
374   g_source_unref (source);
375
376   if (block)
377     {
378       g_cond_wait (proxy->ack_condition, proxy->ack_lock);
379       g_mutex_unlock (proxy->ack_lock);
380       
381       /* destroy notify didn't free proxy */
382       mainloop_proxy_free (proxy);
383     }
384 }
385
386 #define __G_IO_SCHEDULER_C__
387 #include "gioaliasdef.c"