Update.
[platform/upstream/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <errno.h>
23 #include <pthread.h>
24 #include <semaphore.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28
29 #include "aio_misc.h"
30
31
32 /* We need a list of pending operations.  This is sorted according to
33    the priority given in the aio_reqprio member.  */
34 aiocb_union *__aio_requests;
35
36 /* Since the list is global we need a semaphore protecting it.  */
37 sem_t __aio_requests_sema;
38
39
40 /* The initialization function.  It gets automatically called if any
41    aio_* function is used in the program.  */
42 static void
43 __attribute__ ((unused))
44 aio_initialize (void)
45 {
46   /* Initialize the semaphore.  We allow exactly one user at a time.  */
47   sem_init (&__aio_requests_sema, 0, 1);
48 }
49
50 text_set_element (__libc_subinit, aio_initialize);
51
52
53 /* The thread handler.  */
54 static void *handle_fildes_io (void *arg);
55
56
57 /* The main function of the async I/O handling.  It enqueues requests
58    and if necessary starts and handles threads.  */
59 int
60 __aio_enqueue_request (aiocb_union *aiocbp, int operation, int require_lock)
61 {
62   int result;
63   int policy, prio;
64   struct sched_param param;
65   aiocb_union *runp;
66
67   if (aiocbp->aiocb.aio_reqprio < 0
68       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
69     {
70       /* Invalid priority value.  */
71       __set_errno (EINVAL);
72       aiocbp->aiocb.__error_code = EINVAL;
73       aiocbp->aiocb.__return_value = -1;
74       return -1;
75     }
76
77   if (pthread_getschedparam (pthread_self (), &policy, &param) < 0)
78     {
79       /* Something went wrong.  */
80       aiocbp->aiocb.__error_code = errno;
81       aiocbp->aiocb.__return_value = -1;
82       return -1;
83     }
84
85   /* Compute priority for this request.  */
86   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
87
88
89   /* Get the semaphore.  */
90   if (require_lock)
91     sem_wait (&__aio_requests_sema);
92
93   runp = __aio_requests;
94   /* First look whether the current file descriptor is currently
95      worked with.  */
96   while (runp != NULL && runp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
97     runp = (aiocb_union *) runp->aiocb.__next_fd;
98
99   if (runp != NULL)
100     {
101       /* The current file descriptor is worked on.  It makes no sense
102          to start another thread since this new thread would have to
103          wait for the previous one to terminate.  Simply enqueue it
104          after the running one according to the priority.  */
105       while (runp->aiocb.__next_prio != NULL
106              && runp->aiocb.__next_prio->__abs_prio >= prio)
107         runp = (aiocb_union *) runp->aiocb.__next_prio;
108
109       aiocbp->aiocb.__next_prio = runp->aiocb.__next_prio;
110       aiocbp->aiocb.__abs_prio = prio;
111       aiocbp->aiocb.__policy = policy;
112       aiocbp->aiocb.aio_lio_opcode = operation;
113       aiocbp->aiocb.__error_code = EINPROGRESS;
114       aiocbp->aiocb.__return_value = 0;
115       runp->aiocb.__next_prio = (struct aiocb *) aiocbp;
116
117       result = 0;
118     }
119   else
120     {
121       /* We create a new thread for this file descriptor.  The
122          function which gets called will handle all available requests
123          for this descriptor and when all are processed it will
124          terminate.  */
125       pthread_t thid;
126       pthread_attr_t attr;
127
128       /* First enqueue the request (the list is empty).  */
129       aiocbp->aiocb.__next_fd = NULL;
130       aiocbp->aiocb.__last_fd = NULL;
131
132       aiocbp->aiocb.__next_prio = NULL;
133       aiocbp->aiocb.__abs_prio = prio;
134       aiocbp->aiocb.__policy = policy;
135       aiocbp->aiocb.aio_lio_opcode = operation;
136       aiocbp->aiocb.__error_code = EINPROGRESS;
137       aiocbp->aiocb.__return_value = 0;
138
139       /* Make sure the thread is created detached.  */
140       pthread_attr_init (&attr);
141       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
142
143       /* Now try to start a thread.  */
144       if (pthread_create (&thid, &attr, handle_fildes_io, aiocbp) < 0)
145         {
146           result = -1;
147           aiocbp->aiocb.__error_code = errno;
148           aiocbp->aiocb.__return_value = -1;
149         }
150       else
151         /* We managed to enqueue the request.  All errors which can
152            happen now can be recognized by calls to `aio_return' and
153            `aio_error'.  */
154           result = 0;
155     }
156
157   /* Release the semaphore.  */
158   if (require_lock)
159     sem_post (&__aio_requests_sema);
160
161   return result;
162 }
163
164
165 static void *
166 handle_fildes_io (void *arg)
167 {
168   pthread_t self = pthread_self ();
169   struct sched_param param;
170   aiocb_union *runp = (aiocb_union *) arg;
171   int policy;
172   int fildes = runp->aiocb.aio_fildes;  /* This is always the same.  */
173
174   pthread_getschedparam (self, &policy, &param);
175
176   do
177     {
178       /* Change the priority to the requested value (if necessary).  */
179       if (runp->aiocb.__abs_prio != param.sched_priority
180           || runp->aiocb.__policy != policy)
181         {
182           param.sched_priority = runp->aiocb.__abs_prio;
183           policy = runp->aiocb.__policy;
184           pthread_setschedparam (self, policy, &param);
185         }
186
187       /* Process request pointed to by RUNP.  We must not be disturbed
188          by signals.  */
189       if ((runp->aiocb.aio_lio_opcode & 127) == LIO_READ)
190         {
191           if (runp->aiocb.aio_lio_opcode & 128)
192             runp->aiocb.__return_value =
193               TEMP_FAILURE_RETRY (__pread64 (fildes,
194                                              (void *) runp->aiocb64.aio_buf,
195                                              runp->aiocb64.aio_nbytes,
196                                              runp->aiocb64.aio_offset));
197           else
198             runp->aiocb.__return_value =
199               TEMP_FAILURE_RETRY (__pread (fildes,
200                                            (void *) runp->aiocb.aio_buf,
201                                            runp->aiocb.aio_nbytes,
202                                            runp->aiocb.aio_offset));
203         }
204       else if ((runp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
205         {
206           if (runp->aiocb.aio_lio_opcode & 128)
207             runp->aiocb.__return_value =
208               TEMP_FAILURE_RETRY (__pwrite64 (fildes,
209                                               (const void *) runp->aiocb64.aio_buf,
210                                               runp->aiocb64.aio_nbytes,
211                                               runp->aiocb64.aio_offset));
212           else
213             runp->aiocb.__return_value =
214               TEMP_FAILURE_RETRY (__pwrite (fildes,
215                                             (const void *) runp->aiocb.aio_buf,
216                                             runp->aiocb.aio_nbytes,
217                                             runp->aiocb.aio_offset));
218         }
219       else if (runp->aiocb.aio_lio_opcode == __LIO_DSYNC)
220         runp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
221       else if (runp->aiocb.aio_lio_opcode == __LIO_SYNC)
222         runp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
223       else
224         {
225           /* This is an invalid opcode.  */
226           runp->aiocb.__return_value = -1;
227           __set_errno (EINVAL);
228         }
229
230       if (runp->aiocb.__return_value == -1)
231         runp->aiocb.__error_code = errno;
232       else
233         runp->aiocb.__error_code = 0;
234
235       /* Send the signal to notify about finished processing of the
236          request.  */
237       if (runp->aiocb.aio_sigevent.sigev_notify == SIGEV_THREAD)
238         {
239           /* We have to start a thread.  */
240           pthread_t tid;
241           pthread_attr_t attr, *pattr;
242
243           pattr = (pthread_attr_t *)
244             runp->aiocb.aio_sigevent.sigev_notify_attributes;
245           if (pattr == NULL)
246             {
247               pthread_attr_init (&attr);
248               pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
249               pattr = &attr;
250             }
251
252           if (pthread_create (&tid,
253                               (pthread_attr_t *)
254                               runp->aiocb.aio_sigevent.sigev_notify_attributes,
255                               (void *(*) (void *))
256                               runp->aiocb.aio_sigevent.sigev_notify_function,
257                               runp->aiocb.aio_sigevent.sigev_value.sival_ptr)
258               < 0)
259             {
260               /* XXX What shall we do if already an error is set by
261                  read/write/fsync?  */
262               runp->aiocb.__error_code = errno;
263               runp->aiocb.__return_value = -1;
264             }
265         }
266       else if (runp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL)
267         /* We have to send a signal.  */
268         if (__aio_sigqueue (runp->aiocb.aio_sigevent.sigev_signo,
269                             runp->aiocb.aio_sigevent.sigev_value) < 0)
270           {
271             /* XXX What shall we do if already an error is set by
272                read/write/fsync?  */
273             runp->aiocb.__error_code = errno;
274             runp->aiocb.__return_value = -1;
275           }
276
277       /* Get the semaphore.  */
278       sem_wait (&__aio_requests_sema);
279
280       /* Now dequeue the current request.  */
281       if (runp->aiocb.__next_prio == NULL)
282         {
283           if (runp->aiocb.__next_fd != NULL)
284             runp->aiocb.__next_fd->__last_fd = runp->aiocb.__last_fd;
285           if (runp->aiocb.__last_fd != NULL)
286             runp->aiocb.__last_fd->__next_fd = runp->aiocb.__next_fd;
287           runp = NULL;
288         }
289       else
290         {
291           runp->aiocb.__next_prio->__last_fd = runp->aiocb.__last_fd;
292           runp->aiocb.__next_prio->__next_fd = runp->aiocb.__next_fd;
293           if (runp->aiocb.__next_fd != NULL)
294             runp->aiocb.__next_fd->__last_fd = runp->aiocb.__next_prio;
295           if (runp->aiocb.__last_fd != NULL)
296             runp->aiocb.__last_fd->__next_fd = runp->aiocb.__next_prio;
297           runp = (aiocb_union *) runp->aiocb.__next_prio;
298         }
299
300       /* Release the semaphore.  */
301       sem_post (&__aio_requests_sema);
302     }
303   while (runp != NULL);
304
305   pthread_exit (NULL);
306 }