Update.
[platform/upstream/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998, 1999 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <errno.h>
23 #include <limits.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28
29 #include "aio_misc.h"
30
31 /* Pool of request list entries.  */
32 static struct requestlist **pool;
33
34 /* Number of total and allocated pool entries.  */
35 static size_t pool_tab_size;
36 static size_t pool_size;
37
38 /* We implement a two dimensional array but allocate each row separately.
39    The macro below determines how many entries should be used per row.
40    It should better be a power of two.  */
41 #define ENTRIES_PER_ROW 16
42
43 /* The row table is incremented in units of this.  */
44 #define ROW_STEP        8
45
46 /* List of available entries.  */
47 static struct requestlist *freelist;
48
49 /* List of request waiting to be processed.  */
50 static struct requestlist *runlist;
51
52 /* Structure list of all currently processed requests.  */
53 static struct requestlist *requests;
54
55 /* Number of threads currently running.  */
56 static int nthreads;
57
58
59 /* These are the values used to optimize the use of AIO.  The user can
60    overwrite them by using the `aio_init' function.  */
61 static struct aioinit optim =
62 {
63   20,   /* int aio_threads;     Maximal number of threads.  */
64   256,  /* int aio_num;         Number of expected simultanious requests. */
65   0,
66   0,
67   0,
68   0,
69   { 0, }
70 };
71
72
73 /* Since the list is global we need a mutex protecting it.  */
74 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
75
76
77 /* Functions to handle request list pool.  */
78 static struct requestlist *
79 get_elem (void)
80 {
81   struct requestlist *result;
82
83   if (freelist == NULL)
84     {
85       struct requestlist *new_row;
86       size_t new_size;
87
88       /* Compute new size.  */
89       new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
90
91       if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
92         {
93           size_t new_tab_size = new_size / ENTRIES_PER_ROW;
94           struct requestlist **new_tab;
95
96           new_tab = (struct requestlist **)
97             realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
98
99           if (new_tab == NULL)
100             return NULL;
101
102           pool_tab_size = new_tab_size;
103           pool = new_tab;
104         }
105
106       if (pool_size == 0)
107         {
108           size_t cnt;
109
110           new_row = (struct requestlist *)
111             calloc (new_size, sizeof (struct requestlist));
112
113           if (new_row == NULL)
114             return NULL;
115
116           for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
117             pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
118         }
119       else
120         {
121           /* Allocat one new row.  */
122           new_row = (struct requestlist *)
123             calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
124           if (new_row == NULL)
125             return NULL;
126
127           pool[new_size / ENTRIES_PER_ROW] = new_row;
128         }
129
130       /* Put all the new entries in the freelist.  */
131       do
132         {
133           new_row->next_prio = freelist;
134           freelist = new_row++;
135         }
136       while (++pool_size < new_size);
137     }
138
139   result = freelist;
140   freelist = freelist->next_prio;
141
142   return result;
143 }
144
145
146 void
147 internal_function
148 __aio_free_request (struct requestlist *elem)
149 {
150   elem->running = no;
151   elem->next_prio = freelist;
152   freelist = elem;
153 }
154
155
156 struct requestlist *
157 internal_function
158 __aio_find_req (aiocb_union *elem)
159 {
160   struct requestlist *runp = requests;
161   int fildes = elem->aiocb.aio_fildes;
162
163   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
164     runp = runp->next_fd;
165
166   if (runp != NULL)
167     {
168       if (runp->aiocbp->aiocb.aio_fildes != fildes)
169         runp = NULL;
170       else
171         while (runp != NULL && runp->aiocbp != elem)
172           runp = runp->next_prio;
173     }
174
175   return runp;
176 }
177
178
179 struct requestlist *
180 internal_function
181 __aio_find_req_fd (int fildes)
182 {
183   struct requestlist *runp = requests;
184
185   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
186     runp = runp->next_fd;
187
188   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
189           ? runp : NULL);
190 }
191
192
193 /* The thread handler.  */
194 static void *handle_fildes_io (void *arg);
195
196
197 /* User optimization.  */
198 void
199 __aio_init (const struct aioinit *init)
200 {
201   /* Get the mutex.  */
202   pthread_mutex_lock (&__aio_requests_mutex);
203
204   /* Only allow writing new values if the table is not yet allocated.  */
205   if (pool == NULL)
206     {
207       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
208       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
209                        ? ENTRIES_PER_ROW
210                        : init->aio_num & ~ENTRIES_PER_ROW);
211     }
212
213   /* Release the mutex.  */
214   pthread_mutex_unlock (&__aio_requests_mutex);
215 }
216 weak_alias (__aio_init, aio_init)
217
218
219 /* The main function of the async I/O handling.  It enqueues requests
220    and if necessary starts and handles threads.  */
221 struct requestlist *
222 internal_function
223 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
224 {
225   int result = 0;
226   int policy, prio;
227   struct sched_param param;
228   struct requestlist *last, *runp, *newp;
229   int running = no;
230
231   if (aiocbp->aiocb.aio_reqprio < 0
232       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
233     {
234       /* Invalid priority value.  */
235       __set_errno (EINVAL);
236       aiocbp->aiocb.__error_code = EINVAL;
237       aiocbp->aiocb.__return_value = -1;
238       return NULL;
239     }
240
241   /* Compute priority for this request.  */
242   pthread_getschedparam (pthread_self (), &policy, &param);
243   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
244
245   /* Get the mutex.  */
246   pthread_mutex_lock (&__aio_requests_mutex);
247
248   last = NULL;
249   runp = requests;
250   /* First look whether the current file descriptor is currently
251      worked with.  */
252   while (runp != NULL
253          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
254     {
255       last = runp;
256       runp = runp->next_fd;
257     }
258
259   /* Get a new element for the waiting list.  */
260   newp = get_elem ();
261   if (newp == NULL)
262     {
263       pthread_mutex_unlock (&__aio_requests_mutex);
264       __set_errno (EAGAIN);
265       return NULL;
266     }
267   newp->aiocbp = aiocbp;
268   newp->waiting = NULL;
269
270   aiocbp->aiocb.__abs_prio = prio;
271   aiocbp->aiocb.__policy = policy;
272   aiocbp->aiocb.aio_lio_opcode = operation;
273   aiocbp->aiocb.__error_code = EINPROGRESS;
274   aiocbp->aiocb.__return_value = 0;
275
276   if (runp != NULL
277       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
278     {
279       /* The current file descriptor is worked on.  It makes no sense
280          to start another thread since this new thread would fight
281          with the running thread for the resources.  But we also cannot
282          say that the thread processing this desriptor shall immediately
283          after finishing the current job process this request if there
284          are other threads in the running queue which have a higher
285          priority.  */
286
287       /* Simply enqueue it after the running one according to the
288          priority.  */
289       while (runp->next_prio != NULL
290              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
291         runp = runp->next_prio;
292
293       newp->next_prio = runp->next_prio;
294       runp->next_prio = newp;
295
296       running = queued;
297     }
298   else
299     {
300       /* Enqueue this request for a new descriptor.  */
301       if (last == NULL)
302         {
303           newp->last_fd = NULL;
304           newp->next_fd = requests;
305           if (requests != NULL)
306             requests->last_fd = newp;
307           requests = newp;
308         }
309       else
310         {
311           newp->next_fd = last->next_fd;
312           newp->last_fd = last;
313           last->next_fd = newp;
314           if (newp->next_fd != NULL)
315             newp->next_fd->last_fd = newp;
316         }
317
318       newp->next_prio = NULL;
319     }
320
321   if (running == no)
322     {
323       /* We try to create a new thread for this file descriptor.  The
324          function which gets called will handle all available requests
325          for this descriptor and when all are processed it will
326          terminate.
327
328          If no new thread can be created or if the specified limit of
329          threads for AIO is reached we queue the request.  */
330
331       /* See if we can create a thread.  */
332       if (nthreads < optim.aio_threads)
333         {
334           pthread_t thid;
335           pthread_attr_t attr;
336
337           /* Make sure the thread is created detached.  */
338           pthread_attr_init (&attr);
339           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
340
341           /* Now try to start a thread.  */
342           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
343             {
344               /* We managed to enqueue the request.  All errors which can
345                  happen now can be recognized by calls to `aio_return' and
346                  `aio_error'.  */
347               running = allocated;
348               ++nthreads;
349             }
350           else if (nthreads == 0)
351             /* We cannot create a thread in the moment and there is
352                also no thread running.  This is a problem.  `errno' is
353                set to EAGAIN if this is only a temporary problem.  */
354             result = -1;
355         }
356     }
357
358   /* Enqueue the request in the run queue if it is not yet running.  */
359   if (running < yes && result == 0)
360     {
361       if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
362         {
363           newp->next_run = runlist;
364           runlist = newp;
365         }
366       else
367         {
368           runp = runlist;
369
370           while (runp->next_run != NULL
371                  && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
372             runp = runp->next_run;
373
374           newp->next_run = runp->next_run;
375           runp->next_run = newp;
376         }
377     }
378
379   if (result == 0)
380     newp->running = running;
381   else
382     {
383       /* Something went wrong.  */
384       __aio_free_request (newp);
385       newp = NULL;
386     }
387
388   /* Release the mutex.  */
389   pthread_mutex_unlock (&__aio_requests_mutex);
390
391   return newp;
392 }
393
394
395 static void *
396 handle_fildes_io (void *arg)
397 {
398   pthread_t self = pthread_self ();
399   struct sched_param param;
400   struct requestlist *runp = (struct requestlist *) arg;
401   aiocb_union *aiocbp;
402   int policy;
403   int fildes;
404
405   pthread_getschedparam (self, &policy, &param);
406
407   do
408     {
409       /* Update our variables.  */
410       aiocbp = runp->aiocbp;
411       fildes = aiocbp->aiocb.aio_fildes;
412
413       /* Change the priority to the requested value (if necessary).  */
414       if (aiocbp->aiocb.__abs_prio != param.sched_priority
415           || aiocbp->aiocb.__policy != policy)
416         {
417           param.sched_priority = aiocbp->aiocb.__abs_prio;
418           policy = aiocbp->aiocb.__policy;
419           pthread_setschedparam (self, policy, &param);
420         }
421
422       /* Process request pointed to by RUNP.  We must not be disturbed
423          by signals.  */
424       if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
425         {
426           if (aiocbp->aiocb.aio_lio_opcode & 128)
427             aiocbp->aiocb.__return_value =
428               TEMP_FAILURE_RETRY (__pread64 (fildes,
429                                              (void *) aiocbp->aiocb64.aio_buf,
430                                              aiocbp->aiocb64.aio_nbytes,
431                                              aiocbp->aiocb64.aio_offset));
432           else
433             aiocbp->aiocb.__return_value =
434               TEMP_FAILURE_RETRY (pread (fildes,
435                                          (void *) aiocbp->aiocb.aio_buf,
436                                          aiocbp->aiocb.aio_nbytes,
437                                          aiocbp->aiocb.aio_offset));
438
439           if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
440             /* The Linux kernel is different from others.  It returns
441                ESPIPE if using pread on a socket.  Other platforms
442                simply ignore the offset parameter and behave like
443                read.  */
444             aiocbp->aiocb.__return_value =
445               TEMP_FAILURE_RETRY (read (fildes,
446                                         (void *) aiocbp->aiocb64.aio_buf,
447                                         aiocbp->aiocb64.aio_nbytes));
448         }
449       else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
450         {
451           if (aiocbp->aiocb.aio_lio_opcode & 128)
452             aiocbp->aiocb.__return_value =
453               TEMP_FAILURE_RETRY (__pwrite64 (fildes,
454                                               (const void *) aiocbp->aiocb64.aio_buf,
455                                               aiocbp->aiocb64.aio_nbytes,
456                                               aiocbp->aiocb64.aio_offset));
457           else
458             aiocbp->aiocb.__return_value =
459               TEMP_FAILURE_RETRY (pwrite (fildes,
460                                           (const void *) aiocbp->aiocb.aio_buf,
461                                           aiocbp->aiocb.aio_nbytes,
462                                           aiocbp->aiocb.aio_offset));
463
464           if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
465             /* The Linux kernel is different from others.  It returns
466                ESPIPE if using pwrite on a socket.  Other platforms
467                simply ignore the offset parameter and behave like
468                write.  */
469             aiocbp->aiocb.__return_value =
470               TEMP_FAILURE_RETRY (write (fildes,
471                                          (void *) aiocbp->aiocb64.aio_buf,
472                                          aiocbp->aiocb64.aio_nbytes));
473         }
474       else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
475         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
476       else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
477         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
478       else
479         {
480           /* This is an invalid opcode.  */
481           aiocbp->aiocb.__return_value = -1;
482           __set_errno (EINVAL);
483         }
484
485       /* Get the mutex.  */
486       pthread_mutex_lock (&__aio_requests_mutex);
487
488       if (aiocbp->aiocb.__return_value == -1)
489         aiocbp->aiocb.__error_code = errno;
490       else
491         aiocbp->aiocb.__error_code = 0;
492
493       /* Send the signal to notify about finished processing of the
494          request.  */
495       __aio_notify (runp);
496
497       /* Now dequeue the current request.  */
498       if (runp->next_prio == NULL)
499         {
500           /* No outstanding request for this descriptor.  Remove this
501              descriptor from the list.  */
502           if (runp->next_fd != NULL)
503             runp->next_fd->last_fd = runp->last_fd;
504           if (runp->last_fd != NULL)
505             runp->last_fd->next_fd = runp->next_fd;
506           else
507             requests = runp->next_fd;
508         }
509       else
510         {
511           runp->next_prio->last_fd = runp->last_fd;
512           runp->next_prio->next_fd = runp->next_fd;
513           runp->next_prio->running = yes;
514           if (runp->next_fd != NULL)
515             runp->next_fd->last_fd = runp->next_prio;
516           if (runp->last_fd != NULL)
517             runp->last_fd->next_fd = runp->next_prio;
518           else
519             requests = runp->next_prio;
520         }
521
522       /* Free the old element.  */
523       __aio_free_request (runp);
524
525       runp = runlist;
526       if (runp != NULL)
527         {
528           /* We must not run requests which are not marked `running'.  */
529           if (runp->running == yes)
530             runlist = runp->next_run;
531           else
532             {
533               struct requestlist *old;
534
535               do
536                 {
537                   old = runp;
538                   runp = runp->next_run;
539                 }
540               while (runp != NULL && runp->running != yes);
541
542               if (runp != NULL)
543                 old->next_run = runp->next_run;
544             }
545         }
546
547       /* If no request to work on we will stop the thread.  */
548       if (runp == NULL)
549         --nthreads;
550       else
551         runp->running = allocated;
552
553       /* Release the mutex.  */
554       pthread_mutex_unlock (&__aio_requests_mutex);
555     }
556   while (runp != NULL);
557
558   pthread_exit (NULL);
559 }
560
561
562 /* Free allocated resources.  */
563 static void
564 __attribute__ ((unused))
565 free_res (void)
566 {
567   size_t row;
568
569   /* The first block of rows as specified in OPTIM is allocated in
570      one chunk.  */
571   free (pool[0]);
572
573   for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
574     free (pool[row]);
575
576   free (pool);
577 }
578
579 text_set_element (__libc_subfreeres, free_res);