gio/ docs/reference/gio Merged gio-standalone into glib.
[platform/upstream/glib.git] / gio / gioscheduler.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24
25 #include "gioscheduler.h"
26
27 struct _GIOJob {
28   GSList *active_link;
29   GIOJobFunc job_func;
30   GIODataFunc cancel_func; /* Runs under job map lock */
31   gpointer data;
32   GDestroyNotify destroy_notify;
33
34   gint io_priority;
35   GCancellable *cancellable;
36
37   guint idle_tag;
38 };
39
40 G_LOCK_DEFINE_STATIC(active_jobs);
41 static GSList *active_jobs = NULL;
42
43 static GThreadPool *job_thread_pool = NULL;
44
45 static void io_job_thread (gpointer       data,
46                            gpointer       user_data);
47
48 static void
49 g_io_job_free (GIOJob *job)
50 {
51   if (job->cancellable)
52     g_object_unref (job->cancellable);
53   g_free (job);
54 }
55
56 static gint
57 g_io_job_compare (gconstpointer  a,
58                   gconstpointer  b,
59                   gpointer       user_data)
60 {
61   const GIOJob *aa = a;
62   const GIOJob *bb = b;
63
64   /* Cancelled jobs are set prio == -1, so that
65      they are executed as quickly as possible */
66   
67   /* Lower value => higher priority */
68   if (aa->io_priority < bb->io_priority)
69     return -1;
70   if (aa->io_priority == bb->io_priority)
71     return 0;
72   return 1;
73 }
74
75 static gpointer
76 init_scheduler (gpointer arg)
77 {
78   if (job_thread_pool == NULL)
79     {
80       /* TODO: thread_pool_new can fail */
81       job_thread_pool = g_thread_pool_new (io_job_thread,
82                                            NULL,
83                                            10,
84                                            FALSE,
85                                            NULL);
86       if (job_thread_pool != NULL)
87         {
88           g_thread_pool_set_sort_function (job_thread_pool,
89                                            g_io_job_compare,
90                                            NULL);
91           /* Its kinda weird that this is a global setting
92            * instead of per threadpool. However, we really
93            * want to cache some threads, but not keep around
94            * those threads forever. */
95           g_thread_pool_set_max_idle_time (15 * 1000);
96           g_thread_pool_set_max_unused_threads (2);
97         }
98     }
99   return NULL;
100 }
101
102 static void
103 remove_active_job (GIOJob *job)
104 {
105   GIOJob *other_job;
106   GSList *l;
107   gboolean resort_jobs;
108   
109   G_LOCK (active_jobs);
110   active_jobs = g_slist_delete_link (active_jobs, job->active_link);
111   
112   resort_jobs = FALSE;
113   for (l = active_jobs; l != NULL; l = l->next)
114     {
115       other_job = l->data;
116       if (other_job->io_priority >= 0 &&
117           g_cancellable_is_cancelled (other_job->cancellable))
118         {
119           other_job->io_priority = -1;
120           resort_jobs = TRUE;
121         }
122     }
123   G_UNLOCK (active_jobs);
124   
125   if (resort_jobs &&
126       job_thread_pool != NULL)
127     g_thread_pool_set_sort_function (job_thread_pool,
128                                      g_io_job_compare,
129                                      NULL);
130
131 }
132
133 static void
134 io_job_thread (gpointer       data,
135                gpointer       user_data)
136 {
137   GIOJob *job = data;
138
139   if (job->cancellable)
140     g_push_current_cancellable (job->cancellable);
141   job->job_func (job, job->cancellable, job->data);
142   if (job->cancellable)
143     g_pop_current_cancellable (job->cancellable);
144
145   if (job->destroy_notify)
146     job->destroy_notify (job->data);
147
148   remove_active_job (job);
149   g_io_job_free (job);
150
151 }
152
153 static gboolean
154 run_job_at_idle (gpointer data)
155 {
156   GIOJob *job = data;
157
158   if (job->cancellable)
159     g_push_current_cancellable (job->cancellable);
160   
161   job->job_func (job, job->cancellable, job->data);
162   
163   if (job->cancellable)
164     g_pop_current_cancellable (job->cancellable);
165
166   if (job->destroy_notify)
167     job->destroy_notify (job->data);
168
169   remove_active_job (job);
170   g_io_job_free (job);
171
172   return FALSE;
173 }
174
175 /**
176  * g_schedule_io_job:
177  * @job_func: a #GIOJobFunc.
178  * @user_data: a #gpointer.
179  * @notify: a #GDestroyNotify.
180  * @io_priority: the io priority of the request. a #gint.
181  * @cancellable: optional #GCancellable object, %NULL to ignore.
182  *
183  * Schedules the @job_func.
184  * 
185  **/
186 void
187 g_schedule_io_job (GIOJobFunc     job_func,
188                    gpointer       user_data,
189                    GDestroyNotify notify,
190                    gint           io_priority,
191                    GCancellable  *cancellable)
192 {
193   static GOnce once_init = G_ONCE_INIT;
194   GIOJob *job;
195
196   g_return_if_fail (job_func != NULL);
197
198   job = g_new0 (GIOJob, 1);
199   job->job_func = job_func;
200   job->data = user_data;
201   job->destroy_notify = notify;
202   job->io_priority = io_priority;
203     
204   if (cancellable)
205     job->cancellable = g_object_ref (cancellable);
206
207   G_LOCK (active_jobs);
208   active_jobs = g_slist_prepend (active_jobs, job);
209   job->active_link = active_jobs;
210   G_UNLOCK (active_jobs);
211
212   if (g_thread_supported())
213     {
214       g_once (&once_init, init_scheduler, NULL);
215       g_thread_pool_push (job_thread_pool, job, NULL);
216     }
217   else
218     {
219       /* Threads not available, instead do the i/o sync inside a
220        * low prio idle handler
221        */
222       job->idle_tag = g_idle_add_full (G_PRIORITY_DEFAULT_IDLE + 1 + io_priority / 10,
223                                        run_job_at_idle,
224                                        job, NULL);
225     }
226 }
227
228 /**
229  * g_cancel_all_io_jobs:
230  * 
231  * Cancels all cancellable I/O Jobs. 
232  **/
233 void
234 g_cancel_all_io_jobs (void)
235 {
236   GSList *cancellable_list, *l;
237   
238   G_LOCK (active_jobs);
239   cancellable_list = NULL;
240   for (l = active_jobs; l != NULL; l = l->next)
241     {
242       GIOJob *job = l->data;
243       if (job->cancellable)
244         cancellable_list = g_slist_prepend (cancellable_list,
245                                             g_object_ref (job->cancellable));
246     }
247   G_UNLOCK (active_jobs);
248
249   for (l = cancellable_list; l != NULL; l = l->next)
250     {
251       GCancellable *c = l->data;
252       g_cancellable_cancel (c);
253       g_object_unref (c);
254     }
255   g_slist_free (cancellable_list);
256 }
257
258 typedef struct {
259   GIODataFunc func;
260   gpointer    data;
261   GDestroyNotify notify;
262
263   GMutex *ack_lock;
264   GCond *ack_condition;
265 } MainLoopProxy;
266
267 static gboolean
268 mainloop_proxy_func (gpointer data)
269 {
270   MainLoopProxy *proxy = data;
271
272   proxy->func (proxy->data);
273
274   if (proxy->ack_lock)
275     {
276       g_mutex_lock (proxy->ack_lock);
277       g_cond_signal (proxy->ack_condition);
278       g_mutex_unlock (proxy->ack_lock);
279     }
280   
281   return FALSE;
282 }
283
284 static void
285 mainloop_proxy_free (MainLoopProxy *proxy)
286 {
287   if (proxy->ack_lock)
288     {
289       g_mutex_free (proxy->ack_lock);
290       g_cond_free (proxy->ack_condition);
291     }
292   
293   g_free (proxy);
294 }
295
296 static void
297 mainloop_proxy_notify (gpointer data)
298 {
299   MainLoopProxy *proxy = data;
300
301   if (proxy->notify)
302     proxy->notify (proxy->data);
303
304   /* If nonblocking we free here, otherwise we free in io thread */
305   if (proxy->ack_lock == NULL)
306     mainloop_proxy_free (proxy);
307 }
308
309 /**
310  * g_io_job_send_to_mainloop:
311  * @job: a #GIOJob.
312  * @func: a #GIODataFunc.
313  * @user_data: a #gpointer.
314  * @notify: a #GDestroyNotify.
315  * @block: boolean flag indicating whether or not this job should block.
316  * 
317  * 
318  **/
319 void
320 g_io_job_send_to_mainloop (GIOJob        *job,
321                            GIODataFunc    func,
322                            gpointer       user_data,
323                            GDestroyNotify notify,
324                            gboolean       block)
325 {
326   GSource *source;
327   MainLoopProxy *proxy;
328   guint id;
329
330   g_return_if_fail (job != NULL);
331   g_return_if_fail (func != NULL);
332
333   if (job->idle_tag)
334     {
335       /* We just immediately re-enter in the case of idles (non-threads)
336        * Anything else would just deadlock. If you can't handle this, enable threads.
337        */
338       func (user_data); 
339       return;
340     }
341   
342   proxy = g_new0 (MainLoopProxy, 1);
343   proxy->func = func;
344   proxy->data = user_data;
345   proxy->notify = notify;
346   
347   if (block)
348     {
349       proxy->ack_lock = g_mutex_new ();
350       proxy->ack_condition = g_cond_new ();
351     }
352   
353   source = g_idle_source_new ();
354   g_source_set_priority (source, G_PRIORITY_DEFAULT);
355
356   g_source_set_callback (source, mainloop_proxy_func, proxy, mainloop_proxy_notify);
357
358   if (block)
359     g_mutex_lock (proxy->ack_lock);
360                   
361   id = g_source_attach (source, NULL);
362   g_source_unref (source);
363
364   if (block)
365     {
366       g_cond_wait (proxy->ack_condition, proxy->ack_lock);
367       g_mutex_unlock (proxy->ack_lock);
368       
369       /* destroy notify didn't free proxy */
370       mainloop_proxy_free (proxy);
371     }
372 }