fd3fcbb75599b1c31ee3de7812f606c7d24d94d7
[platform/upstream/glibc.git] / sysdeps / pthread / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2006, 2007
3    Free Software Foundation, Inc.
4    This file is part of the GNU C Library.
5    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
6
7    The GNU C Library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 2.1 of the License, or (at your option) any later version.
11
12    The GNU C Library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with the GNU C Library; if not, write to the Free
19    Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20    02111-1307 USA.  */
21
22 #include <aio.h>
23 #include <assert.h>
24 #include <errno.h>
25 #include <limits.h>
26 #include <pthread.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30 #include <sys/time.h>
31 #include <aio_misc.h>
32
33 #ifndef aio_create_helper_thread
34 # define aio_create_helper_thread __aio_create_helper_thread
35
36 extern inline int
37 __aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
38 {
39   pthread_attr_t attr;
40
41   /* Make sure the thread is created detached.  */
42   pthread_attr_init (&attr);
43   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
44
45   int ret = pthread_create (threadp, &attr, tf, arg);
46
47   (void) pthread_attr_destroy (&attr);
48   return ret;
49 }
50 #endif
51
52 static void add_request_to_runlist (struct requestlist *newrequest);
53
54 /* Pool of request list entries.  */
55 static struct requestlist **pool;
56
57 /* Number of total and allocated pool entries.  */
58 static size_t pool_max_size;
59 static size_t pool_size;
60
61 /* We implement a two dimensional array but allocate each row separately.
62    The macro below determines how many entries should be used per row.
63    It should better be a power of two.  */
64 #define ENTRIES_PER_ROW 32
65
66 /* How many rows we allocate at once.  */
67 #define ROWS_STEP       8
68
69 /* List of available entries.  */
70 static struct requestlist *freelist;
71
72 /* List of request waiting to be processed.  */
73 static struct requestlist *runlist;
74
75 /* Structure list of all currently processed requests.  */
76 static struct requestlist *requests;
77
78 /* Number of threads currently running.  */
79 static int nthreads;
80
81 /* Number of threads waiting for work to arrive. */
82 static int idle_thread_count;
83
84
85 /* These are the values used to optimize the use of AIO.  The user can
86    overwrite them by using the `aio_init' function.  */
87 static struct aioinit optim =
88 {
89   20,   /* int aio_threads;     Maximal number of threads.  */
90   64,   /* int aio_num;         Number of expected simultanious requests. */
91   0,
92   0,
93   0,
94   0,
95   1,
96   0
97 };
98
99
100 /* Since the list is global we need a mutex protecting it.  */
101 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
102
103 /* When you add a request to the list and there are idle threads present,
104    you signal this condition variable. When a thread finishes work, it waits
105    on this condition variable for a time before it actually exits. */
106 pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
107
108
109 /* Functions to handle request list pool.  */
110 static struct requestlist *
111 get_elem (void)
112 {
113   struct requestlist *result;
114
115   if (freelist == NULL)
116     {
117       struct requestlist *new_row;
118       int cnt;
119
120       assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
121
122       if (pool_size + 1 >= pool_max_size)
123         {
124           size_t new_max_size = pool_max_size + ROWS_STEP;
125           struct requestlist **new_tab;
126
127           new_tab = (struct requestlist **)
128             realloc (pool, new_max_size * sizeof (struct requestlist *));
129
130           if (new_tab == NULL)
131             return NULL;
132
133           pool_max_size = new_max_size;
134           pool = new_tab;
135         }
136
137       /* Allocate the new row.  */
138       cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
139       new_row = (struct requestlist *) calloc (cnt,
140                                                sizeof (struct requestlist));
141       if (new_row == NULL)
142         return NULL;
143
144       pool[pool_size++] = new_row;
145
146       /* Put all the new entries in the freelist.  */
147       do
148         {
149           new_row->next_prio = freelist;
150           freelist = new_row++;
151         }
152       while (--cnt > 0);
153     }
154
155   result = freelist;
156   freelist = freelist->next_prio;
157
158   return result;
159 }
160
161
162 void
163 internal_function
164 __aio_free_request (struct requestlist *elem)
165 {
166   elem->running = no;
167   elem->next_prio = freelist;
168   freelist = elem;
169 }
170
171
172 struct requestlist *
173 internal_function
174 __aio_find_req (aiocb_union *elem)
175 {
176   struct requestlist *runp = requests;
177   int fildes = elem->aiocb.aio_fildes;
178
179   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
180     runp = runp->next_fd;
181
182   if (runp != NULL)
183     {
184       if (runp->aiocbp->aiocb.aio_fildes != fildes)
185         runp = NULL;
186       else
187         while (runp != NULL && runp->aiocbp != elem)
188           runp = runp->next_prio;
189     }
190
191   return runp;
192 }
193
194
195 struct requestlist *
196 internal_function
197 __aio_find_req_fd (int fildes)
198 {
199   struct requestlist *runp = requests;
200
201   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
202     runp = runp->next_fd;
203
204   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
205           ? runp : NULL);
206 }
207
208
209 void
210 internal_function
211 __aio_remove_request (struct requestlist *last, struct requestlist *req,
212                       int all)
213 {
214   assert (req->running == yes || req->running == queued
215           || req->running == done);
216
217   if (last != NULL)
218     last->next_prio = all ? NULL : req->next_prio;
219   else
220     {
221       if (all || req->next_prio == NULL)
222         {
223           if (req->last_fd != NULL)
224             req->last_fd->next_fd = req->next_fd;
225           else
226             requests = req->next_fd;
227           if (req->next_fd != NULL)
228             req->next_fd->last_fd = req->last_fd;
229         }
230       else
231         {
232           if (req->last_fd != NULL)
233             req->last_fd->next_fd = req->next_prio;
234           else
235             requests = req->next_prio;
236
237           if (req->next_fd != NULL)
238             req->next_fd->last_fd = req->next_prio;
239
240           req->next_prio->last_fd = req->last_fd;
241           req->next_prio->next_fd = req->next_fd;
242
243           /* Mark this entry as runnable.  */
244           req->next_prio->running = yes;
245         }
246
247       if (req->running == yes)
248         {
249           struct requestlist *runp = runlist;
250
251           last = NULL;
252           while (runp != NULL)
253             {
254               if (runp == req)
255                 {
256                   if (last == NULL)
257                     runlist = runp->next_run;
258                   else
259                     last->next_run = runp->next_run;
260                   break;
261                 }
262               last = runp;
263               runp = runp->next_run;
264             }
265         }
266     }
267 }
268
269
270 /* The thread handler.  */
271 static void *handle_fildes_io (void *arg);
272
273
274 /* User optimization.  */
275 void
276 __aio_init (const struct aioinit *init)
277 {
278   /* Get the mutex.  */
279   pthread_mutex_lock (&__aio_requests_mutex);
280
281   /* Only allow writing new values if the table is not yet allocated.  */
282   if (pool == NULL)
283     {
284       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
285       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
286                        ? ENTRIES_PER_ROW
287                        : init->aio_num & ~ENTRIES_PER_ROW);
288     }
289
290   if (init->aio_idle_time != 0)
291     optim.aio_idle_time = init->aio_idle_time;
292
293   /* Release the mutex.  */
294   pthread_mutex_unlock (&__aio_requests_mutex);
295 }
296 weak_alias (__aio_init, aio_init)
297
298
299 /* The main function of the async I/O handling.  It enqueues requests
300    and if necessary starts and handles threads.  */
301 struct requestlist *
302 internal_function
303 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
304 {
305   int result = 0;
306   int policy, prio;
307   struct sched_param param;
308   struct requestlist *last, *runp, *newp;
309   int running = no;
310
311   if (operation == LIO_SYNC || operation == LIO_DSYNC)
312     aiocbp->aiocb.aio_reqprio = 0;
313   else if (aiocbp->aiocb.aio_reqprio < 0
314            || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
315     {
316       /* Invalid priority value.  */
317       __set_errno (EINVAL);
318       aiocbp->aiocb.__error_code = EINVAL;
319       aiocbp->aiocb.__return_value = -1;
320       return NULL;
321     }
322
323   /* Compute priority for this request.  */
324   pthread_getschedparam (pthread_self (), &policy, &param);
325   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
326
327   /* Get the mutex.  */
328   pthread_mutex_lock (&__aio_requests_mutex);
329
330   last = NULL;
331   runp = requests;
332   /* First look whether the current file descriptor is currently
333      worked with.  */
334   while (runp != NULL
335          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
336     {
337       last = runp;
338       runp = runp->next_fd;
339     }
340
341   /* Get a new element for the waiting list.  */
342   newp = get_elem ();
343   if (newp == NULL)
344     {
345       pthread_mutex_unlock (&__aio_requests_mutex);
346       __set_errno (EAGAIN);
347       return NULL;
348     }
349   newp->aiocbp = aiocbp;
350 #ifdef BROKEN_THREAD_SIGNALS
351   newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
352                       ? getpid () : 0);
353 #endif
354   newp->waiting = NULL;
355
356   aiocbp->aiocb.__abs_prio = prio;
357   aiocbp->aiocb.__policy = policy;
358   aiocbp->aiocb.aio_lio_opcode = operation;
359   aiocbp->aiocb.__error_code = EINPROGRESS;
360   aiocbp->aiocb.__return_value = 0;
361
362   if (runp != NULL
363       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
364     {
365       /* The current file descriptor is worked on.  It makes no sense
366          to start another thread since this new thread would fight
367          with the running thread for the resources.  But we also cannot
368          say that the thread processing this desriptor shall immediately
369          after finishing the current job process this request if there
370          are other threads in the running queue which have a higher
371          priority.  */
372
373       /* Simply enqueue it after the running one according to the
374          priority.  */
375       while (runp->next_prio != NULL
376              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
377         runp = runp->next_prio;
378
379       newp->next_prio = runp->next_prio;
380       runp->next_prio = newp;
381
382       running = queued;
383     }
384   else
385     {
386       running = yes;
387       /* Enqueue this request for a new descriptor.  */
388       if (last == NULL)
389         {
390           newp->last_fd = NULL;
391           newp->next_fd = requests;
392           if (requests != NULL)
393             requests->last_fd = newp;
394           requests = newp;
395         }
396       else
397         {
398           newp->next_fd = last->next_fd;
399           newp->last_fd = last;
400           last->next_fd = newp;
401           if (newp->next_fd != NULL)
402             newp->next_fd->last_fd = newp;
403         }
404
405       newp->next_prio = NULL;
406     }
407
408   if (running == yes)
409     {
410       /* We try to create a new thread for this file descriptor.  The
411          function which gets called will handle all available requests
412          for this descriptor and when all are processed it will
413          terminate.
414
415          If no new thread can be created or if the specified limit of
416          threads for AIO is reached we queue the request.  */
417
418       /* See if we need to and are able to create a thread.  */
419       if (nthreads < optim.aio_threads && idle_thread_count == 0)
420         {
421           pthread_t thid;
422
423           running = newp->running = allocated;
424
425           /* Now try to start a thread.  */
426           if (aio_create_helper_thread (&thid, handle_fildes_io, newp) == 0)
427             /* We managed to enqueue the request.  All errors which can
428                happen now can be recognized by calls to `aio_return' and
429                `aio_error'.  */
430             ++nthreads;
431           else
432             {
433               /* Reset the running flag.  The new request is not running.  */
434               running = newp->running = yes;
435
436               if (nthreads == 0)
437                 /* We cannot create a thread in the moment and there is
438                    also no thread running.  This is a problem.  `errno' is
439                    set to EAGAIN if this is only a temporary problem.  */
440                 result = -1;
441             }
442         }
443     }
444
445   /* Enqueue the request in the run queue if it is not yet running.  */
446   if (running == yes && result == 0)
447     {
448       add_request_to_runlist (newp);
449
450       /* If there is a thread waiting for work, then let it know that we
451          have just given it something to do. */
452       if (idle_thread_count > 0)
453         pthread_cond_signal (&__aio_new_request_notification);
454     }
455
456   if (result == 0)
457     newp->running = running;
458   else
459     {
460       /* Something went wrong.  */
461       __aio_free_request (newp);
462       newp = NULL;
463     }
464
465   /* Release the mutex.  */
466   pthread_mutex_unlock (&__aio_requests_mutex);
467
468   return newp;
469 }
470
471
472 static void *
473 handle_fildes_io (void *arg)
474 {
475   pthread_t self = pthread_self ();
476   struct sched_param param;
477   struct requestlist *runp = (struct requestlist *) arg;
478   aiocb_union *aiocbp;
479   int policy;
480   int fildes;
481
482   pthread_getschedparam (self, &policy, &param);
483
484   do
485     {
486       /* If runp is NULL, then we were created to service the work queue
487          in general, not to handle any particular request. In that case we
488          skip the "do work" stuff on the first pass, and go directly to the
489          "get work off the work queue" part of this loop, which is near the
490          end. */
491       if (runp == NULL)
492         pthread_mutex_lock (&__aio_requests_mutex);
493       else
494         {
495           /* Hopefully this request is marked as running.  */
496           assert (runp->running == allocated);
497
498           /* Update our variables.  */
499           aiocbp = runp->aiocbp;
500           fildes = aiocbp->aiocb.aio_fildes;
501
502           /* Change the priority to the requested value (if necessary).  */
503           if (aiocbp->aiocb.__abs_prio != param.sched_priority
504               || aiocbp->aiocb.__policy != policy)
505             {
506               param.sched_priority = aiocbp->aiocb.__abs_prio;
507               policy = aiocbp->aiocb.__policy;
508               pthread_setschedparam (self, policy, &param);
509             }
510
511           /* Process request pointed to by RUNP.  We must not be disturbed
512              by signals.  */
513           if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
514             {
515               if (sizeof (off_t) != sizeof (off64_t)
516                   && aiocbp->aiocb.aio_lio_opcode & 128)
517                 aiocbp->aiocb.__return_value =
518                   TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
519                                                  aiocbp->aiocb64.aio_buf,
520                                                  aiocbp->aiocb64.aio_nbytes,
521                                                  aiocbp->aiocb64.aio_offset));
522               else
523                 aiocbp->aiocb.__return_value =
524                   TEMP_FAILURE_RETRY (pread (fildes,
525                                              (void *) aiocbp->aiocb.aio_buf,
526                                              aiocbp->aiocb.aio_nbytes,
527                                              aiocbp->aiocb.aio_offset));
528
529               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
530                 /* The Linux kernel is different from others.  It returns
531                    ESPIPE if using pread on a socket.  Other platforms
532                    simply ignore the offset parameter and behave like
533                    read.  */
534                 aiocbp->aiocb.__return_value =
535                   TEMP_FAILURE_RETRY (read (fildes,
536                                             (void *) aiocbp->aiocb64.aio_buf,
537                                             aiocbp->aiocb64.aio_nbytes));
538             }
539           else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
540             {
541               if (sizeof (off_t) != sizeof (off64_t)
542                   && aiocbp->aiocb.aio_lio_opcode & 128)
543                 aiocbp->aiocb.__return_value =
544                   TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
545                                                   aiocbp->aiocb64.aio_buf,
546                                                   aiocbp->aiocb64.aio_nbytes,
547                                                   aiocbp->aiocb64.aio_offset));
548               else
549                 aiocbp->aiocb.__return_value =
550                   TEMP_FAILURE_RETRY (__libc_pwrite (fildes, (const void *)
551                                               aiocbp->aiocb.aio_buf,
552                                               aiocbp->aiocb.aio_nbytes,
553                                               aiocbp->aiocb.aio_offset));
554
555               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
556                 /* The Linux kernel is different from others.  It returns
557                    ESPIPE if using pwrite on a socket.  Other platforms
558                    simply ignore the offset parameter and behave like
559                    write.  */
560                 aiocbp->aiocb.__return_value =
561                   TEMP_FAILURE_RETRY (write (fildes,
562                                              (void *) aiocbp->aiocb64.aio_buf,
563                                              aiocbp->aiocb64.aio_nbytes));
564             }
565           else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
566             aiocbp->aiocb.__return_value =
567               TEMP_FAILURE_RETRY (fdatasync (fildes));
568           else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
569             aiocbp->aiocb.__return_value =
570               TEMP_FAILURE_RETRY (fsync (fildes));
571           else
572             {
573               /* This is an invalid opcode.  */
574               aiocbp->aiocb.__return_value = -1;
575               __set_errno (EINVAL);
576             }
577
578           /* Get the mutex.  */
579           pthread_mutex_lock (&__aio_requests_mutex);
580
581           /* In theory we would need here a write memory barrier since the
582              callers test using aio_error() whether the request finished
583              and once this value != EINPROGRESS the field __return_value
584              must be committed to memory.
585
586              But since the pthread_mutex_lock call involves write memory
587              barriers as well it is not necessary.  */
588
589           if (aiocbp->aiocb.__return_value == -1)
590             aiocbp->aiocb.__error_code = errno;
591           else
592             aiocbp->aiocb.__error_code = 0;
593
594           /* Send the signal to notify about finished processing of the
595              request.  */
596           __aio_notify (runp);
597
598           /* For debugging purposes we reset the running flag of the
599              finished request.  */
600           assert (runp->running == allocated);
601           runp->running = done;
602
603           /* Now dequeue the current request.  */
604           __aio_remove_request (NULL, runp, 0);
605           if (runp->next_prio != NULL)
606             add_request_to_runlist (runp->next_prio);
607
608           /* Free the old element.  */
609           __aio_free_request (runp);
610         }
611
612       runp = runlist;
613
614       /* If the runlist is empty, then we sleep for a while, waiting for
615          something to arrive in it. */
616       if (runp == NULL && optim.aio_idle_time >= 0)
617         {
618           struct timeval now;
619           struct timespec wakeup_time;
620
621           ++idle_thread_count;
622           gettimeofday (&now, NULL);
623           wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
624           wakeup_time.tv_nsec = now.tv_usec * 1000;
625           if (wakeup_time.tv_nsec > 1000000000)
626             {
627               wakeup_time.tv_nsec -= 1000000000;
628               ++wakeup_time.tv_sec;
629             }
630           pthread_cond_timedwait (&__aio_new_request_notification,
631                                   &__aio_requests_mutex,
632                                   &wakeup_time);
633           --idle_thread_count;
634           runp = runlist;
635         }
636
637       if (runp == NULL)
638         --nthreads;
639       else
640         {
641           assert (runp->running == yes);
642           runp->running = allocated;
643           runlist = runp->next_run;
644
645           /* If we have a request to process, and there's still another in
646              the run list, then we need to either wake up or create a new
647              thread to service the request that is still in the run list. */
648           if (runlist != NULL)
649             {
650               /* There are at least two items in the work queue to work on.
651                  If there are other idle threads, then we should wake them
652                  up for these other work elements; otherwise, we should try
653                  to create a new thread. */
654               if (idle_thread_count > 0)
655                 pthread_cond_signal (&__aio_new_request_notification);
656               else if (nthreads < optim.aio_threads)
657                 {
658                   pthread_t thid;
659                   pthread_attr_t attr;
660
661                   /* Make sure the thread is created detached.  */
662                   pthread_attr_init (&attr);
663                   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
664
665                   /* Now try to start a thread. If we fail, no big deal,
666                      because we know that there is at least one thread (us)
667                      that is working on AIO operations. */
668                   if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
669                       == 0)
670                     ++nthreads;
671                 }
672             }
673         }
674
675       /* Release the mutex.  */
676       pthread_mutex_unlock (&__aio_requests_mutex);
677     }
678   while (runp != NULL);
679
680   return NULL;
681 }
682
683
684 /* Free allocated resources.  */
685 libc_freeres_fn (free_res)
686 {
687   size_t row;
688
689   for (row = 0; row < pool_max_size; ++row)
690     free (pool[row]);
691
692   free (pool);
693 }
694
695
696 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
697    be correctly set to do this. Also, you had better set newrequest's
698    "running" flag to "yes" before you release your lock or you'll throw an
699    assertion. */
700 static void
701 add_request_to_runlist (struct requestlist *newrequest)
702 {
703   int prio = newrequest->aiocbp->aiocb.__abs_prio;
704   struct requestlist *runp;
705
706   if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
707     {
708       newrequest->next_run = runlist;
709       runlist = newrequest;
710     }
711   else
712     {
713       runp = runlist;
714
715       while (runp->next_run != NULL
716              && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
717         runp = runp->next_run;
718
719       newrequest->next_run = runp->next_run;
720       runp->next_run = newrequest;
721     }
722 }