1 /* Handle general operations.
2 Copyright (C) 1997, 1998 Free Software Foundation, Inc.
3 This file is part of the GNU C Library.
4 Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
6 The GNU C Library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Library General Public License as
8 published by the Free Software Foundation; either version 2 of the
9 License, or (at your option) any later version.
11 The GNU C Library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Library General Public License for more details.
16 You should have received a copy of the GNU Library General Public
17 License along with the GNU C Library; see the file COPYING.LIB. If not,
18 write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 Boston, MA 02111-1307, USA. */
30 /* Pool of request list entries. */
31 static struct requestlist **pool;
33 /* Number of total and allocated pool entries. */
34 static size_t pool_tab_size;
35 static size_t pool_size;
37 /* We implement a two dimensional array but allocate each row separately.
38 The macro below determines how many entries should be used per row.
39 It should better be a power of two. */
40 #define ENTRIES_PER_ROW 16
42 /* The row table is incremented in units of this. */
/* NOTE(review): the macro this comment describes (the row-table growth
   step) is not visible in this excerpt -- confirm against the full file.  */
45 /* List of available entries. */
46 static struct requestlist *freelist;
48 /* List of request waiting to be processed. */
49 static struct requestlist *runlist;
51 /* Structure list of all currently processed requests. */
52 static struct requestlist *requests;
54 /* Number of threads currently running. */
/* NOTE(review): the counter declaration itself (presumably `nthreads',
   which `__aio_enqueue_request' below reads) is missing from this
   excerpt.  */
58 /* These are the values used to optimize the use of AIO. The user can
59 overwrite them by using the `aio_init' function. */
60 static struct aioinit optim =
62 20, /* int aio_threads; Maximal number of threads. */
63 256, /* int aio_num; Number of expected simultaneous requests. */
72 /* Since the list is global we need a mutex protecting it. */
73 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
76 /* Functions to handle request list pool. */
77 static struct requestlist *
/* Return one `struct requestlist' taken from FREELIST, growing the
   pool first when the free list is empty.  Must be called with
   `__aio_requests_mutex' held.  NOTE(review): the function-name line,
   braces and the allocation-failure checks are missing from this
   excerpt; the statements below are kept verbatim.  */
80 struct requestlist *result;
84 struct requestlist *new_row;
87 /* Compute new size. */
88 new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
90 if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
/* The row table itself is too small for the new number of rows:
   grow it with realloc.  */
92 size_t new_tab_size = new_size / ENTRIES_PER_ROW;
93 struct requestlist **new_tab;
95 new_tab = (struct requestlist **)
96 realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
101 pool_tab_size = new_tab_size;
/* First allocation: get all rows in one contiguous calloc'd chunk
   and point each row-table slot at its ENTRIES_PER_ROW-sized slice.  */
109 new_row = (struct requestlist *)
110 calloc (new_size, sizeof (struct requestlist));
115 for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
116 pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
120 /* Allocate one new row. */
121 new_row = (struct requestlist *)
122 calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
126 pool[new_size / ENTRIES_PER_ROW] = new_row;
129 /* Put all the new entries in the freelist. */
132 new_row->next_prio = freelist;
133 freelist = new_row++;
135 while (++pool_size < new_size);
/* Pop the head of the free list; the result is returned to the
   caller (return statement not visible in this excerpt).  */
139 freelist = freelist->next_prio;
146 __aio_free_request (struct requestlist *elem)
/* Return ELEM to the free list.  Caller must hold
   `__aio_requests_mutex'.  NOTE(review): the companion statement
   re-pointing FREELIST at ELEM is missing from this excerpt.  */
149 elem->next_prio = freelist;
155 __aio_find_req (aiocb_union *elem)
/* Find the request list entry whose `aiocbp' is exactly ELEM.  The
   REQUESTS list is kept sorted by file descriptor via `next_fd';
   entries sharing one descriptor are chained via `next_prio'.  */
157 struct requestlist *runp = requests;
158 int fildes = elem->aiocb.aio_fildes;
/* Skip entries for descriptors smaller than the one we look for.  */
160 while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
161 runp = runp->next_fd;
/* No entry for this descriptor at all.  */
164 if (runp->aiocbp->aiocb.aio_fildes != fildes)
/* Same descriptor: walk the priority chain until the exact aiocb
   pointer matches.  */
167 while (runp != NULL && runp->aiocbp != elem)
168 runp = runp->next_prio;
175 __aio_find_req_fd (int fildes)
/* Return the first request list entry queued for file descriptor
   FILDES, or NULL if no request for that descriptor exists.  Uses the
   same fd-sorted REQUESTS list as `__aio_find_req'.  */
177 struct requestlist *runp = requests;
179 while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
180 runp = runp->next_fd;
182 return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
187 /* The thread handler. */
188 static void *handle_fildes_io (void *arg);
191 /* User optimization. */
193 __aio_init (const struct aioinit *init)
196 pthread_mutex_lock (&__aio_requests_mutex);
198 /* Only allow writing new values if the table is not yet allocated. */
201 optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
202 optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
204 : init->aio_num & ~ENTRIES_PER_ROW);
207 /* Release the mutex. */
208 pthread_mutex_unlock (&__aio_requests_mutex);
210 weak_alias (__aio_init, aio_init)
213 /* The main function of the async I/O handling. It enqueues requests
214 and if necessary starts and handles threads. */
216 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
/* Enqueue the request AIOCBP with opcode OPERATION, starting a new
   helper thread when allowed.  Called by the aio_read/aio_write/
   lio_listio front ends.  NOTE(review): many interior lines (braces,
   declarations of `policy', `prio', `result', `attr', `thid', and
   several conditionals) are missing from this excerpt; the visible
   statements are kept verbatim.  */
220 struct sched_param param;
221 struct requestlist *last, *runp, *newp;
/* Reject priority deltas outside [0, AIO_PRIO_DELTA_MAX].  */
224 if (aiocbp->aiocb.aio_reqprio < 0
225 || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
227 /* Invalid priority value. */
228 __set_errno (EINVAL);
229 aiocbp->aiocb.__error_code = EINVAL;
230 aiocbp->aiocb.__return_value = -1;
234 /* Compute priority for this request. */
/* NOTE(review): `¶m' below is mojibake for `&param'
   (HTML-entity corruption) -- restore when editing the real file.  */
235 pthread_getschedparam (pthread_self (), &policy, ¶m);
236 prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
/* Get the mutex; all list manipulation below happens under it.  */
239 pthread_mutex_lock (&__aio_requests_mutex);
243 /* First look whether the current file descriptor is currently
   used.  */
246 && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
249 runp = runp->next_fd;
252 /* Get a new element for the waiting list. */
256 __set_errno (EAGAIN);
257 pthread_mutex_unlock (&__aio_requests_mutex);
/* Fill in the new request list element and mark the control block
   as in progress.  */
260 newp->aiocbp = aiocbp;
261 newp->waiting = NULL;
263 aiocbp->aiocb.__abs_prio = prio;
264 aiocbp->aiocb.__policy = policy;
265 aiocbp->aiocb.aio_lio_opcode = operation;
266 aiocbp->aiocb.__error_code = EINPROGRESS;
267 aiocbp->aiocb.__return_value = 0;
270 && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
272 /* The current file descriptor is worked on. It makes no sense
273 to start another thread since this new thread would fight
274 with the running thread for the resources. But we also cannot
275 say that the thread processing this descriptor shall immediately
276 after finishing the current job process this request if there
277 are other threads in the running queue which have a higher
   priority.  */
280 /* Simply enqueue it after the running one according to the
   priority.  */
282 while (runp->next_prio != NULL
283 && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
284 runp = runp->next_prio;
286 newp->next_prio = runp->next_prio;
287 runp->next_prio = newp;
293 /* Enqueue this request for a new descriptor. */
/* Insert at the head of the fd-sorted REQUESTS list...  */
296 newp->last_fd = NULL;
297 newp->next_fd = requests;
298 if (requests != NULL)
299 requests->last_fd = newp;
/* ...or splice it in after LAST, keeping the fd order intact.  */
304 newp->next_fd = last->next_fd;
305 newp->last_fd = last;
306 last->next_fd = newp;
307 if (newp->next_fd != NULL)
308 newp->next_fd->last_fd = newp;
/* First (and only) request for this descriptor so far.  */
311 newp->next_prio = NULL;
316 /* We try to create a new thread for this file descriptor. The
317 function which gets called will handle all available requests
318 for this descriptor and when all are processed it will
321 If no new thread can be created or if the specified limit of
322 threads for AIO is reached we queue the request. */
324 /* See if we can create a thread. */
325 if (nthreads < optim.aio_threads)
330 /* Make sure the thread is created detached. */
331 pthread_attr_init (&attr);
332 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
334 /* Now try to start a thread. */
335 if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
337 /* We managed to enqueue the request. All errors which can
338 happen now can be recognized by calls to `aio_return' and
   `aio_error'.  */
343 else if (nthreads == 0)
344 /* We cannot create a thread in the moment and there is
345 also no thread running. This is a problem. `errno' is
346 set to EAGAIN if this is only a temporary problem. */
351 /* Enqueue the request in the run queue if it is not yet running. */
352 if (running < yes && result == 0)
/* Insert at the head of RUNLIST when it beats the current head's
   priority...  */
354 if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
356 newp->next_run = runlist;
/* ...otherwise walk to the insertion point by priority.  */
363 while (runp->next_run != NULL
364 && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
365 runp = runp->next_run;
367 newp->next_run = runp->next_run;
368 runp->next_run = newp;
/* Record the final state computed above.  */
373 newp->running = running;
376 /* Something went wrong. */
377 __aio_free_request (newp);
381 /* Release the mutex. */
382 pthread_mutex_unlock (&__aio_requests_mutex);
389 handle_fildes_io (void *arg)
/* Worker-thread body: ARG is the first request list entry to process.
   The thread performs the I/O for that request, records the result in
   the aiocb, dequeues the entry under the mutex, and (per the loop
   tail below) keeps picking runnable requests until none remain.
   NOTE(review): declarations of `policy', `aiocbp', `fildes' and
   several braces/else lines are missing from this excerpt; the visible
   statements are kept verbatim.  */
391 pthread_t self = pthread_self ();
392 struct sched_param param;
393 struct requestlist *runp = (struct requestlist *) arg;
/* NOTE(review): `¶m' here and below is mojibake for `&param'
   (HTML-entity corruption) -- restore when editing the real file.  */
398 pthread_getschedparam (self, &policy, ¶m);
402 /* Update our variables. */
403 aiocbp = runp->aiocbp;
404 fildes = aiocbp->aiocb.aio_fildes;
406 /* Change the priority to the requested value (if necessary). */
407 if (aiocbp->aiocb.__abs_prio != param.sched_priority
408 || aiocbp->aiocb.__policy != policy)
410 param.sched_priority = aiocbp->aiocb.__abs_prio;
411 policy = aiocbp->aiocb.__policy;
412 pthread_setschedparam (self, policy, ¶m);
415 /* Process request pointed to by RUNP. We must not be disturbed
   while doing so.  */
/* Bit 7 of the opcode selects the 64-bit (aiocb64) variant.  */
417 if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
419 if (aiocbp->aiocb.aio_lio_opcode & 128)
420 aiocbp->aiocb.__return_value =
421 TEMP_FAILURE_RETRY (__pread64 (fildes,
422 (void *) aiocbp->aiocb64.aio_buf,
423 aiocbp->aiocb64.aio_nbytes,
424 aiocbp->aiocb64.aio_offset));
426 aiocbp->aiocb.__return_value =
427 TEMP_FAILURE_RETRY (pread (fildes,
428 (void *) aiocbp->aiocb.aio_buf,
429 aiocbp->aiocb.aio_nbytes,
430 aiocbp->aiocb.aio_offset));
432 else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
434 if (aiocbp->aiocb.aio_lio_opcode & 128)
435 aiocbp->aiocb.__return_value =
436 TEMP_FAILURE_RETRY (__pwrite64 (fildes,
437 (const void *) aiocbp->aiocb64.aio_buf,
438 aiocbp->aiocb64.aio_nbytes,
439 aiocbp->aiocb64.aio_offset));
441 aiocbp->aiocb.__return_value =
442 TEMP_FAILURE_RETRY (pwrite (fildes,
443 (const void *) aiocbp->aiocb.aio_buf,
444 aiocbp->aiocb.aio_nbytes,
445 aiocbp->aiocb.aio_offset));
447 else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
448 aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
449 else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
450 aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
453 /* This is an invalid opcode. */
454 aiocbp->aiocb.__return_value = -1;
455 __set_errno (EINVAL);
/* Re-acquire the mutex before touching the shared lists and the
   result fields read by aio_error/aio_return.  */
459 pthread_mutex_lock (&__aio_requests_mutex);
461 if (aiocbp->aiocb.__return_value == -1)
462 aiocbp->aiocb.__error_code = errno;
464 aiocbp->aiocb.__error_code = 0;
466 /* Send the signal to notify about finished processing of the
   request.  */
470 /* Now dequeue the current request. */
471 if (runp->next_prio == NULL)
473 /* No outstanding request for this descriptor. Remove this
474 descriptor from the list. */
475 if (runp->next_fd != NULL)
476 runp->next_fd->last_fd = runp->last_fd;
477 if (runp->last_fd != NULL)
478 runp->last_fd->next_fd = runp->next_fd;
/* Another request for the same descriptor is pending: promote it
   into RUNP's place in the fd list and mark it runnable.  */
482 runp->next_prio->last_fd = runp->last_fd;
483 runp->next_prio->next_fd = runp->next_fd;
484 runp->next_prio->running = yes;
485 if (runp->next_fd != NULL)
486 runp->next_fd->last_fd = runp->next_prio;
487 if (runp->last_fd != NULL)
488 runp->last_fd->next_fd = runp->next_prio;
491 /* Free the old element. */
492 __aio_free_request (runp);
/* Pick the next piece of work from the run queue.  */
497 /* We must not run requests which are not marked `running'. */
498 if (runp->running == yes)
499 runlist = runp->next_run;
502 struct requestlist *old;
507 runp = runp->next_run;
509 while (runp != NULL && runp->running != yes);
511 old->next_run = runp->next_run;
516 /* If no request to work on we will stop the thread. */
520 runp->running = allocated;
522 /* Release the mutex. */
523 pthread_mutex_unlock (&__aio_requests_mutex);
525 while (runp != NULL);
531 /* Free allocated resources. */
533 __attribute__ ((unused))
/* free_res: registered below via `text_set_element' so the library can
   release the pool rows and the row table at shutdown.  NOTE(review):
   the function header and the `free' calls themselves are missing from
   this excerpt.  */
538 /* The first block of rows as specified in OPTIM is allocated in
   one chunk.  */
542 for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
548 text_set_element (__libc_subfreeres, free_res);