S390: Fix building with --disable-mutli-arch [BZ #31196]
[platform/upstream/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997-2024 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4
5    The GNU C Library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 2.1 of the License, or (at your option) any later version.
9
10    The GNU C Library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with the GNU C Library; if not, see
17    <https://www.gnu.org/licenses/>.  */
18
19 #include <aio.h>
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <pthreadP.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <sys/param.h>
27 #include <sys/stat.h>
28 #include <sys/time.h>
29 #include <aio_misc.h>
30
31 #if !PTHREAD_IN_LIBC
32 /* The available function names differ outside of libc.  (In libc, we
33    need to use hidden aliases to avoid the PLT.)  */
34 # define __pread __libc_pread
35 # define __pthread_attr_destroy pthread_attr_destroy
36 # define __pthread_attr_init pthread_attr_init
37 # define __pthread_attr_setdetachstate pthread_attr_setdetachstate
38 # define __pthread_cond_signal pthread_cond_signal
39 # define __pthread_cond_timedwait pthread_cond_timedwait
40 # define __pthread_getschedparam pthread_getschedparam
41 # define __pthread_setschedparam pthread_setschedparam
42 # define __pwrite __libc_pwrite
43 #endif
44
45 #ifndef aio_create_helper_thread
46 # define aio_create_helper_thread __aio_create_helper_thread
47
48 extern inline int
49 __aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
50 {
51   pthread_attr_t attr;
52
53   /* Make sure the thread is created detached.  */
54   __pthread_attr_init (&attr);
55   __pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
56
57   int ret = __pthread_create (threadp, &attr, tf, arg);
58
59   __pthread_attr_destroy (&attr);
60   return ret;
61 }
62 #endif
63
64 static void add_request_to_runlist (struct requestlist *newrequest);
65
66 /* Pool of request list entries.  */
67 static struct requestlist **pool;
68
69 /* Number of total and allocated pool entries.  */
70 static size_t pool_max_size;
71 static size_t pool_size;
72
73 /* We implement a two dimensional array but allocate each row separately.
74    The macro below determines how many entries should be used per row.
75    It should better be a power of two.  */
76 #define ENTRIES_PER_ROW 32
77
78 /* How many rows we allocate at once.  */
79 #define ROWS_STEP       8
80
81 /* List of available entries.  */
82 static struct requestlist *freelist;
83
84 /* List of request waiting to be processed.  */
85 static struct requestlist *runlist;
86
87 /* Structure list of all currently processed requests.  */
88 static struct requestlist *requests;
89
90 /* Number of threads currently running.  */
91 static int nthreads;
92
93 /* Number of threads waiting for work to arrive. */
94 static int idle_thread_count;
95
96
97 /* These are the values used to optimize the use of AIO.  The user can
98    overwrite them by using the `aio_init' function.  */
99 static struct aioinit optim =
100 {
101   20,   /* int aio_threads;     Maximal number of threads.  */
102   64,   /* int aio_num;         Number of expected simultaneous requests. */
103   0,
104   0,
105   0,
106   0,
107   1,
108   0
109 };
110
111
112 /* Since the list is global we need a mutex protecting it.  */
113 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
114
115 /* When you add a request to the list and there are idle threads present,
116    you signal this condition variable. When a thread finishes work, it waits
117    on this condition variable for a time before it actually exits. */
118 pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
119
120
121 /* Functions to handle request list pool.  */
122 static struct requestlist *
123 get_elem (void)
124 {
125   struct requestlist *result;
126
127   if (freelist == NULL)
128     {
129       struct requestlist *new_row;
130       int cnt;
131
132       assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
133
134       if (pool_size + 1 >= pool_max_size)
135         {
136           size_t new_max_size = pool_max_size + ROWS_STEP;
137           struct requestlist **new_tab;
138
139           new_tab = (struct requestlist **)
140             realloc (pool, new_max_size * sizeof (struct requestlist *));
141
142           if (new_tab == NULL)
143             return NULL;
144
145           pool_max_size = new_max_size;
146           pool = new_tab;
147         }
148
149       /* Allocate the new row.  */
150       cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
151       new_row = (struct requestlist *) calloc (cnt,
152                                                sizeof (struct requestlist));
153       if (new_row == NULL)
154         return NULL;
155
156       pool[pool_size++] = new_row;
157
158       /* Put all the new entries in the freelist.  */
159       do
160         {
161           new_row->next_prio = freelist;
162           freelist = new_row++;
163         }
164       while (--cnt > 0);
165     }
166
167   result = freelist;
168   freelist = freelist->next_prio;
169
170   return result;
171 }
172
173
174 void
175 __aio_free_request (struct requestlist *elem)
176 {
177   elem->running = no;
178   elem->next_prio = freelist;
179   freelist = elem;
180 }
181
182
183 struct requestlist *
184 __aio_find_req (aiocb_union *elem)
185 {
186   struct requestlist *runp = requests;
187   int fildes = elem->aiocb.aio_fildes;
188
189   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
190     runp = runp->next_fd;
191
192   if (runp != NULL)
193     {
194       if (runp->aiocbp->aiocb.aio_fildes != fildes)
195         runp = NULL;
196       else
197         while (runp != NULL && runp->aiocbp != elem)
198           runp = runp->next_prio;
199     }
200
201   return runp;
202 }
203
204
205 struct requestlist *
206 __aio_find_req_fd (int fildes)
207 {
208   struct requestlist *runp = requests;
209
210   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
211     runp = runp->next_fd;
212
213   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
214           ? runp : NULL);
215 }
216
217
218 void
219 __aio_remove_request (struct requestlist *last, struct requestlist *req,
220                       int all)
221 {
222   assert (req->running == yes || req->running == queued
223           || req->running == done);
224
225   if (last != NULL)
226     last->next_prio = all ? NULL : req->next_prio;
227   else
228     {
229       if (all || req->next_prio == NULL)
230         {
231           if (req->last_fd != NULL)
232             req->last_fd->next_fd = req->next_fd;
233           else
234             requests = req->next_fd;
235           if (req->next_fd != NULL)
236             req->next_fd->last_fd = req->last_fd;
237         }
238       else
239         {
240           if (req->last_fd != NULL)
241             req->last_fd->next_fd = req->next_prio;
242           else
243             requests = req->next_prio;
244
245           if (req->next_fd != NULL)
246             req->next_fd->last_fd = req->next_prio;
247
248           req->next_prio->last_fd = req->last_fd;
249           req->next_prio->next_fd = req->next_fd;
250
251           /* Mark this entry as runnable.  */
252           req->next_prio->running = yes;
253         }
254
255       if (req->running == yes)
256         {
257           struct requestlist *runp = runlist;
258
259           last = NULL;
260           while (runp != NULL)
261             {
262               if (runp == req)
263                 {
264                   if (last == NULL)
265                     runlist = runp->next_run;
266                   else
267                     last->next_run = runp->next_run;
268                   break;
269                 }
270               last = runp;
271               runp = runp->next_run;
272             }
273         }
274     }
275 }
276
277
278 /* The thread handler.  */
279 static void *handle_fildes_io (void *arg);
280
281
282 /* User optimization.  */
283 void
284 __aio_init (const struct aioinit *init)
285 {
286   /* Get the mutex.  */
287   __pthread_mutex_lock (&__aio_requests_mutex);
288
289   /* Only allow writing new values if the table is not yet allocated.  */
290   if (pool == NULL)
291     {
292       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
293       assert (powerof2 (ENTRIES_PER_ROW));
294       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
295                        ? ENTRIES_PER_ROW
296                        : init->aio_num & ~(ENTRIES_PER_ROW - 1));
297     }
298
299   if (init->aio_idle_time != 0)
300     optim.aio_idle_time = init->aio_idle_time;
301
302   /* Release the mutex.  */
303   __pthread_mutex_unlock (&__aio_requests_mutex);
304 }
305
306
307 /* The main function of the async I/O handling.  It enqueues requests
308    and if necessary starts and handles threads.  */
309 struct requestlist *
310 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
311 {
312   int result = 0;
313   int policy, prio;
314   struct sched_param param;
315   struct requestlist *last, *runp, *newp;
316   int running = no;
317
318   if (operation == LIO_SYNC || operation == LIO_DSYNC)
319     aiocbp->aiocb.aio_reqprio = 0;
320   else if (aiocbp->aiocb.aio_reqprio < 0
321 #ifdef AIO_PRIO_DELTA_MAX
322            || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX
323 #endif
324            )
325     {
326       /* Invalid priority value.  */
327       __set_errno (EINVAL);
328       aiocbp->aiocb.__error_code = EINVAL;
329       aiocbp->aiocb.__return_value = -1;
330       return NULL;
331     }
332
333   /* Compute priority for this request.  */
334   __pthread_getschedparam (__pthread_self (), &policy, &param);
335   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
336
337   /* Get the mutex.  */
338   __pthread_mutex_lock (&__aio_requests_mutex);
339
340   last = NULL;
341   runp = requests;
342   /* First look whether the current file descriptor is currently
343      worked with.  */
344   while (runp != NULL
345          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
346     {
347       last = runp;
348       runp = runp->next_fd;
349     }
350
351   /* Get a new element for the waiting list.  */
352   newp = get_elem ();
353   if (newp == NULL)
354     {
355       __pthread_mutex_unlock (&__aio_requests_mutex);
356       __set_errno (EAGAIN);
357       return NULL;
358     }
359   newp->aiocbp = aiocbp;
360   newp->waiting = NULL;
361
362   aiocbp->aiocb.__abs_prio = prio;
363   aiocbp->aiocb.__policy = policy;
364   aiocbp->aiocb.aio_lio_opcode = operation;
365   aiocbp->aiocb.__error_code = EINPROGRESS;
366   aiocbp->aiocb.__return_value = 0;
367
368   if (runp != NULL
369       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
370     {
371       /* The current file descriptor is worked on.  It makes no sense
372          to start another thread since this new thread would fight
373          with the running thread for the resources.  But we also cannot
374          say that the thread processing this descriptor shall immediately
375          after finishing the current job process this request if there
376          are other threads in the running queue which have a higher
377          priority.  */
378
379       /* Simply enqueue it after the running one according to the
380          priority.  */
381       last = NULL;
382       while (runp->next_prio != NULL
383              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
384         {
385           last = runp;
386           runp = runp->next_prio;
387         }
388
389       newp->next_prio = runp->next_prio;
390       runp->next_prio = newp;
391
392       running = queued;
393     }
394   else
395     {
396       running = yes;
397       /* Enqueue this request for a new descriptor.  */
398       if (last == NULL)
399         {
400           newp->last_fd = NULL;
401           newp->next_fd = requests;
402           if (requests != NULL)
403             requests->last_fd = newp;
404           requests = newp;
405         }
406       else
407         {
408           newp->next_fd = last->next_fd;
409           newp->last_fd = last;
410           last->next_fd = newp;
411           if (newp->next_fd != NULL)
412             newp->next_fd->last_fd = newp;
413         }
414
415       newp->next_prio = NULL;
416       last = NULL;
417     }
418
419   if (running == yes)
420     {
421       /* We try to create a new thread for this file descriptor.  The
422          function which gets called will handle all available requests
423          for this descriptor and when all are processed it will
424          terminate.
425
426          If no new thread can be created or if the specified limit of
427          threads for AIO is reached we queue the request.  */
428
429       /* See if we need to and are able to create a thread.  */
430       if (nthreads < optim.aio_threads && idle_thread_count == 0)
431         {
432           pthread_t thid;
433
434           running = newp->running = allocated;
435
436           /* Now try to start a thread.  */
437           result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
438           if (result == 0)
439             /* We managed to enqueue the request.  All errors which can
440                happen now can be recognized by calls to `aio_return' and
441                `aio_error'.  */
442             ++nthreads;
443           else
444             {
445               /* Reset the running flag.  The new request is not running.  */
446               running = newp->running = yes;
447
448               if (nthreads == 0)
449                 {
450                   /* We cannot create a thread in the moment and there is
451                      also no thread running.  This is a problem.  `errno' is
452                      set to EAGAIN if this is only a temporary problem.  */
453                   __aio_remove_request (last, newp, 0);
454                 }
455               else
456                 result = 0;
457             }
458         }
459     }
460
461   /* Enqueue the request in the run queue if it is not yet running.  */
462   if (running == yes && result == 0)
463     {
464       add_request_to_runlist (newp);
465
466       /* If there is a thread waiting for work, then let it know that we
467          have just given it something to do. */
468       if (idle_thread_count > 0)
469         __pthread_cond_signal (&__aio_new_request_notification);
470     }
471
472   if (result == 0)
473     newp->running = running;
474   else
475     {
476       /* Something went wrong.  */
477       __aio_free_request (newp);
478       aiocbp->aiocb.__error_code = result;
479       __set_errno (result);
480       newp = NULL;
481     }
482
483   /* Release the mutex.  */
484   __pthread_mutex_unlock (&__aio_requests_mutex);
485
486   return newp;
487 }
488
489
490 static void *
491 handle_fildes_io (void *arg)
492 {
493   pthread_t self = __pthread_self ();
494   struct sched_param param;
495   struct requestlist *runp = (struct requestlist *) arg;
496   aiocb_union *aiocbp;
497   int policy;
498   int fildes;
499
500   __pthread_getschedparam (self, &policy, &param);
501
502   do
503     {
504       /* If runp is NULL, then we were created to service the work queue
505          in general, not to handle any particular request. In that case we
506          skip the "do work" stuff on the first pass, and go directly to the
507          "get work off the work queue" part of this loop, which is near the
508          end. */
509       if (runp == NULL)
510         __pthread_mutex_lock (&__aio_requests_mutex);
511       else
512         {
513           /* Hopefully this request is marked as running.  */
514           assert (runp->running == allocated);
515
516           /* Update our variables.  */
517           aiocbp = runp->aiocbp;
518           fildes = aiocbp->aiocb.aio_fildes;
519
520           /* Change the priority to the requested value (if necessary).  */
521           if (aiocbp->aiocb.__abs_prio != param.sched_priority
522               || aiocbp->aiocb.__policy != policy)
523             {
524               param.sched_priority = aiocbp->aiocb.__abs_prio;
525               policy = aiocbp->aiocb.__policy;
526               __pthread_setschedparam (self, policy, &param);
527             }
528
529           /* Process request pointed to by RUNP.  We must not be disturbed
530              by signals.  */
531           if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
532             {
533               if (sizeof (off_t) != sizeof (off64_t)
534                   && aiocbp->aiocb.aio_lio_opcode & 128)
535                 aiocbp->aiocb.__return_value =
536                   TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
537                                                  aiocbp->aiocb64.aio_buf,
538                                                  aiocbp->aiocb64.aio_nbytes,
539                                                  aiocbp->aiocb64.aio_offset));
540               else
541                 aiocbp->aiocb.__return_value =
542                   TEMP_FAILURE_RETRY (__pread (fildes,
543                                                (void *)
544                                                aiocbp->aiocb.aio_buf,
545                                                aiocbp->aiocb.aio_nbytes,
546                                                aiocbp->aiocb.aio_offset));
547
548               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
549                 /* The Linux kernel is different from others.  It returns
550                    ESPIPE if using pread on a socket.  Other platforms
551                    simply ignore the offset parameter and behave like
552                    read.  */
553                 aiocbp->aiocb.__return_value =
554                   TEMP_FAILURE_RETRY (read (fildes,
555                                             (void *) aiocbp->aiocb64.aio_buf,
556                                             aiocbp->aiocb64.aio_nbytes));
557             }
558           else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
559             {
560               if (sizeof (off_t) != sizeof (off64_t)
561                   && aiocbp->aiocb.aio_lio_opcode & 128)
562                 aiocbp->aiocb.__return_value =
563                   TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
564                                                   aiocbp->aiocb64.aio_buf,
565                                                   aiocbp->aiocb64.aio_nbytes,
566                                                   aiocbp->aiocb64.aio_offset));
567               else
568                 aiocbp->aiocb.__return_value =
569                   TEMP_FAILURE_RETRY (__pwrite (fildes, (const void *)
570                                                 aiocbp->aiocb.aio_buf,
571                                                 aiocbp->aiocb.aio_nbytes,
572                                                 aiocbp->aiocb.aio_offset));
573
574               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
575                 /* The Linux kernel is different from others.  It returns
576                    ESPIPE if using pwrite on a socket.  Other platforms
577                    simply ignore the offset parameter and behave like
578                    write.  */
579                 aiocbp->aiocb.__return_value =
580                   TEMP_FAILURE_RETRY (write (fildes,
581                                              (void *) aiocbp->aiocb64.aio_buf,
582                                              aiocbp->aiocb64.aio_nbytes));
583             }
584           else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
585             aiocbp->aiocb.__return_value =
586               TEMP_FAILURE_RETRY (fdatasync (fildes));
587           else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
588             aiocbp->aiocb.__return_value =
589               TEMP_FAILURE_RETRY (fsync (fildes));
590           else
591             {
592               /* This is an invalid opcode.  */
593               aiocbp->aiocb.__return_value = -1;
594               __set_errno (EINVAL);
595             }
596
597           /* Get the mutex.  */
598           __pthread_mutex_lock (&__aio_requests_mutex);
599
600           if (aiocbp->aiocb.__return_value == -1)
601             aiocbp->aiocb.__error_code = errno;
602           else
603             aiocbp->aiocb.__error_code = 0;
604
605           /* Send the signal to notify about finished processing of the
606              request.  */
607           __aio_notify (runp);
608
609           /* For debugging purposes we reset the running flag of the
610              finished request.  */
611           assert (runp->running == allocated);
612           runp->running = done;
613
614           /* Now dequeue the current request.  */
615           __aio_remove_request (NULL, runp, 0);
616           if (runp->next_prio != NULL)
617             add_request_to_runlist (runp->next_prio);
618
619           /* Free the old element.  */
620           __aio_free_request (runp);
621         }
622
623       runp = runlist;
624
625       /* If the runlist is empty, then we sleep for a while, waiting for
626          something to arrive in it. */
627       if (runp == NULL && optim.aio_idle_time >= 0)
628         {
629           struct timespec now;
630           struct timespec wakeup_time;
631
632           ++idle_thread_count;
633           __clock_gettime (CLOCK_REALTIME, &now);
634           wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
635           wakeup_time.tv_nsec = now.tv_nsec;
636           if (wakeup_time.tv_nsec >= 1000000000)
637             {
638               wakeup_time.tv_nsec -= 1000000000;
639               ++wakeup_time.tv_sec;
640             }
641           __pthread_cond_timedwait (&__aio_new_request_notification,
642                                     &__aio_requests_mutex,
643                                     &wakeup_time);
644           --idle_thread_count;
645           runp = runlist;
646         }
647
648       if (runp == NULL)
649         --nthreads;
650       else
651         {
652           assert (runp->running == yes);
653           runp->running = allocated;
654           runlist = runp->next_run;
655
656           /* If we have a request to process, and there's still another in
657              the run list, then we need to either wake up or create a new
658              thread to service the request that is still in the run list. */
659           if (runlist != NULL)
660             {
661               /* There are at least two items in the work queue to work on.
662                  If there are other idle threads, then we should wake them
663                  up for these other work elements; otherwise, we should try
664                  to create a new thread. */
665               if (idle_thread_count > 0)
666                 __pthread_cond_signal (&__aio_new_request_notification);
667               else if (nthreads < optim.aio_threads)
668                 {
669                   pthread_t thid;
670                   pthread_attr_t attr;
671
672                   /* Make sure the thread is created detached.  */
673                   __pthread_attr_init (&attr);
674                   __pthread_attr_setdetachstate (&attr,
675                                                  PTHREAD_CREATE_DETACHED);
676
677                   /* Now try to start a thread. If we fail, no big deal,
678                      because we know that there is at least one thread (us)
679                      that is working on AIO operations. */
680                   if (__pthread_create (&thid, &attr, handle_fildes_io, NULL)
681                       == 0)
682                     ++nthreads;
683                 }
684             }
685         }
686
687       /* Release the mutex.  */
688       __pthread_mutex_unlock (&__aio_requests_mutex);
689     }
690   while (runp != NULL);
691
692   return NULL;
693 }
694
695
696 /* Free allocated resources.  */
697 #if !PTHREAD_IN_LIBC
698 __attribute__ ((__destructor__)) static
699 #endif
700 void
701 __aio_freemem (void)
702 {
703   size_t row;
704
705   for (row = 0; row < pool_size; ++row)
706     free (pool[row]);
707
708   free (pool);
709 }
710
711
712 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
713    be correctly set to do this. Also, you had better set newrequest's
714    "running" flag to "yes" before you release your lock or you'll throw an
715    assertion. */
716 static void
717 add_request_to_runlist (struct requestlist *newrequest)
718 {
719   int prio = newrequest->aiocbp->aiocb.__abs_prio;
720   struct requestlist *runp;
721
722   if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
723     {
724       newrequest->next_run = runlist;
725       runlist = newrequest;
726     }
727   else
728     {
729       runp = runlist;
730
731       while (runp->next_run != NULL
732              && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
733         runp = runp->next_run;
734
735       newrequest->next_run = runp->next_run;
736       runp->next_run = newrequest;
737     }
738 }
739
740 #if PTHREAD_IN_LIBC
741 versioned_symbol (libc, __aio_init, aio_init, GLIBC_2_34);
742 # if OTHER_SHLIB_COMPAT (librt, GLIBC_2_1, GLIBC_2_34)
743 compat_symbol (librt, __aio_init, aio_init, GLIBC_2_1);
744 # endif
745 #else /* !PTHREAD_IN_LIBC */
746 weak_alias (__aio_init, aio_init)
747 #endif /* !PTHREAD_IN_LIBC */