/* Handle general operations.
- Copyright (C) 1997, 1998, 1999 Free Software Foundation, Inc.
+ Copyright (C) 1997, 1998, 1999, 2000 Free Software Foundation, Inc.
This file is part of the GNU C Library.
Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
Boston, MA 02111-1307, USA. */
#include <aio.h>
+#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
+#include <sys/time.h>
#include "aio_misc.h"
+static void add_request_to_runlist (struct requestlist *newrequest);
+
/* Pool of request list entries. */
static struct requestlist **pool;
/* Number of threads currently running. */
static int nthreads;
+/* Number of threads waiting for work to arrive. */
+static int idle_thread_count;
+
/* These are the values used to optimize the use of AIO. The user can
overwrite them by using the `aio_init' function. */
0,
0,
0,
- { 0, }
+ 1, /* aio_idle_time - seconds an idle thread lingers before exiting. */
+ 0  /* aio_reserved */
};
/* Since the list is global we need a mutex protecting it. */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
+/* When a request is added to the list and idle threads are present,
+   this condition variable is signalled. When a thread finishes its
+   work, it waits on this condition variable for a while before it
+   actually exits. */
+pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
+
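As a standalone sketch of the protocol this introduces (not part of the patch; the names work_available, queued, and idlers are illustrative), the producer signals only when an idler exists, and the consumer bounds its wait with an absolute deadline. Unlike the patch, which re-checks the run list once after waking, the sketch loops on its predicate, which also tolerates spurious wakeups:

    #include <errno.h>
    #include <pthread.h>
    #include <sys/time.h>
    #include <time.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t work_available = PTHREAD_COND_INITIALIZER;
    static int queued;   /* work items waiting to be claimed */
    static int idlers;   /* threads blocked in the timed wait below */

    /* Producer: enqueue one item and wake an idle thread if one exists.  */
    static void
    enqueue_work (void)
    {
      pthread_mutex_lock (&lock);
      ++queued;
      if (idlers > 0)
        pthread_cond_signal (&work_available);
      pthread_mutex_unlock (&lock);
    }

    /* Consumer: wait up to IDLE_SECONDS for work; a nonzero return means
       an item was claimed, zero means the thread should exit.  */
    static int
    wait_for_work (int idle_seconds)
    {
      struct timeval now;
      struct timespec deadline;
      int claimed;

      pthread_mutex_lock (&lock);
      gettimeofday (&now, NULL);
      deadline.tv_sec = now.tv_sec + idle_seconds;
      deadline.tv_nsec = now.tv_usec * 1000;   /* always < 1000000000 */

      ++idlers;
      while (queued == 0
             && pthread_cond_timedwait (&work_available, &lock,
                                        &deadline) != ETIMEDOUT)
        continue;
      --idlers;

      claimed = queued > 0;
      if (claimed)
        --queued;
      pthread_mutex_unlock (&lock);
      return claimed;
    }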
/* Functions to handle request list pool. */
static struct requestlist *
struct requestlist *new_row;
size_t new_size;
+ assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
+
/* Compute new size. */
new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
: init->aio_num & ~ENTRIES_PER_ROW);
}
+ if (init->aio_idle_time != 0)
+ optim.aio_idle_time = init->aio_idle_time;
+
/* Release the mutex. */
pthread_mutex_unlock (&__aio_requests_mutex);
}
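For context, the field this hunk copies is reachable through the GNU aio_init extension; a caller could tune the idle timeout like this (a minimal sketch; the values are illustrative, not defaults):

    #define _GNU_SOURCE
    #include <aio.h>

    int
    main (void)
    {
      struct aioinit init = { 0 };

      init.aio_threads = 20;    /* maximal number of helper threads */
      init.aio_num = 64;        /* expected simultaneous requests */
      init.aio_idle_time = 5;   /* idle helpers linger five seconds */

      aio_init (&init);
      /* ... submit aio_read / aio_write requests as usual ... */
      return 0;
    }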
}
else
{
+ running = yes;
/* Enqueue this request for a new descriptor. */
if (last == NULL)
{
newp->next_prio = NULL;
}
- if (running == no)
+ if (running == yes)
{
/* We try to create a new thread for this file descriptor. The
   function which gets called will handle all available requests for
   this descriptor and terminate when all are processed. If no new
   thread can be created or if the specified limit of threads for
   AIO is reached we queue the request. */
- /* See if we can create a thread. */
- if (nthreads < optim.aio_threads)
+ /* See if we need to and are able to create a thread. */
+ if (nthreads < optim.aio_threads && idle_thread_count == 0)
{
pthread_t thid;
pthread_attr_t attr;
}
/* Enqueue the request in the run queue if it is not yet running. */
- if (running < yes && result == 0)
+ if (running == yes && result == 0)
{
- if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
- {
- newp->next_run = runlist;
- runlist = newp;
- }
- else
- {
- runp = runlist;
+ add_request_to_runlist (newp);
- while (runp->next_run != NULL
- && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
- runp = runp->next_run;
-
- newp->next_run = runp->next_run;
- runp->next_run = newp;
- }
+ /* If there is a thread waiting for work, then let it know that we
+ have just given it something to do. */
+ if (idle_thread_count > 0)
+ pthread_cond_signal (&__aio_new_request_notification);
}
if (result == 0)
do
{
- /* Update our variables. */
- aiocbp = runp->aiocbp;
- fildes = aiocbp->aiocb.aio_fildes;
-
- /* Change the priority to the requested value (if necessary). */
- if (aiocbp->aiocb.__abs_prio != param.sched_priority
- || aiocbp->aiocb.__policy != policy)
+ /* If runp is NULL, then we were created to service the work queue
+ in general, not to handle any particular request. In that case we
+ skip the "do work" stuff on the first pass, and go directly to the
+ "get work off the work queue" part of this loop, which is near the
+ end. */
+ if (runp == NULL)
+ pthread_mutex_lock (&__aio_requests_mutex);
+ else
{
- param.sched_priority = aiocbp->aiocb.__abs_prio;
- policy = aiocbp->aiocb.__policy;
- pthread_setschedparam (self, policy, &param);
- }
+ /* Update our variables. */
+ aiocbp = runp->aiocbp;
+ fildes = aiocbp->aiocb.aio_fildes;
- /* Process request pointed to by RUNP. We must not be disturbed
- by signals. */
- if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
- {
- if (aiocbp->aiocb.aio_lio_opcode & 128)
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (__pread64 (fildes,
+ /* Change the priority to the requested value (if necessary). */
+ if (aiocbp->aiocb.__abs_prio != param.sched_priority
+ || aiocbp->aiocb.__policy != policy)
+ {
+ param.sched_priority = aiocbp->aiocb.__abs_prio;
+ policy = aiocbp->aiocb.__policy;
+ pthread_setschedparam (self, policy, &param);
+ }
+
+ /* Process request pointed to by RUNP. We must not be disturbed
+ by signals. */
+ if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
+ {
+ if (aiocbp->aiocb.aio_lio_opcode & 128)
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
+ aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes,
+ aiocbp->aiocb64.aio_offset));
+ else
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (pread (fildes,
+ (void *) aiocbp->aiocb.aio_buf,
+ aiocbp->aiocb.aio_nbytes,
+ aiocbp->aiocb.aio_offset));
+
+ if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+ /* The Linux kernel is different from others. It returns
+ ESPIPE if using pread on a socket. Other platforms
+ simply ignore the offset parameter and behave like
+ read. */
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (read (fildes,
+ (void *) aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes));
+ }
+ else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
+ {
+ if (aiocbp->aiocb.aio_lio_opcode & 128)
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
+ aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes,
+ aiocbp->aiocb64.aio_offset));
+ else
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
+ aiocbp->aiocb.aio_buf,
+ aiocbp->aiocb.aio_nbytes,
+ aiocbp->aiocb.aio_offset));
+
+ if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+ /* The Linux kernel is different from others. It returns
+ ESPIPE if using pwrite on a socket. Other platforms
+ simply ignore the offset parameter and behave like
+ write. */
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (write (fildes,
(void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes,
- aiocbp->aiocb64.aio_offset));
- else
+ aiocbp->aiocb64.aio_nbytes));
+ }
+ else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (pread (fildes,
- (void *) aiocbp->aiocb.aio_buf,
- aiocbp->aiocb.aio_nbytes,
- aiocbp->aiocb.aio_offset));
-
- if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
- /* The Linux kernel is different from others. It returns
- ESPIPE if using pread on a socket. Other platforms
- simply ignore the offset parameter and behave like
- read. */
+ TEMP_FAILURE_RETRY (fdatasync (fildes));
+ else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (read (fildes,
- (void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes));
- }
- else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
- {
- if (aiocbp->aiocb.aio_lio_opcode & 128)
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (__pwrite64 (fildes,
- (const void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes,
- aiocbp->aiocb64.aio_offset));
+ TEMP_FAILURE_RETRY (fsync (fildes));
else
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (pwrite (fildes,
- (const void *) aiocbp->aiocb.aio_buf,
- aiocbp->aiocb.aio_nbytes,
- aiocbp->aiocb.aio_offset));
-
- if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
- /* The Linux kernel is different from others. It returns
- ESPIPE if using pwrite on a socket. Other platforms
- simply ignore the offset parameter and behave like
- write. */
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (write (fildes,
- (void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes));
- }
- else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
- aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
- else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
- aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
- else
- {
- /* This is an invalid opcode. */
- aiocbp->aiocb.__return_value = -1;
- __set_errno (EINVAL);
- }
+ {
+ /* This is an invalid opcode. */
+ aiocbp->aiocb.__return_value = -1;
+ __set_errno (EINVAL);
+ }
- /* Get the mutex. */
- pthread_mutex_lock (&__aio_requests_mutex);
+ /* Get the mutex. */
+ pthread_mutex_lock (&__aio_requests_mutex);
- if (aiocbp->aiocb.__return_value == -1)
- aiocbp->aiocb.__error_code = errno;
- else
- aiocbp->aiocb.__error_code = 0;
+ if (aiocbp->aiocb.__return_value == -1)
+ aiocbp->aiocb.__error_code = errno;
+ else
+ aiocbp->aiocb.__error_code = 0;
- /* Send the signal to notify about finished processing of the
- request. */
- __aio_notify (runp);
+ /* Send the signal to notify about finished processing of the
+ request. */
+ __aio_notify (runp);
- /* Now dequeue the current request. */
- if (runp->next_prio == NULL)
- {
- /* No outstanding request for this descriptor. Remove this
- descriptor from the list. */
- if (runp->next_fd != NULL)
- runp->next_fd->last_fd = runp->last_fd;
- if (runp->last_fd != NULL)
- runp->last_fd->next_fd = runp->next_fd;
- else
- requests = runp->next_fd;
- }
- else
- {
- runp->next_prio->last_fd = runp->last_fd;
- runp->next_prio->next_fd = runp->next_fd;
- runp->next_prio->running = yes;
- if (runp->next_fd != NULL)
- runp->next_fd->last_fd = runp->next_prio;
- if (runp->last_fd != NULL)
- runp->last_fd->next_fd = runp->next_prio;
+ /* Now dequeue the current request. */
+ if (runp->next_prio == NULL)
+ {
+ /* No outstanding request for this descriptor. Remove this
+ descriptor from the list. */
+ if (runp->next_fd != NULL)
+ runp->next_fd->last_fd = runp->last_fd;
+ if (runp->last_fd != NULL)
+ runp->last_fd->next_fd = runp->next_fd;
+ else
+ requests = runp->next_fd;
+ }
else
- requests = runp->next_prio;
- }
+ {
+ runp->next_prio->last_fd = runp->last_fd;
+ runp->next_prio->next_fd = runp->next_fd;
+ runp->next_prio->running = yes;
+ if (runp->next_fd != NULL)
+ runp->next_fd->last_fd = runp->next_prio;
+ if (runp->last_fd != NULL)
+ runp->last_fd->next_fd = runp->next_prio;
+ else
+ requests = runp->next_prio;
+ add_request_to_runlist (runp->next_prio);
+ }
- /* Free the old element. */
- __aio_free_request (runp);
+ /* Free the old element. */
+ __aio_free_request (runp);
+ }
runp = runlist;
- if (runp != NULL)
+
+ /* If the runlist is empty, then we sleep for a while, waiting for
+ something to arrive in it. */
+ if (runp == NULL && optim.aio_idle_time >= 0)
{
- /* We must not run requests which are not marked `running'. */
- if (runp->running == yes)
- runlist = runp->next_run;
- else
+ struct timeval now;
+ struct timespec wakeup_time;
+
+ ++idle_thread_count;
+ gettimeofday (&now, NULL);
+ wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
+ wakeup_time.tv_nsec = now.tv_usec * 1000;
+ if (wakeup_time.tv_nsec >= 1000000000)
{
- struct requestlist *old;
-
- do
- {
- old = runp;
- runp = runp->next_run;
- }
- while (runp != NULL && runp->running != yes);
-
- if (runp != NULL)
- old->next_run = runp->next_run;
+ wakeup_time.tv_nsec -= 1000000000;
+ ++wakeup_time.tv_sec;
}
+ pthread_cond_timedwait (&__aio_new_request_notification,
+ &__aio_requests_mutex,
+ &wakeup_time);
+ --idle_thread_count;
+ runp = runlist;
}
- /* If no request to work on we will stop the thread. */
if (runp == NULL)
--nthreads;
else
- runp->running = allocated;
+ {
+ assert (runp->running == yes);
+ runp->running = allocated;
+ runlist = runp->next_run;
+
+ /* If we have a request to process, and the run list still
+ contains another, then we need to wake up or create a new
+ thread to service that remaining request. */
+ if (runlist != NULL)
+ {
+ /* There are at least two items in the work queue to work on.
+ If there are other idle threads, then we should wake them
+ up for these other work elements; otherwise, we should try
+ to create a new thread. */
+ if (idle_thread_count > 0)
+ pthread_cond_signal (&__aio_new_request_notification);
+ else if (nthreads < optim.aio_threads)
+ {
+ pthread_t thid;
+ pthread_attr_t attr;
+
+ /* Make sure the thread is created detached. */
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+ /* Now try to start a thread. If we fail, no big deal,
+ because we know that there is at least one thread (us)
+ that is working on AIO operations. */
+ if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
+ == 0)
+ ++nthreads;
+ }
+ }
+ }
/* Release the mutex. */
pthread_mutex_unlock (&__aio_requests_mutex);
free (pool);
}
-
text_set_element (__libc_subfreeres, free_res);
+
+
+/* Add newrequest to the runlist. The __abs_prio field of newrequest
+   must be set correctly before the call. Also, the caller must set
+   newrequest's "running" flag to "yes" before releasing the lock, or
+   an assertion will fire. */
+static void
+add_request_to_runlist (struct requestlist *newrequest)
+{
+ int prio = newrequest->aiocbp->aiocb.__abs_prio;
+ struct requestlist *runp;
+
+ if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
+ {
+ newrequest->next_run = runlist;
+ runlist = newrequest;
+ }
+ else
+ {
+ runp = runlist;
+
+ while (runp->next_run != NULL
+ && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
+ runp = runp->next_run;
+
+ newrequest->next_run = runp->next_run;
+ runp->next_run = newrequest;
+ }
+}
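Design note on the ordering above: the run list is kept sorted by descending __abs_prio, and the >= in the scan places a new request after any existing requests of equal priority, so requests at the same priority level are serviced in the order they were submitted.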