Update.
[platform/upstream/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <errno.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "aio_misc.h"
29
30 /* Pool of request list entries.  */
31 static struct requestlist **pool;
32
33 /* Number of total and allocated pool entries.  */
34 static size_t pool_tab_size;
35 static size_t pool_size;
36
37 /* We implement a two dimensional array but allocate each row separately.
38    The macro below determines how many entries should be used per row.
39    It should better be a power of two.  */
40 #define ENTRIES_PER_ROW 16
41
42 /* The row table is incremented in units of this.  */
43 #define ROW_STEP        8
44
45 /* List of available entries.  */
46 static struct requestlist *freelist;
47
48 /* List of request waiting to be processed.  */
49 static struct requestlist *runlist;
50
51 /* Structure list of all currently processed requests.  */
52 static struct requestlist *requests;
53
54 /* Number of threads currently running.  */
55 static int nthreads;
56
57
58 /* These are the values used to optimize the use of AIO.  The user can
59    overwrite them by using the `aio_init' function.  */
60 static struct aioinit optim =
61 {
62   20,   /* int aio_threads;     Maximal number of threads.  */
63   256,  /* int aio_num;         Number of expected simultanious requests. */
64   0,
65   0,
66   0,
67   0,
68   { 0, }
69 };
70
71
72 /* Since the list is global we need a mutex protecting it.  */
73 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
74
75
76 /* Functions to handle request list pool.  */
77 static struct requestlist *
78 get_elem (void)
79 {
80   struct requestlist *result;
81
82   if (freelist == NULL)
83     {
84       struct requestlist *new_row;
85       size_t new_size;
86
87       /* Compute new size.  */
88       new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
89
90       if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
91         {
92           size_t new_tab_size = new_size / ENTRIES_PER_ROW;
93           struct requestlist **new_tab;
94
95           new_tab = (struct requestlist **)
96             realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
97
98           if (new_tab == NULL)
99             return NULL;
100
101           pool_tab_size = new_tab_size;
102           pool = new_tab;
103         }
104
105       if (pool_size == 0)
106         {
107           size_t cnt;
108
109           new_row = (struct requestlist *)
110             calloc (new_size, sizeof (struct requestlist));
111
112           if (new_row == NULL)
113             return NULL;
114
115           for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
116             pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
117         }
118       else
119         {
120           /* Allocat one new row.  */
121           new_row = (struct requestlist *)
122             calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
123           if (new_row == NULL)
124             return NULL;
125
126           pool[new_size / ENTRIES_PER_ROW] = new_row;
127         }
128
129       /* Put all the new entries in the freelist.  */
130       do
131         {
132           new_row->next_prio = freelist;
133           freelist = new_row++;
134         }
135       while (++pool_size < new_size);
136     }
137
138   result = freelist;
139   freelist = freelist->next_prio;
140
141   return result;
142 }
143
144
145 void
146 __aio_free_request (struct requestlist *elem)
147 {
148   elem->running = no;
149   elem->next_prio = freelist;
150   freelist = elem;
151 }
152
153
154 struct requestlist *
155 __aio_find_req (aiocb_union *elem)
156 {
157   struct requestlist *runp = requests;
158   int fildes = elem->aiocb.aio_fildes;
159
160   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
161     runp = runp->next_fd;
162
163   if (runp != NULL)
164     if (runp->aiocbp->aiocb.aio_fildes != fildes)
165       runp = NULL;
166     else
167       while (runp != NULL && runp->aiocbp != elem)
168         runp = runp->next_prio;
169
170   return runp;
171 }
172
173
174 struct requestlist *
175 __aio_find_req_fd (int fildes)
176 {
177   struct requestlist *runp = requests;
178
179   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
180     runp = runp->next_fd;
181
182   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
183           ? runp : NULL);
184 }
185
186
187 /* The thread handler.  */
188 static void *handle_fildes_io (void *arg);
189
190
191 /* User optimization.  */
192 void
193 __aio_init (const struct aioinit *init)
194 {
195   /* Get the mutex.  */
196   pthread_mutex_lock (&__aio_requests_mutex);
197
198   /* Only allow writing new values if the table is not yet allocated.  */
199   if (pool == NULL)
200     {
201       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
202       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
203                        ? ENTRIES_PER_ROW
204                        : init->aio_num & ~ENTRIES_PER_ROW);
205     }
206
207   /* Release the mutex.  */
208   pthread_mutex_unlock (&__aio_requests_mutex);
209 }
210 weak_alias (__aio_init, aio_init)
211
212
213 /* The main function of the async I/O handling.  It enqueues requests
214    and if necessary starts and handles threads.  */
215 struct requestlist *
216 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
217 {
218   int result = 0;
219   int policy, prio;
220   struct sched_param param;
221   struct requestlist *last, *runp, *newp;
222   int running = no;
223
224   if (aiocbp->aiocb.aio_reqprio < 0
225       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
226     {
227       /* Invalid priority value.  */
228       __set_errno (EINVAL);
229       aiocbp->aiocb.__error_code = EINVAL;
230       aiocbp->aiocb.__return_value = -1;
231       return NULL;
232     }
233
234   /* Compute priority for this request.  */
235   pthread_getschedparam (pthread_self (), &policy, &param);
236   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
237
238   /* Get the mutex.  */
239   pthread_mutex_lock (&__aio_requests_mutex);
240
241   last = NULL;
242   runp = requests;
243   /* First look whether the current file descriptor is currently
244      worked with.  */
245   while (runp != NULL
246          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
247     {
248       last = runp;
249       runp = runp->next_fd;
250     }
251
252   /* Get a new element for the waiting list.  */
253   newp = get_elem ();
254   if (newp == NULL)
255     {
256       __set_errno (EAGAIN);
257       pthread_mutex_unlock (&__aio_requests_mutex);
258       return NULL;
259     }
260   newp->aiocbp = aiocbp;
261   newp->waiting = NULL;
262
263   aiocbp->aiocb.__abs_prio = prio;
264   aiocbp->aiocb.__policy = policy;
265   aiocbp->aiocb.aio_lio_opcode = operation;
266   aiocbp->aiocb.__error_code = EINPROGRESS;
267   aiocbp->aiocb.__return_value = 0;
268
269   if (runp != NULL
270       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
271     {
272       /* The current file descriptor is worked on.  It makes no sense
273          to start another thread since this new thread would fight
274          with the running thread for the resources.  But we also cannot
275          say that the thread processing this desriptor shall immediately
276          after finishing the current job process this request if there
277          are other threads in the running queue which have a higher
278          priority.  */
279
280       /* Simply enqueue it after the running one according to the
281          priority.  */
282       while (runp->next_prio != NULL
283              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
284         runp = runp->next_prio;
285
286       newp->next_prio = runp->next_prio;
287       runp->next_prio = newp;
288
289       running = queued;
290     }
291   else
292     {
293       /* Enqueue this request for a new descriptor.  */
294       if (last == NULL)
295         {
296           newp->last_fd = NULL;
297           newp->next_fd = requests;
298           if (requests != NULL)
299             requests->last_fd = newp;
300           requests = newp;
301         }
302       else
303         {
304           newp->next_fd = last->next_fd;
305           newp->last_fd = last;
306           last->next_fd = newp;
307           if (newp->next_fd != NULL)
308             newp->next_fd->last_fd = newp;
309         }
310
311       newp->next_prio = NULL;
312     }
313
314   if (running == no)
315     {
316       /* We try to create a new thread for this file descriptor.  The
317          function which gets called will handle all available requests
318          for this descriptor and when all are processed it will
319          terminate.
320
321          If no new thread can be created or if the specified limit of
322          threads for AIO is reached we queue the request.  */
323
324       /* See if we can create a thread.  */
325       if (nthreads < optim.aio_threads)
326         {
327           pthread_t thid;
328           pthread_attr_t attr;
329
330           /* Make sure the thread is created detached.  */
331           pthread_attr_init (&attr);
332           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
333
334           /* Now try to start a thread.  */
335           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
336             {
337               /* We managed to enqueue the request.  All errors which can
338                  happen now can be recognized by calls to `aio_return' and
339                  `aio_error'.  */
340               running = allocated;
341               ++nthreads;
342             }
343           else if (nthreads == 0)
344             /* We cannot create a thread in the moment and there is
345                also no thread running.  This is a problem.  `errno' is
346                set to EAGAIN if this is only a temporary problem.  */
347             result = -1;
348         }
349     }
350
351   /* Enqueue the request in the run queue if it is not yet running.  */
352   if (running < yes && result == 0)
353     {
354       if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
355         {
356           newp->next_run = runlist;
357           runlist = newp;
358         }
359       else
360         {
361           runp = runlist;
362
363           while (runp->next_run != NULL
364                  && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
365             runp = runp->next_run;
366
367           newp->next_run = runp->next_run;
368           runp->next_run = newp;
369         }
370     }
371
372   if (result == 0)
373     newp->running = running;
374   else
375     {
376       /* Something went wrong.  */
377       __aio_free_request (newp);
378       newp = NULL;
379     }
380
381   /* Release the mutex.  */
382   pthread_mutex_unlock (&__aio_requests_mutex);
383
384   return newp;
385 }
386
387
388 static void *
389 handle_fildes_io (void *arg)
390 {
391   pthread_t self = pthread_self ();
392   struct sched_param param;
393   struct requestlist *runp = (struct requestlist *) arg;
394   aiocb_union *aiocbp;
395   int policy;
396   int fildes;
397
398   pthread_getschedparam (self, &policy, &param);
399
400   do
401     {
402       /* Update our variables.  */
403       aiocbp = runp->aiocbp;
404       fildes = aiocbp->aiocb.aio_fildes;
405
406       /* Change the priority to the requested value (if necessary).  */
407       if (aiocbp->aiocb.__abs_prio != param.sched_priority
408           || aiocbp->aiocb.__policy != policy)
409         {
410           param.sched_priority = aiocbp->aiocb.__abs_prio;
411           policy = aiocbp->aiocb.__policy;
412           pthread_setschedparam (self, policy, &param);
413         }
414
415       /* Process request pointed to by RUNP.  We must not be disturbed
416          by signals.  */
417       if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
418         {
419           if (aiocbp->aiocb.aio_lio_opcode & 128)
420             aiocbp->aiocb.__return_value =
421               TEMP_FAILURE_RETRY (__pread64 (fildes,
422                                              (void *) aiocbp->aiocb64.aio_buf,
423                                              aiocbp->aiocb64.aio_nbytes,
424                                              aiocbp->aiocb64.aio_offset));
425           else
426             aiocbp->aiocb.__return_value =
427               TEMP_FAILURE_RETRY (pread (fildes,
428                                          (void *) aiocbp->aiocb.aio_buf,
429                                          aiocbp->aiocb.aio_nbytes,
430                                          aiocbp->aiocb.aio_offset));
431         }
432       else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
433         {
434           if (aiocbp->aiocb.aio_lio_opcode & 128)
435             aiocbp->aiocb.__return_value =
436               TEMP_FAILURE_RETRY (__pwrite64 (fildes,
437                                               (const void *) aiocbp->aiocb64.aio_buf,
438                                               aiocbp->aiocb64.aio_nbytes,
439                                               aiocbp->aiocb64.aio_offset));
440           else
441             aiocbp->aiocb.__return_value =
442               TEMP_FAILURE_RETRY (pwrite (fildes,
443                                           (const void *) aiocbp->aiocb.aio_buf,
444                                           aiocbp->aiocb.aio_nbytes,
445                                           aiocbp->aiocb.aio_offset));
446         }
447       else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
448         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
449       else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
450         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
451       else
452         {
453           /* This is an invalid opcode.  */
454           aiocbp->aiocb.__return_value = -1;
455           __set_errno (EINVAL);
456         }
457
458       /* Get the mutex.  */
459       pthread_mutex_lock (&__aio_requests_mutex);
460
461       if (aiocbp->aiocb.__return_value == -1)
462         aiocbp->aiocb.__error_code = errno;
463       else
464         aiocbp->aiocb.__error_code = 0;
465
466       /* Send the signal to notify about finished processing of the
467          request.  */
468       __aio_notify (runp);
469
470       /* Now dequeue the current request.  */
471       if (runp->next_prio == NULL)
472         {
473           /* No outstanding request for this descriptor.  Remove this
474              descriptor from the list.  */
475           if (runp->next_fd != NULL)
476             runp->next_fd->last_fd = runp->last_fd;
477           if (runp->last_fd != NULL)
478             runp->last_fd->next_fd = runp->next_fd;
479         }
480       else
481         {
482           runp->next_prio->last_fd = runp->last_fd;
483           runp->next_prio->next_fd = runp->next_fd;
484           runp->next_prio->running = yes;
485           if (runp->next_fd != NULL)
486             runp->next_fd->last_fd = runp->next_prio;
487           if (runp->last_fd != NULL)
488             runp->last_fd->next_fd = runp->next_prio;
489         }
490
491       /* Free the old element.  */
492       __aio_free_request (runp);
493
494       runp = runlist;
495       if (runp != NULL)
496         {
497           /* We must not run requests which are not marked `running'.  */
498           if (runp->running == yes)
499             runlist = runp->next_run;
500           else
501             {
502               struct requestlist *old;
503
504               do
505                 {
506                   old = runp;
507                   runp = runp->next_run;
508                 }
509               while (runp != NULL && runp->running != yes);
510
511               if (runp != NULL)
512                 old->next_run = runp->next_run;
513             }
514         }
515
516       /* If no request to work on we will stop the thread.  */
517       if (runp == NULL)
518         --nthreads;
519       else
520         runp->running = allocated;
521
522       /* Release the mutex.  */
523       pthread_mutex_unlock (&__aio_requests_mutex);
524     }
525   while (runp != NULL);
526
527   pthread_exit (NULL);
528 }
529
530
531 /* Free allocated resources.  */
532 static void
533 __attribute__ ((unused))
534 free_res (void)
535 {
536   size_t row;
537
538   /* The first block of rows as specified in OPTIM is allocated in
539      one chunk.  */
540   free (pool[0]);
541
542   for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
543     free (pool[row]);
544
545   free (pool);
546 }
547
548 text_set_element (__libc_subfreeres, free_res);