[HQ](Debian) Package version up
[external/glib2.0.git] / glib / gasyncqueue.c
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*
24  * MT safe
25  */
26
27 #include "config.h"
28
29 #include "glib.h"
30 #include "galias.h"
31
32
33 struct _GAsyncQueue
34 {
35   GMutex *mutex;
36   GCond *cond;
37   GQueue *queue;
38   GDestroyNotify item_free_func;
39   guint waiting_threads;
40   gint32 ref_count;
41 };
42
43 typedef struct {
44   GCompareDataFunc func;
45   gpointer         user_data;
46 } SortData;
47
48 /**
49  * g_async_queue_new:
50  * 
51  * Creates a new asynchronous queue with the initial reference count of 1.
52  * 
53  * Return value: the new #GAsyncQueue.
54  **/
55 GAsyncQueue*
56 g_async_queue_new (void)
57 {
58   GAsyncQueue* retval = g_new (GAsyncQueue, 1);
59   retval->mutex = g_mutex_new ();
60   retval->cond = NULL;
61   retval->queue = g_queue_new ();
62   retval->waiting_threads = 0;
63   retval->ref_count = 1;
64   retval->item_free_func = NULL;
65   return retval;
66 }
67
68 /**
69  * g_async_queue_new_full:
70  * @item_free_func: function to free queue elements
71  * 
72  * Creates a new asynchronous queue with an initial reference count of 1 and
73  * sets up a destroy notify function that is used to free any remaining
74  * queue items when the queue is destroyed after the final unref.
75  *
76  * Return value: the new #GAsyncQueue.
77  *
78  * Since: 2.16
79  **/
80 GAsyncQueue*
81 g_async_queue_new_full (GDestroyNotify item_free_func)
82 {
83   GAsyncQueue *async_queue = g_async_queue_new ();
84   async_queue->item_free_func = item_free_func;
85   return async_queue;
86 }
87
88 /**
89  * g_async_queue_ref:
90  * @queue: a #GAsyncQueue.
91  *
92  * Increases the reference count of the asynchronous @queue by 1. You
93  * do not need to hold the lock to call this function.
94  *
95  * Returns: the @queue that was passed in (since 2.6)
96  **/
97 GAsyncQueue *
98 g_async_queue_ref (GAsyncQueue *queue)
99 {
100   g_return_val_if_fail (queue, NULL);
101   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
102   
103   g_atomic_int_inc (&queue->ref_count);
104
105   return queue;
106 }
107
108 /**
109  * g_async_queue_ref_unlocked:
110  * @queue: a #GAsyncQueue.
111  * 
112  * Increases the reference count of the asynchronous @queue by 1.
113  *
114  * @Deprecated: Since 2.8, reference counting is done atomically
115  * so g_async_queue_ref() can be used regardless of the @queue's
116  * lock.
117  **/
118 void 
119 g_async_queue_ref_unlocked (GAsyncQueue *queue)
120 {
121   g_return_if_fail (queue);
122   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
123   
124   g_atomic_int_inc (&queue->ref_count);
125 }
126
127 /**
128  * g_async_queue_unref_and_unlock:
129  * @queue: a #GAsyncQueue.
130  * 
131  * Decreases the reference count of the asynchronous @queue by 1 and
132  * releases the lock. This function must be called while holding the
133  * @queue's lock. If the reference count went to 0, the @queue will be
134  * destroyed and the memory allocated will be freed.
135  *
136  * @Deprecated: Since 2.8, reference counting is done atomically
137  * so g_async_queue_unref() can be used regardless of the @queue's
138  * lock.
139  **/
140 void 
141 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
142 {
143   g_return_if_fail (queue);
144   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
145
146   g_mutex_unlock (queue->mutex);
147   g_async_queue_unref (queue);
148 }
149
150 /**
151  * g_async_queue_unref:
152  * @queue: a #GAsyncQueue.
153  * 
154  * Decreases the reference count of the asynchronous @queue by 1. If
155  * the reference count went to 0, the @queue will be destroyed and the
156  * memory allocated will be freed. So you are not allowed to use the
157  * @queue afterwards, as it might have disappeared. You do not need to
158  * hold the lock to call this function.
159  **/
160 void 
161 g_async_queue_unref (GAsyncQueue *queue)
162 {
163   g_return_if_fail (queue);
164   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
165   
166   if (g_atomic_int_dec_and_test (&queue->ref_count))
167     {
168       g_return_if_fail (queue->waiting_threads == 0);
169       g_mutex_free (queue->mutex);
170       if (queue->cond)
171         g_cond_free (queue->cond);
172       if (queue->item_free_func)
173         g_queue_foreach (queue->queue, (GFunc) queue->item_free_func, NULL);
174       g_queue_free (queue->queue);
175       g_free (queue);
176     }
177 }
178
179 /**
180  * g_async_queue_lock:
181  * @queue: a #GAsyncQueue.
182  * 
183  * Acquires the @queue's lock. After that you can only call the
184  * <function>g_async_queue_*_unlocked()</function> function variants on that
185  * @queue. Otherwise it will deadlock.
186  **/
187 void
188 g_async_queue_lock (GAsyncQueue *queue)
189 {
190   g_return_if_fail (queue);
191   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
192
193   g_mutex_lock (queue->mutex);
194 }
195
196 /**
197  * g_async_queue_unlock:
198  * @queue: a #GAsyncQueue.
199  * 
200  * Releases the queue's lock.
201  **/
202 void 
203 g_async_queue_unlock (GAsyncQueue *queue)
204 {
205   g_return_if_fail (queue);
206   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
207
208   g_mutex_unlock (queue->mutex);
209 }
210
211 /**
212  * g_async_queue_push:
213  * @queue: a #GAsyncQueue.
214  * @data: @data to push into the @queue.
215  *
216  * Pushes the @data into the @queue. @data must not be %NULL.
217  **/
218 void
219 g_async_queue_push (GAsyncQueue* queue, gpointer data)
220 {
221   g_return_if_fail (queue);
222   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
223   g_return_if_fail (data);
224
225   g_mutex_lock (queue->mutex);
226   g_async_queue_push_unlocked (queue, data);
227   g_mutex_unlock (queue->mutex);
228 }
229
230 /**
231  * g_async_queue_push_unlocked:
232  * @queue: a #GAsyncQueue.
233  * @data: @data to push into the @queue.
234  * 
235  * Pushes the @data into the @queue. @data must not be %NULL. This
236  * function must be called while holding the @queue's lock.
237  **/
238 void
239 g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
240 {
241   g_return_if_fail (queue);
242   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
243   g_return_if_fail (data);
244
245   g_queue_push_head (queue->queue, data);
246   if (queue->waiting_threads > 0)
247     g_cond_signal (queue->cond);
248 }
249
250 /**
251  * g_async_queue_push_sorted:
252  * @queue: a #GAsyncQueue
253  * @data: the @data to push into the @queue
254  * @func: the #GCompareDataFunc is used to sort @queue. This function
255  *     is passed two elements of the @queue. The function should return
256  *     0 if they are equal, a negative value if the first element
257  *     should be higher in the @queue or a positive value if the first
258  *     element should be lower in the @queue than the second element.
259  * @user_data: user data passed to @func.
260  * 
261  * Inserts @data into @queue using @func to determine the new
262  * position. 
263  * 
264  * This function requires that the @queue is sorted before pushing on
265  * new elements.
266  * 
267  * This function will lock @queue before it sorts the queue and unlock
268  * it when it is finished.
269  * 
270  * For an example of @func see g_async_queue_sort(). 
271  *
272  * Since: 2.10
273  **/
274 void
275 g_async_queue_push_sorted (GAsyncQueue      *queue,
276                            gpointer          data,
277                            GCompareDataFunc  func,
278                            gpointer          user_data)
279 {
280   g_return_if_fail (queue != NULL);
281
282   g_mutex_lock (queue->mutex);
283   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
284   g_mutex_unlock (queue->mutex);
285 }
286
287 static gint 
288 g_async_queue_invert_compare (gpointer  v1, 
289                               gpointer  v2, 
290                               SortData *sd)
291 {
292   return -sd->func (v1, v2, sd->user_data);
293 }
294
295 /**
296  * g_async_queue_push_sorted_unlocked:
297  * @queue: a #GAsyncQueue
298  * @data: the @data to push into the @queue
299  * @func: the #GCompareDataFunc is used to sort @queue. This function
300  *     is passed two elements of the @queue. The function should return
301  *     0 if they are equal, a negative value if the first element
302  *     should be higher in the @queue or a positive value if the first
303  *     element should be lower in the @queue than the second element.
304  * @user_data: user data passed to @func.
305  * 
306  * Inserts @data into @queue using @func to determine the new
307  * position.
308  * 
309  * This function requires that the @queue is sorted before pushing on
310  * new elements.
311  * 
312  * This function is called while holding the @queue's lock.
313  * 
314  * For an example of @func see g_async_queue_sort(). 
315  *
316  * Since: 2.10
317  **/
318 void
319 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
320                                     gpointer          data,
321                                     GCompareDataFunc  func,
322                                     gpointer          user_data)
323 {
324   SortData sd;
325   
326   g_return_if_fail (queue != NULL);
327
328   sd.func = func;
329   sd.user_data = user_data;
330
331   g_queue_insert_sorted (queue->queue, 
332                          data, 
333                          (GCompareDataFunc)g_async_queue_invert_compare, 
334                          &sd);
335   if (queue->waiting_threads > 0)
336     g_cond_signal (queue->cond);
337 }
338
339 static gpointer
340 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue, 
341                                    gboolean     try, 
342                                    GTimeVal    *end_time)
343 {
344   gpointer retval;
345
346   if (!g_queue_peek_tail_link (queue->queue))
347     {
348       if (try)
349         return NULL;
350       
351       if (!queue->cond)
352         queue->cond = g_cond_new ();
353
354       if (!end_time)
355         {
356           queue->waiting_threads++;
357           while (!g_queue_peek_tail_link (queue->queue))
358             g_cond_wait (queue->cond, queue->mutex);
359           queue->waiting_threads--;
360         }
361       else
362         {
363           queue->waiting_threads++;
364           while (!g_queue_peek_tail_link (queue->queue))
365             if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
366               break;
367           queue->waiting_threads--;
368           if (!g_queue_peek_tail_link (queue->queue))
369             return NULL;
370         }
371     }
372
373   retval = g_queue_pop_tail (queue->queue);
374
375   g_assert (retval);
376
377   return retval;
378 }
379
380 /**
381  * g_async_queue_pop:
382  * @queue: a #GAsyncQueue.
383  * 
384  * Pops data from the @queue. This function blocks until data become
385  * available.
386  *
387  * Return value: data from the queue.
388  **/
389 gpointer
390 g_async_queue_pop (GAsyncQueue* queue)
391 {
392   gpointer retval;
393
394   g_return_val_if_fail (queue, NULL);
395   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
396
397   g_mutex_lock (queue->mutex);
398   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
399   g_mutex_unlock (queue->mutex);
400
401   return retval;
402 }
403
404 /**
405  * g_async_queue_pop_unlocked:
406  * @queue: a #GAsyncQueue.
407  * 
408  * Pops data from the @queue. This function blocks until data become
409  * available. This function must be called while holding the @queue's
410  * lock.
411  *
412  * Return value: data from the queue.
413  **/
414 gpointer
415 g_async_queue_pop_unlocked (GAsyncQueue* queue)
416 {
417   g_return_val_if_fail (queue, NULL);
418   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
419
420   return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
421 }
422
423 /**
424  * g_async_queue_try_pop:
425  * @queue: a #GAsyncQueue.
426  * 
427  * Tries to pop data from the @queue. If no data is available, %NULL is
428  * returned.
429  *
430  * Return value: data from the queue or %NULL, when no data is
431  * available immediately.
432  **/
433 gpointer
434 g_async_queue_try_pop (GAsyncQueue* queue)
435 {
436   gpointer retval;
437
438   g_return_val_if_fail (queue, NULL);
439   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
440
441   g_mutex_lock (queue->mutex);
442   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
443   g_mutex_unlock (queue->mutex);
444
445   return retval;
446 }
447
448 /**
449  * g_async_queue_try_pop_unlocked:
450  * @queue: a #GAsyncQueue.
451  * 
452  * Tries to pop data from the @queue. If no data is available, %NULL is
453  * returned. This function must be called while holding the @queue's
454  * lock.
455  *
456  * Return value: data from the queue or %NULL, when no data is
457  * available immediately.
458  **/
459 gpointer
460 g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
461 {
462   g_return_val_if_fail (queue, NULL);
463   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
464
465   return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
466 }
467
468 /**
469  * g_async_queue_timed_pop:
470  * @queue: a #GAsyncQueue.
471  * @end_time: a #GTimeVal, determining the final time.
472  *
473  * Pops data from the @queue. If no data is received before @end_time,
474  * %NULL is returned.
475  *
476  * To easily calculate @end_time a combination of g_get_current_time()
477  * and g_time_val_add() can be used.
478  *
479  * Return value: data from the queue or %NULL, when no data is
480  * received before @end_time.
481  **/
482 gpointer
483 g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
484 {
485   gpointer retval;
486
487   g_return_val_if_fail (queue, NULL);
488   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
489
490   g_mutex_lock (queue->mutex);
491   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
492   g_mutex_unlock (queue->mutex);
493
494   return retval;  
495 }
496
497 /**
498  * g_async_queue_timed_pop_unlocked:
499  * @queue: a #GAsyncQueue.
500  * @end_time: a #GTimeVal, determining the final time.
501  *
502  * Pops data from the @queue. If no data is received before @end_time,
503  * %NULL is returned. This function must be called while holding the
504  * @queue's lock.
505  *
506  * To easily calculate @end_time a combination of g_get_current_time()
507  * and g_time_val_add() can be used.
508  *
509  * Return value: data from the queue or %NULL, when no data is
510  * received before @end_time.
511  **/
512 gpointer
513 g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
514 {
515   g_return_val_if_fail (queue, NULL);
516   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
517
518   return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
519 }
520
521 /**
522  * g_async_queue_length:
523  * @queue: a #GAsyncQueue.
524  * 
525  * Returns the length of the queue, negative values mean waiting
526  * threads, positive values mean available entries in the
527  * @queue. Actually this function returns the number of data items in
528  * the queue minus the number of waiting threads. Thus a return value
529  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
530  * That can happen due to locking of the queue or due to
531  * scheduling.  
532  *
533  * Return value: the length of the @queue.
534  **/
535 gint
536 g_async_queue_length (GAsyncQueue* queue)
537 {
538   gint retval;
539
540   g_return_val_if_fail (queue, 0);
541   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
542
543   g_mutex_lock (queue->mutex);
544   retval = queue->queue->length - queue->waiting_threads;
545   g_mutex_unlock (queue->mutex);
546
547   return retval;
548 }
549
550 /**
551  * g_async_queue_length_unlocked:
552  * @queue: a #GAsyncQueue.
553  * 
554  * Returns the length of the queue, negative values mean waiting
555  * threads, positive values mean available entries in the
556  * @queue. Actually this function returns the number of data items in
557  * the queue minus the number of waiting threads. Thus a return value
558  * of 0 could mean 'n' entries in the queue and 'n' thread waiting.
559  * That can happen due to locking of the queue or due to
560  * scheduling. This function must be called while holding the @queue's
561  * lock.
562  *
563  * Return value: the length of the @queue.
564  **/
565 gint
566 g_async_queue_length_unlocked (GAsyncQueue* queue)
567 {
568   g_return_val_if_fail (queue, 0);
569   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
570
571   return queue->queue->length - queue->waiting_threads;
572 }
573
574 /**
575  * g_async_queue_sort:
576  * @queue: a #GAsyncQueue
577  * @func: the #GCompareDataFunc is used to sort @queue. This
578  *     function is passed two elements of the @queue. The function
579  *     should return 0 if they are equal, a negative value if the
580  *     first element should be higher in the @queue or a positive
581  *     value if the first element should be lower in the @queue than
582  *     the second element. 
583  * @user_data: user data passed to @func
584  *
585  * Sorts @queue using @func. 
586  *
587  * This function will lock @queue before it sorts the queue and unlock
588  * it when it is finished.
589  *
590  * If you were sorting a list of priority numbers to make sure the
591  * lowest priority would be at the top of the queue, you could use:
592  * |[
593  *  gint32 id1;
594  *  gint32 id2;
595  *   
596  *  id1 = GPOINTER_TO_INT (element1);
597  *  id2 = GPOINTER_TO_INT (element2);
598  *   
599  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
600  * ]|
601  *
602  * Since: 2.10
603  **/
604 void
605 g_async_queue_sort (GAsyncQueue      *queue,
606                     GCompareDataFunc  func,
607                     gpointer          user_data)
608 {
609   g_return_if_fail (queue != NULL);
610   g_return_if_fail (func != NULL);
611
612   g_mutex_lock (queue->mutex);
613   g_async_queue_sort_unlocked (queue, func, user_data);
614   g_mutex_unlock (queue->mutex);
615 }
616
617 /**
618  * g_async_queue_sort_unlocked:
619  * @queue: a #GAsyncQueue
620  * @func: the #GCompareDataFunc is used to sort @queue. This
621  *     function is passed two elements of the @queue. The function
622  *     should return 0 if they are equal, a negative value if the
623  *     first element should be higher in the @queue or a positive
624  *     value if the first element should be lower in the @queue than
625  *     the second element. 
626  * @user_data: user data passed to @func
627  *
628  * Sorts @queue using @func. 
629  *
630  * This function is called while holding the @queue's lock.
631  * 
632  * Since: 2.10
633  **/
634 void
635 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
636                              GCompareDataFunc  func,
637                              gpointer          user_data)
638 {
639   SortData sd;
640
641   g_return_if_fail (queue != NULL);
642   g_return_if_fail (func != NULL);
643
644   sd.func = func;
645   sd.user_data = user_data;
646
647   g_queue_sort (queue->queue, 
648                 (GCompareDataFunc)g_async_queue_invert_compare, 
649                 &sd);
650 }
651
652 /*
653  * Private API
654  */
655
656 GMutex*
657 _g_async_queue_get_mutex (GAsyncQueue* queue)
658 {
659   g_return_val_if_fail (queue, NULL);
660   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
661
662   return queue->mutex;
663 }
664
665 #define __G_ASYNCQUEUE_C__
666 #include "galiasdef.c"