Update.
[platform/upstream/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998, 1999, 2000, 2001 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <sys/stat.h>
29 #include <sys/time.h>
30
31 #include "aio_misc.h"
32
33 static void add_request_to_runlist (struct requestlist *newrequest);
34
35 /* Pool of request list entries.  */
36 static struct requestlist **pool;
37
38 /* Number of total and allocated pool entries.  */
39 static size_t pool_max_size;
40 static size_t pool_size;
41
42 /* We implement a two dimensional array but allocate each row separately.
43    The macro below determines how many entries should be used per row.
44    It should better be a power of two.  */
45 #define ENTRIES_PER_ROW 32
46
47 /* How many rows we allocate at once.  */
48 #define ROWS_STEP       8
49
50 /* List of available entries.  */
51 static struct requestlist *freelist;
52
53 /* List of request waiting to be processed.  */
54 static struct requestlist *runlist;
55
56 /* Structure list of all currently processed requests.  */
57 static struct requestlist *requests;
58
59 /* Number of threads currently running.  */
60 static int nthreads;
61
62 /* Number of threads waiting for work to arrive. */
63 static int idle_thread_count;
64
65
66 /* These are the values used to optimize the use of AIO.  The user can
67    overwrite them by using the `aio_init' function.  */
68 static struct aioinit optim =
69 {
70   20,   /* int aio_threads;     Maximal number of threads.  */
71   64,   /* int aio_num;         Number of expected simultanious requests. */
72   0,
73   0,
74   0,
75   0,
76   1,
77   0
78 };
79
80
81 /* Since the list is global we need a mutex protecting it.  */
82 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
83
84 /* When you add a request to the list and there are idle threads present,
85    you signal this condition variable. When a thread finishes work, it waits
86    on this condition variable for a time before it actually exits. */
87 pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
88
89
90 /* Functions to handle request list pool.  */
91 static struct requestlist *
92 get_elem (void)
93 {
94   struct requestlist *result;
95
96   if (freelist == NULL)
97     {
98       struct requestlist *new_row;
99       int cnt;
100
101       assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
102
103       if (pool_size + 1 >= pool_max_size)
104         {
105           size_t new_max_size = pool_max_size + ROWS_STEP;
106           struct requestlist **new_tab;
107
108           new_tab = (struct requestlist **)
109             realloc (pool, new_max_size * sizeof (struct requestlist *));
110
111           if (new_tab == NULL)
112             return NULL;
113
114           pool_max_size = new_max_size;
115           pool = new_tab;
116         }
117
118       /* Allocate the new row.  */
119       cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
120       new_row = (struct requestlist *) calloc (cnt,
121                                                sizeof (struct requestlist));
122       if (new_row == NULL)
123         return NULL;
124
125       pool[pool_size++] = new_row;
126
127       /* Put all the new entries in the freelist.  */
128       do
129         {
130           new_row->next_prio = freelist;
131           freelist = new_row++;
132         }
133       while (--cnt > 0);
134     }
135
136   result = freelist;
137   freelist = freelist->next_prio;
138
139   return result;
140 }
141
142
143 void
144 internal_function
145 __aio_free_request (struct requestlist *elem)
146 {
147   elem->running = no;
148   elem->next_prio = freelist;
149   freelist = elem;
150 }
151
152
153 struct requestlist *
154 internal_function
155 __aio_find_req (aiocb_union *elem)
156 {
157   struct requestlist *runp = requests;
158   int fildes = elem->aiocb.aio_fildes;
159
160   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
161     runp = runp->next_fd;
162
163   if (runp != NULL)
164     {
165       if (runp->aiocbp->aiocb.aio_fildes != fildes)
166         runp = NULL;
167       else
168         while (runp != NULL && runp->aiocbp != elem)
169           runp = runp->next_prio;
170     }
171
172   return runp;
173 }
174
175
176 struct requestlist *
177 internal_function
178 __aio_find_req_fd (int fildes)
179 {
180   struct requestlist *runp = requests;
181
182   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
183     runp = runp->next_fd;
184
185   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
186           ? runp : NULL);
187 }
188
189
190 void
191 internal_function
192 __aio_remove_request (struct requestlist *last, struct requestlist *req,
193                       int all)
194 {
195   assert (req->running == yes || req->running == queued
196           || req->running == done);
197
198   if (last != NULL)
199     last->next_prio = all ? NULL : req->next_prio;
200   else
201     {
202       if (all || req->next_prio == NULL)
203         {
204           if (req->last_fd != NULL)
205             req->last_fd->next_fd = req->next_fd;
206           else
207             requests = req->next_fd;
208           if (req->next_fd != NULL)
209             req->next_fd->last_fd = req->last_fd;
210         }
211       else
212         {
213           if (req->last_fd != NULL)
214             req->last_fd->next_fd = req->next_prio;
215           else
216             requests = req->next_prio;
217
218           if (req->next_fd != NULL)
219             req->next_fd->last_fd = req->next_prio;
220
221           req->next_prio->last_fd = req->last_fd;
222           req->next_prio->next_fd = req->next_fd;
223
224           /* Mark this entry as runnable.  */
225           req->next_prio->running = yes;
226         }
227
228       if (req->running == yes)
229         {
230           struct requestlist *runp = runlist;
231
232           last = NULL;
233           while (runp != NULL)
234             {
235               if (runp == req)
236                 {
237                   if (last == NULL)
238                     runlist = runp->next_run;
239                   else
240                     last->next_run = runp->next_run;
241                   break;
242                 }
243               last = runp;
244               runp = runp->next_run;
245             }
246         }
247     }
248 }
249
250
251 /* The thread handler.  */
252 static void *handle_fildes_io (void *arg);
253
254
255 /* User optimization.  */
256 void
257 __aio_init (const struct aioinit *init)
258 {
259   /* Get the mutex.  */
260   pthread_mutex_lock (&__aio_requests_mutex);
261
262   /* Only allow writing new values if the table is not yet allocated.  */
263   if (pool == NULL)
264     {
265       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
266       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
267                        ? ENTRIES_PER_ROW
268                        : init->aio_num & ~ENTRIES_PER_ROW);
269     }
270
271   if (init->aio_idle_time != 0)
272     optim.aio_idle_time = init->aio_idle_time;
273
274   /* Release the mutex.  */
275   pthread_mutex_unlock (&__aio_requests_mutex);
276 }
277 weak_alias (__aio_init, aio_init)
278
279
280 /* The main function of the async I/O handling.  It enqueues requests
281    and if necessary starts and handles threads.  */
282 struct requestlist *
283 internal_function
284 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
285 {
286   int result = 0;
287   int policy, prio;
288   struct sched_param param;
289   struct requestlist *last, *runp, *newp;
290   int running = no;
291
292   if (operation == LIO_SYNC || operation == LIO_DSYNC)
293     aiocbp->aiocb.aio_reqprio = 0;
294   else if (aiocbp->aiocb.aio_reqprio < 0
295            || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
296     {
297       /* Invalid priority value.  */
298       __set_errno (EINVAL);
299       aiocbp->aiocb.__error_code = EINVAL;
300       aiocbp->aiocb.__return_value = -1;
301       return NULL;
302     }
303
304   /* Compute priority for this request.  */
305   pthread_getschedparam (pthread_self (), &policy, &param);
306   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
307
308   /* Get the mutex.  */
309   pthread_mutex_lock (&__aio_requests_mutex);
310
311   last = NULL;
312   runp = requests;
313   /* First look whether the current file descriptor is currently
314      worked with.  */
315   while (runp != NULL
316          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
317     {
318       last = runp;
319       runp = runp->next_fd;
320     }
321
322   /* Get a new element for the waiting list.  */
323   newp = get_elem ();
324   if (newp == NULL)
325     {
326       pthread_mutex_unlock (&__aio_requests_mutex);
327       __set_errno (EAGAIN);
328       return NULL;
329     }
330   newp->aiocbp = aiocbp;
331   newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
332                       ? getpid () : 0);
333   newp->waiting = NULL;
334
335   aiocbp->aiocb.__abs_prio = prio;
336   aiocbp->aiocb.__policy = policy;
337   aiocbp->aiocb.aio_lio_opcode = operation;
338   aiocbp->aiocb.__error_code = EINPROGRESS;
339   aiocbp->aiocb.__return_value = 0;
340
341   if (runp != NULL
342       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
343     {
344       /* The current file descriptor is worked on.  It makes no sense
345          to start another thread since this new thread would fight
346          with the running thread for the resources.  But we also cannot
347          say that the thread processing this desriptor shall immediately
348          after finishing the current job process this request if there
349          are other threads in the running queue which have a higher
350          priority.  */
351
352       /* Simply enqueue it after the running one according to the
353          priority.  */
354       while (runp->next_prio != NULL
355              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
356         runp = runp->next_prio;
357
358       newp->next_prio = runp->next_prio;
359       runp->next_prio = newp;
360
361       running = queued;
362     }
363   else
364     {
365       running = yes;
366       /* Enqueue this request for a new descriptor.  */
367       if (last == NULL)
368         {
369           newp->last_fd = NULL;
370           newp->next_fd = requests;
371           if (requests != NULL)
372             requests->last_fd = newp;
373           requests = newp;
374         }
375       else
376         {
377           newp->next_fd = last->next_fd;
378           newp->last_fd = last;
379           last->next_fd = newp;
380           if (newp->next_fd != NULL)
381             newp->next_fd->last_fd = newp;
382         }
383
384       newp->next_prio = NULL;
385     }
386
387   if (running == yes)
388     {
389       /* We try to create a new thread for this file descriptor.  The
390          function which gets called will handle all available requests
391          for this descriptor and when all are processed it will
392          terminate.
393
394          If no new thread can be created or if the specified limit of
395          threads for AIO is reached we queue the request.  */
396
397       /* See if we need to and are able to create a thread.  */
398       if (nthreads < optim.aio_threads && idle_thread_count == 0)
399         {
400           pthread_t thid;
401           pthread_attr_t attr;
402
403           /* Make sure the thread is created detached.  */
404           pthread_attr_init (&attr);
405           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
406
407           running = newp->running = allocated;
408
409           /* Now try to start a thread.  */
410           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
411             /* We managed to enqueue the request.  All errors which can
412                happen now can be recognized by calls to `aio_return' and
413                `aio_error'.  */
414             ++nthreads;
415           else
416             {
417               /* Reset the running flag.  The new request is not running.  */
418               running = newp->running = yes;
419
420               if (nthreads == 0)
421                 /* We cannot create a thread in the moment and there is
422                    also no thread running.  This is a problem.  `errno' is
423                    set to EAGAIN if this is only a temporary problem.  */
424                 result = -1;
425             }
426         }
427     }
428
429   /* Enqueue the request in the run queue if it is not yet running.  */
430   if (running == yes && result == 0)
431     {
432       add_request_to_runlist (newp);
433
434       /* If there is a thread waiting for work, then let it know that we
435          have just given it something to do. */
436       if (idle_thread_count > 0)
437         pthread_cond_signal (&__aio_new_request_notification);
438     }
439
440   if (result == 0)
441     newp->running = running;
442   else
443     {
444       /* Something went wrong.  */
445       __aio_free_request (newp);
446       newp = NULL;
447     }
448
449   /* Release the mutex.  */
450   pthread_mutex_unlock (&__aio_requests_mutex);
451
452   return newp;
453 }
454
455
456 static void *
457 __attribute__ ((noreturn))
458 handle_fildes_io (void *arg)
459 {
460   pthread_t self = pthread_self ();
461   struct sched_param param;
462   struct requestlist *runp = (struct requestlist *) arg;
463   aiocb_union *aiocbp;
464   int policy;
465   int fildes;
466
467   pthread_getschedparam (self, &policy, &param);
468
469   do
470     {
471       /* If runp is NULL, then we were created to service the work queue
472          in general, not to handle any particular request. In that case we
473          skip the "do work" stuff on the first pass, and go directly to the
474          "get work off the work queue" part of this loop, which is near the
475          end. */
476       if (runp == NULL)
477         pthread_mutex_lock (&__aio_requests_mutex);
478       else
479         {
480           /* Hopefully this request is marked as running.  */
481           assert (runp->running == allocated);
482
483           /* Update our variables.  */
484           aiocbp = runp->aiocbp;
485           fildes = aiocbp->aiocb.aio_fildes;
486
487           /* Change the priority to the requested value (if necessary).  */
488           if (aiocbp->aiocb.__abs_prio != param.sched_priority
489               || aiocbp->aiocb.__policy != policy)
490             {
491               param.sched_priority = aiocbp->aiocb.__abs_prio;
492               policy = aiocbp->aiocb.__policy;
493               pthread_setschedparam (self, policy, &param);
494             }
495
496           /* Process request pointed to by RUNP.  We must not be disturbed
497              by signals.  */
498           if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
499             {
500               if (aiocbp->aiocb.aio_lio_opcode & 128)
501                 aiocbp->aiocb.__return_value =
502                   TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
503                                                  aiocbp->aiocb64.aio_buf,
504                                                  aiocbp->aiocb64.aio_nbytes,
505                                                  aiocbp->aiocb64.aio_offset));
506               else
507                 aiocbp->aiocb.__return_value =
508                   TEMP_FAILURE_RETRY (pread (fildes,
509                                              (void *) aiocbp->aiocb.aio_buf,
510                                              aiocbp->aiocb.aio_nbytes,
511                                              aiocbp->aiocb.aio_offset));
512
513               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
514                 /* The Linux kernel is different from others.  It returns
515                    ESPIPE if using pread on a socket.  Other platforms
516                    simply ignore the offset parameter and behave like
517                    read.  */
518                 aiocbp->aiocb.__return_value =
519                   TEMP_FAILURE_RETRY (read (fildes,
520                                             (void *) aiocbp->aiocb64.aio_buf,
521                                             aiocbp->aiocb64.aio_nbytes));
522             }
523           else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
524             {
525               if (aiocbp->aiocb.aio_lio_opcode & 128)
526                 aiocbp->aiocb.__return_value =
527                   TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
528                                                   aiocbp->aiocb64.aio_buf,
529                                                   aiocbp->aiocb64.aio_nbytes,
530                                                   aiocbp->aiocb64.aio_offset));
531               else
532                 aiocbp->aiocb.__return_value =
533                   TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
534                                               aiocbp->aiocb.aio_buf,
535                                               aiocbp->aiocb.aio_nbytes,
536                                               aiocbp->aiocb.aio_offset));
537
538               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
539                 /* The Linux kernel is different from others.  It returns
540                    ESPIPE if using pwrite on a socket.  Other platforms
541                    simply ignore the offset parameter and behave like
542                    write.  */
543                 aiocbp->aiocb.__return_value =
544                   TEMP_FAILURE_RETRY (write (fildes,
545                                              (void *) aiocbp->aiocb64.aio_buf,
546                                              aiocbp->aiocb64.aio_nbytes));
547             }
548           else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
549             aiocbp->aiocb.__return_value =
550               TEMP_FAILURE_RETRY (fdatasync (fildes));
551           else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
552             aiocbp->aiocb.__return_value =
553               TEMP_FAILURE_RETRY (fsync (fildes));
554           else
555             {
556               /* This is an invalid opcode.  */
557               aiocbp->aiocb.__return_value = -1;
558               __set_errno (EINVAL);
559             }
560
561           /* Get the mutex.  */
562           pthread_mutex_lock (&__aio_requests_mutex);
563
564           /* In theory we would need here a write memory barrier since the
565              callers test using aio_error() whether the request finished
566              and once this value != EINPROGRESS the field __return_value
567              must be committed to memory.
568
569              But since the pthread_mutex_lock call involves write memory
570              barriers as well it is not necessary.  */
571
572           if (aiocbp->aiocb.__return_value == -1)
573             aiocbp->aiocb.__error_code = errno;
574           else
575             aiocbp->aiocb.__error_code = 0;
576
577           /* Send the signal to notify about finished processing of the
578              request.  */
579           __aio_notify (runp);
580
581           /* For debugging purposes we reset the running flag of the
582              finished request.  */
583           assert (runp->running == allocated);
584           runp->running = done;
585
586           /* Now dequeue the current request.  */
587           __aio_remove_request (NULL, runp, 0);
588           if (runp->next_prio != NULL)
589             add_request_to_runlist (runp->next_prio);
590
591           /* Free the old element.  */
592           __aio_free_request (runp);
593         }
594
595       runp = runlist;
596
597       /* If the runlist is empty, then we sleep for a while, waiting for
598          something to arrive in it. */
599       if (runp == NULL && optim.aio_idle_time >= 0)
600         {
601           struct timeval now;
602           struct timespec wakeup_time;
603
604           ++idle_thread_count;
605           gettimeofday (&now, NULL);
606           wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
607           wakeup_time.tv_nsec = now.tv_usec * 1000;
608           if (wakeup_time.tv_nsec > 1000000000)
609             {
610               wakeup_time.tv_nsec -= 1000000000;
611               ++wakeup_time.tv_sec;
612             }
613           pthread_cond_timedwait (&__aio_new_request_notification,
614                                   &__aio_requests_mutex,
615                                   &wakeup_time);
616           --idle_thread_count;
617           runp = runlist;
618         }
619
620       if (runp == NULL)
621         --nthreads;
622       else
623         {
624           assert (runp->running == yes);
625           runp->running = allocated;
626           runlist = runp->next_run;
627
628           /* If we have a request to process, and there's still another in
629              the run list, then we need to either wake up or create a new
630              thread to service the request that is still in the run list. */
631           if (runlist != NULL)
632             {
633               /* There are at least two items in the work queue to work on.
634                  If there are other idle threads, then we should wake them
635                  up for these other work elements; otherwise, we should try
636                  to create a new thread. */
637               if (idle_thread_count > 0)
638                 pthread_cond_signal (&__aio_new_request_notification);
639               else if (nthreads < optim.aio_threads)
640                 {
641                   pthread_t thid;
642                   pthread_attr_t attr;
643
644                   /* Make sure the thread is created detached.  */
645                   pthread_attr_init (&attr);
646                   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
647
648                   /* Now try to start a thread. If we fail, no big deal,
649                      because we know that there is at least one thread (us)
650                      that is working on AIO operations. */
651                   if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
652                       == 0)
653                     ++nthreads;
654                 }
655             }
656         }
657
658       /* Release the mutex.  */
659       pthread_mutex_unlock (&__aio_requests_mutex);
660     }
661   while (runp != NULL);
662
663   pthread_exit (NULL);
664 }
665
666
667 /* Free allocated resources.  */
668 static void
669 __attribute__ ((unused))
670 free_res (void)
671 {
672   size_t row;
673
674   for (row = 0; row < pool_max_size; ++row)
675     free (pool[row]);
676
677   free (pool);
678 }
679 text_set_element (__libc_subfreeres, free_res);
680
681
682 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
683    be correctly set to do this. Also, you had better set newrequest's
684    "running" flag to "yes" before you release your lock or you'll throw an
685    assertion. */
686 static void
687 add_request_to_runlist (struct requestlist *newrequest)
688 {
689   int prio = newrequest->aiocbp->aiocb.__abs_prio;
690   struct requestlist *runp;
691
692   if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
693     {
694       newrequest->next_run = runlist;
695       runlist = newrequest;
696     }
697   else
698     {
699       runp = runlist;
700
701       while (runp->next_run != NULL
702              && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
703         runp = runp->next_run;
704
705       newrequest->next_run = runp->next_run;
706       runp->next_run = newrequest;
707     }
708 }