poll: add API to watch for POLLPRI
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @title: GstPoll
27  * @short_description: Keep track of file descriptors and make it possible
28  *                     to wait on them in a cancellable way
29  *
30  * A #GstPoll keeps track of file descriptors much like fd_set (used with
31  * select()) or a struct pollfd array (used with poll()). Once created with
32  * gst_poll_new(), the set can be used to wait for file descriptors to be
33  * readable and/or writable. It is possible to make this wait be controlled
34  * by specifying %TRUE for the @controllable flag when creating the set (or
35  * later calling gst_poll_set_controllable()).
36  *
37  * New file descriptors are added to the set using gst_poll_add_fd(), and
38  * removed using gst_poll_remove_fd(). Controlling which file descriptors
39  * should be waited for to become readable and/or writable are done using
40  * gst_poll_fd_ctl_read(), gst_poll_fd_ctl_write() and gst_poll_fd_ctl_pri().
41  *
42  * Use gst_poll_wait() to wait for the file descriptors to actually become
43  * readable and/or writable, or to timeout if no file descriptor is available
44  * in time. The wait can be controlled by calling gst_poll_restart() and
45  * gst_poll_set_flushing().
46  *
47  * Once the file descriptor set has been waited for, one can use
48  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
49  * gst_poll_fd_has_error() to see if it has generated an error,
50  * gst_poll_fd_can_read() to see if it is possible to read from the file
51  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
52  * write to it.
53  *
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include "gst_private.h"
61 #include "glib-compat-private.h"
62
63 #include <sys/types.h>
64
65 #ifdef HAVE_UNISTD_H
66 #include <unistd.h>
67 #endif
68
69 #include <errno.h>
70 #include <fcntl.h>
71
72 #include <glib.h>
73
74 #ifdef G_OS_WIN32
75 #include <winsock2.h>
76 #else
77 #define _GNU_SOURCE 1
78 #ifdef HAVE_SYS_POLL_H
79 #include <sys/poll.h>
80 #endif
81 #ifdef HAVE_POLL_H
82 #include <poll.h>
83 #endif
84 #include <sys/time.h>
85 #include <sys/socket.h>
86 #endif
87
88 #ifdef G_OS_WIN32
89 #  ifndef EWOULDBLOCK
90 #  define EWOULDBLOCK EAGAIN    /* This is just to placate gcc */
91 #  endif
92 #endif /* G_OS_WIN32 */
93
94 /* OS/X needs this because of bad headers */
95 #include <string.h>
96
97 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
98  * so we prefer our own poll emulation.
99  */
100 #if defined(BROKEN_POLL)
101 #undef HAVE_POLL
102 #endif
103
104 #include "gstpoll.h"
105
106 #define GST_CAT_DEFAULT GST_CAT_POLL
107
108 #ifdef G_OS_WIN32
109 typedef struct _WinsockFd WinsockFd;
110
111 struct _WinsockFd
112 {
113   gint fd;
114   glong event_mask;
115   WSANETWORKEVENTS events;
116   glong ignored_event_mask;
117 };
118 #endif
119
120 typedef enum
121 {
122   GST_POLL_MODE_AUTO,
123   GST_POLL_MODE_SELECT,
124   GST_POLL_MODE_PSELECT,
125   GST_POLL_MODE_POLL,
126   GST_POLL_MODE_PPOLL,
127   GST_POLL_MODE_WINDOWS
128 } GstPollMode;
129
130 struct _GstPoll
131 {
132   GstPollMode mode;
133
134   GMutex lock;
135   /* array of fds, always written to and read from with lock */
136   GArray *fds;
137   /* array of active fds, only written to from the waiting thread with the
138    * lock and read from with the lock or without the lock from the waiting
139    * thread */
140   GArray *active_fds;
141
142 #ifndef G_OS_WIN32
143   GstPollFD control_read_fd;
144   GstPollFD control_write_fd;
145 #else
146   GArray *active_fds_ignored;
147   GArray *events;
148   GArray *active_events;
149
150   HANDLE wakeup_event;
151 #endif
152
153   gboolean controllable;
154   volatile gint waiting;
155   volatile gint control_pending;
156   volatile gint flushing;
157   gboolean timer;
158   volatile gint rebuild;
159 };
160
161 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
162     gboolean active);
163 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
164
165 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
166 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
167
168 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
169 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
170 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
171
172 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
173 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
174
175 #ifndef G_OS_WIN32
176
177 static gboolean
178 wake_event (GstPoll * set)
179 {
180   ssize_t num_written;
181   while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
182     if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
183       g_critical ("%p: failed to wake event: %s", set, strerror (errno));
184       return FALSE;
185     }
186   }
187   return TRUE;
188 }
189
190 static gboolean
191 release_event (GstPoll * set)
192 {
193   gchar buf[1] = { '\0' };
194   ssize_t num_read;
195   while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
196     if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
197       g_critical ("%p: failed to release event: %s", set, strerror (errno));
198       return FALSE;
199     }
200   }
201   return TRUE;
202 }
203
204 #else
205
206 static void
207 format_last_error (gchar * buf, size_t buf_len)
208 {
209   DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
210   LPCVOID src = NULL;
211   DWORD lang = 0;
212   DWORD id;
213   id = GetLastError ();
214   FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
215   SetLastError (id);
216 }
217
218 static gboolean
219 wake_event (GstPoll * set)
220 {
221   SetLastError (0);
222   errno = 0;
223   if (!SetEvent (set->wakeup_event)) {
224     gchar msg[1024] = "<unknown>";
225     format_last_error (msg, sizeof (msg));
226     g_critical ("%p: failed to set wakup_event: %s", set, msg);
227     errno = EBADF;
228     return FALSE;
229   }
230
231   return TRUE;
232 }
233
234 static gboolean
235 release_event (GstPoll * set)
236 {
237   DWORD status;
238   SetLastError (0);
239   errno = 0;
240
241   status = WaitForSingleObject (set->wakeup_event, INFINITE);
242   if (status) {
243     const gchar *reason = "unknown";
244     gchar msg[1024] = "<unknown>";
245     switch (status) {
246       case WAIT_ABANDONED:
247         reason = "WAIT_ABANDONED";
248         break;
249       case WAIT_TIMEOUT:
250         reason = "WAIT_TIMEOUT";
251         break;
252       case WAIT_FAILED:
253         format_last_error (msg, sizeof (msg));
254         reason = msg;
255         break;
256       default:
257         reason = "other";
258         break;
259     }
260     g_critical ("%p: failed to block on wakup_event: %s", set, reason);
261     errno = EBADF;
262     return FALSE;
263   }
264
265   if (!ResetEvent (set->wakeup_event)) {
266     gchar msg[1024] = "<unknown>";
267     format_last_error (msg, sizeof (msg));
268     g_critical ("%p: failed to reset wakup_event: %s", set, msg);
269     errno = EBADF;
270     return FALSE;
271   }
272
273   return TRUE;
274 }
275
276 #endif
277
278 /* the poll/select call is also performed on a control socket, that way
279  * we can send special commands to control it */
280 static inline gboolean
281 raise_wakeup (GstPoll * set)
282 {
283   gboolean result = TRUE;
284
285   /* makes testing control_pending and WAKE_EVENT() atomic. */
286   g_mutex_lock (&set->lock);
287
288   if (set->control_pending == 0) {
289     /* raise when nothing pending */
290     GST_LOG ("%p: raise", set);
291     result = wake_event (set);
292   }
293
294   if (result) {
295     set->control_pending++;
296   }
297
298   g_mutex_unlock (&set->lock);
299
300   return result;
301 }
302
303 static inline gboolean
304 release_wakeup (GstPoll * set)
305 {
306   gboolean result = FALSE;
307
308   /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
309   g_mutex_lock (&set->lock);
310
311   if (set->control_pending > 0) {
312     /* release, only if this was the last pending. */
313     if (set->control_pending == 1) {
314       GST_LOG ("%p: release", set);
315       result = release_event (set);
316     } else {
317       result = TRUE;
318     }
319
320     if (result) {
321       set->control_pending--;
322     }
323   } else {
324     errno = EWOULDBLOCK;
325   }
326
327   g_mutex_unlock (&set->lock);
328
329   return result;
330 }
331
332 static inline gint
333 release_all_wakeup (GstPoll * set)
334 {
335   gint old;
336
337   /* makes testing control_pending and RELEASE_EVENT() atomic. */
338   g_mutex_lock (&set->lock);
339
340   if ((old = set->control_pending) > 0) {
341     GST_LOG ("%p: releasing %d", set, old);
342     if (release_event (set)) {
343       set->control_pending = 0;
344     } else {
345       old = 0;
346     }
347   }
348
349   g_mutex_unlock (&set->lock);
350
351   return old;
352 }
353
354 static gint
355 find_index (GArray * array, GstPollFD * fd)
356 {
357 #ifndef G_OS_WIN32
358   struct pollfd *ifd;
359 #else
360   WinsockFd *ifd;
361 #endif
362   guint i;
363
364   /* start by assuming the index found in the fd is still valid */
365   if (fd->idx >= 0 && fd->idx < array->len) {
366 #ifndef G_OS_WIN32
367     ifd = &g_array_index (array, struct pollfd, fd->idx);
368 #else
369     ifd = &g_array_index (array, WinsockFd, fd->idx);
370 #endif
371
372     if (ifd->fd == fd->fd) {
373       return fd->idx;
374     }
375   }
376
377   /* the pollfd array has changed and we need to lookup the fd again */
378   for (i = 0; i < array->len; i++) {
379 #ifndef G_OS_WIN32
380     ifd = &g_array_index (array, struct pollfd, i);
381 #else
382     ifd = &g_array_index (array, WinsockFd, i);
383 #endif
384
385     if (ifd->fd == fd->fd) {
386       fd->idx = (gint) i;
387       return fd->idx;
388     }
389   }
390
391   fd->idx = -1;
392   return fd->idx;
393 }
394
395 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
396 /* check if all file descriptors will fit in an fd_set */
397 static gboolean
398 selectable_fds (GstPoll * set)
399 {
400   guint i;
401
402   g_mutex_lock (&set->lock);
403   for (i = 0; i < set->fds->len; i++) {
404     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
405
406     if (pfd->fd >= FD_SETSIZE)
407       goto too_many;
408   }
409   g_mutex_unlock (&set->lock);
410
411   return TRUE;
412
413 too_many:
414   {
415     g_mutex_unlock (&set->lock);
416     return FALSE;
417   }
418 }
419
420 /* check if the timeout will convert to a timeout value used for poll()
421  * without a loss of precision
422  */
423 static gboolean
424 pollable_timeout (GstClockTime timeout)
425 {
426   if (timeout == GST_CLOCK_TIME_NONE)
427     return TRUE;
428
429   /* not a nice multiple of milliseconds */
430   if (timeout % 1000000)
431     return FALSE;
432
433   return TRUE;
434 }
435 #endif
436
437 static GstPollMode
438 choose_mode (GstPoll * set, GstClockTime timeout)
439 {
440   GstPollMode mode;
441
442   if (set->mode == GST_POLL_MODE_AUTO) {
443 #ifdef HAVE_PPOLL
444     mode = GST_POLL_MODE_PPOLL;
445 #elif defined(HAVE_POLL)
446     if (!selectable_fds (set) || pollable_timeout (timeout)) {
447       mode = GST_POLL_MODE_POLL;
448     } else {
449 #ifdef HAVE_PSELECT
450       mode = GST_POLL_MODE_PSELECT;
451 #else
452       mode = GST_POLL_MODE_SELECT;
453 #endif
454     }
455 #elif defined(HAVE_PSELECT)
456     mode = GST_POLL_MODE_PSELECT;
457 #else
458     mode = GST_POLL_MODE_SELECT;
459 #endif
460   } else {
461     mode = set->mode;
462   }
463   return mode;
464 }
465
466 #ifndef G_OS_WIN32
467 static gint
468 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
469     fd_set * errorfds)
470 {
471   gint max_fd = -1;
472   guint i;
473
474   FD_ZERO (readfds);
475   FD_ZERO (writefds);
476   FD_ZERO (errorfds);
477
478   g_mutex_lock (&set->lock);
479
480   for (i = 0; i < set->active_fds->len; i++) {
481     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
482
483     if (pfd->fd < FD_SETSIZE) {
484       if (pfd->events & POLLIN)
485         FD_SET (pfd->fd, readfds);
486       if (pfd->events & POLLOUT)
487         FD_SET (pfd->fd, writefds);
488       if (pfd->events)
489         FD_SET (pfd->fd, errorfds);
490       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
491         max_fd = pfd->fd;
492     }
493   }
494
495   g_mutex_unlock (&set->lock);
496
497   return max_fd;
498 }
499
500 static void
501 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
502     fd_set * errorfds)
503 {
504   guint i;
505
506   g_mutex_lock (&set->lock);
507
508   for (i = 0; i < set->active_fds->len; i++) {
509     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
510
511     if (pfd->fd < FD_SETSIZE) {
512       pfd->revents = 0;
513       if (FD_ISSET (pfd->fd, readfds))
514         pfd->revents |= POLLIN;
515       if (FD_ISSET (pfd->fd, writefds))
516         pfd->revents |= POLLOUT;
517       if (FD_ISSET (pfd->fd, errorfds))
518         pfd->revents |= POLLERR;
519     }
520   }
521
522   g_mutex_unlock (&set->lock);
523 }
524 #else /* G_OS_WIN32 */
525 /*
526  * Translate errors thrown by the Winsock API used by GstPoll:
527  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
528  */
529 static gint
530 gst_poll_winsock_error_to_errno (DWORD last_error)
531 {
532   switch (last_error) {
533     case WSA_INVALID_HANDLE:
534     case WSAEINVAL:
535     case WSAENOTSOCK:
536       return EBADF;
537
538     case WSA_NOT_ENOUGH_MEMORY:
539       return ENOMEM;
540
541       /*
542        * Anything else, including:
543        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
544        *   WSANOTINITIALISED
545        */
546     default:
547       return EINVAL;
548   }
549 }
550
551 static void
552 gst_poll_free_winsock_event (GstPoll * set, gint idx)
553 {
554   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
555   HANDLE event = g_array_index (set->events, HANDLE, idx);
556
557   WSAEventSelect (wfd->fd, event, 0);
558   CloseHandle (event);
559 }
560
561 static void
562 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
563     gboolean active)
564 {
565   WinsockFd *wfd;
566
567   wfd = &g_array_index (set->fds, WinsockFd, idx);
568
569   if (active)
570     wfd->event_mask |= flags;
571   else
572     wfd->event_mask &= ~flags;
573
574   /* reset ignored state if the new mask doesn't overlap at all */
575   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
576     wfd->ignored_event_mask = 0;
577 }
578
579 static gboolean
580 gst_poll_prepare_winsock_active_sets (GstPoll * set)
581 {
582   guint i;
583
584   g_array_set_size (set->active_fds, 0);
585   g_array_set_size (set->active_fds_ignored, 0);
586   g_array_set_size (set->active_events, 0);
587   g_array_append_val (set->active_events, set->wakeup_event);
588
589   for (i = 0; i < set->fds->len; i++) {
590     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
591     HANDLE event = g_array_index (set->events, HANDLE, i);
592
593     if (wfd->ignored_event_mask == 0) {
594       gint ret;
595
596       g_array_append_val (set->active_fds, *wfd);
597       g_array_append_val (set->active_events, event);
598
599       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
600       if (G_UNLIKELY (ret != 0)) {
601         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
602         return FALSE;
603       }
604     } else {
605       g_array_append_val (set->active_fds_ignored, wfd);
606     }
607   }
608
609   return TRUE;
610 }
611
612 static gint
613 gst_poll_collect_winsock_events (GstPoll * set)
614 {
615   gint res, i;
616
617   /*
618    * We need to check which events are signaled, and call
619    * WSAEnumNetworkEvents for those that are, which resets
620    * the event and clears the internal network event records.
621    */
622   res = 0;
623   for (i = 0; i < set->active_fds->len; i++) {
624     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
625     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
626     DWORD wait_ret;
627
628     wait_ret = WaitForSingleObject (event, 0);
629     if (wait_ret == WAIT_OBJECT_0) {
630       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
631
632       if (G_UNLIKELY (enum_ret != 0)) {
633         res = -1;
634         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
635         break;
636       }
637
638       res++;
639     } else {
640       /* clear any previously stored result */
641       memset (&wfd->events, 0, sizeof (wfd->events));
642     }
643   }
644
645   /* If all went well we also need to reset the ignored fds. */
646   if (res >= 0) {
647     res += set->active_fds_ignored->len;
648
649     for (i = 0; i < set->active_fds_ignored->len; i++) {
650       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
651
652       wfd->ignored_event_mask = 0;
653     }
654
655     g_array_set_size (set->active_fds_ignored, 0);
656   }
657
658   return res;
659 }
660 #endif
661
662 /**
663  * gst_poll_new: (skip)
664  * @controllable: whether it should be possible to control a wait.
665  *
666  * Create a new file descriptor set. If @controllable, it
667  * is possible to restart or flush a call to gst_poll_wait() with
668  * gst_poll_restart() and gst_poll_set_flushing() respectively.
669  *
670  * Free-function: gst_poll_free
671  *
672  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
673  *     case of an error.  Free with gst_poll_free().
674  */
675 GstPoll *
676 gst_poll_new (gboolean controllable)
677 {
678   GstPoll *nset;
679
680   nset = g_slice_new0 (GstPoll);
681   GST_DEBUG ("%p: new controllable : %d", nset, controllable);
682   g_mutex_init (&nset->lock);
683 #ifndef G_OS_WIN32
684   nset->mode = GST_POLL_MODE_AUTO;
685   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
686   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
687   nset->control_read_fd.fd = -1;
688   nset->control_write_fd.fd = -1;
689   {
690     gint control_sock[2];
691
692     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
693       goto no_socket_pair;
694
695     nset->control_read_fd.fd = control_sock[0];
696     nset->control_write_fd.fd = control_sock[1];
697
698     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
699     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
700   }
701 #else
702   nset->mode = GST_POLL_MODE_WINDOWS;
703   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
704   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
705   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
706   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
707   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
708
709   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
710 #endif
711
712   /* ensure (re)build, though already sneakily set in non-windows case */
713   MARK_REBUILD (nset);
714
715   nset->controllable = controllable;
716   nset->control_pending = 0;
717
718   return nset;
719
720   /* ERRORS */
721 #ifndef G_OS_WIN32
722 no_socket_pair:
723   {
724     GST_WARNING ("%p: can't create socket pair !", nset);
725     gst_poll_free (nset);
726     return NULL;
727   }
728 #endif
729 }
730
731 /**
732  * gst_poll_new_timer: (skip)
733  *
734  * Create a new poll object that can be used for scheduling cancellable
735  * timeouts.
736  *
737  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
738  * performed from different threads.
739  *
740  * Free-function: gst_poll_free
741  *
742  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
743  *     case of an error.  Free with gst_poll_free().
744  */
745 GstPoll *
746 gst_poll_new_timer (void)
747 {
748   GstPoll *poll;
749
750   /* make a new controllable poll set */
751   if (!(poll = gst_poll_new (TRUE)))
752     goto done;
753
754   /* we are a timer */
755   poll->timer = TRUE;
756
757 done:
758   return poll;
759 }
760
761 /**
762  * gst_poll_free:
763  * @set: (transfer full): a file descriptor set.
764  *
765  * Free a file descriptor set.
766  */
767 void
768 gst_poll_free (GstPoll * set)
769 {
770   g_return_if_fail (set != NULL);
771
772   GST_DEBUG ("%p: freeing", set);
773
774 #ifndef G_OS_WIN32
775   if (set->control_write_fd.fd >= 0)
776     close (set->control_write_fd.fd);
777   if (set->control_read_fd.fd >= 0)
778     close (set->control_read_fd.fd);
779 #else
780   CloseHandle (set->wakeup_event);
781
782   {
783     guint i;
784
785     for (i = 0; i < set->events->len; i++)
786       gst_poll_free_winsock_event (set, i);
787   }
788
789   g_array_free (set->active_events, TRUE);
790   g_array_free (set->events, TRUE);
791   g_array_free (set->active_fds_ignored, TRUE);
792 #endif
793
794   g_array_free (set->active_fds, TRUE);
795   g_array_free (set->fds, TRUE);
796   g_mutex_clear (&set->lock);
797   g_slice_free (GstPoll, set);
798 }
799
800 /**
801  * gst_poll_get_read_gpollfd:
802  * @set: a #GstPoll
803  * @fd: a #GPollFD
804  *
805  * Get a GPollFD for the reading part of the control socket. This is useful when
806  * integrating with a GSource and GMainLoop.
807  */
808 void
809 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
810 {
811   g_return_if_fail (set != NULL);
812   g_return_if_fail (fd != NULL);
813
814 #ifndef G_OS_WIN32
815   fd->fd = set->control_read_fd.fd;
816 #else
817 #if GLIB_SIZEOF_VOID_P == 8
818   fd->fd = (gint64) set->wakeup_event;
819 #else
820   fd->fd = (gint) set->wakeup_event;
821 #endif
822 #endif
823   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
824   fd->revents = 0;
825 }
826
827 /**
828  * gst_poll_fd_init:
829  * @fd: a #GstPollFD
830  *
831  * Initializes @fd. Alternatively you can initialize it with
832  * #GST_POLL_FD_INIT.
833  */
834 void
835 gst_poll_fd_init (GstPollFD * fd)
836 {
837   g_return_if_fail (fd != NULL);
838
839   fd->fd = -1;
840   fd->idx = -1;
841 }
842
843 static gboolean
844 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
845 {
846   gint idx;
847
848   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
849
850   idx = find_index (set->fds, fd);
851   if (idx < 0) {
852 #ifndef G_OS_WIN32
853     struct pollfd nfd;
854
855     nfd.fd = fd->fd;
856     nfd.events = POLLERR | POLLNVAL | POLLHUP;
857     nfd.revents = 0;
858
859     g_array_append_val (set->fds, nfd);
860
861     fd->idx = set->fds->len - 1;
862 #else
863     WinsockFd wfd;
864     HANDLE event;
865
866     wfd.fd = fd->fd;
867     wfd.event_mask = FD_CLOSE;
868     memset (&wfd.events, 0, sizeof (wfd.events));
869     wfd.ignored_event_mask = 0;
870     event = WSACreateEvent ();
871
872     g_array_append_val (set->fds, wfd);
873     g_array_append_val (set->events, event);
874
875     fd->idx = set->fds->len - 1;
876 #endif
877     MARK_REBUILD (set);
878   } else {
879     GST_WARNING ("%p: fd already added !", set);
880   }
881
882   return TRUE;
883 }
884
885 /**
886  * gst_poll_add_fd:
887  * @set: a file descriptor set.
888  * @fd: a file descriptor.
889  *
890  * Add a file descriptor to the file descriptor set.
891  *
892  * Returns: %TRUE if the file descriptor was successfully added to the set.
893  */
894 gboolean
895 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
896 {
897   gboolean ret;
898
899   g_return_val_if_fail (set != NULL, FALSE);
900   g_return_val_if_fail (fd != NULL, FALSE);
901   g_return_val_if_fail (fd->fd >= 0, FALSE);
902
903   g_mutex_lock (&set->lock);
904
905   ret = gst_poll_add_fd_unlocked (set, fd);
906
907   g_mutex_unlock (&set->lock);
908
909   return ret;
910 }
911
912 /**
913  * gst_poll_remove_fd:
914  * @set: a file descriptor set.
915  * @fd: a file descriptor.
916  *
917  * Remove a file descriptor from the file descriptor set.
918  *
919  * Returns: %TRUE if the file descriptor was successfully removed from the set.
920  */
921 gboolean
922 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
923 {
924   gint idx;
925
926   g_return_val_if_fail (set != NULL, FALSE);
927   g_return_val_if_fail (fd != NULL, FALSE);
928   g_return_val_if_fail (fd->fd >= 0, FALSE);
929
930
931   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
932
933   g_mutex_lock (&set->lock);
934
935   /* get the index, -1 is an fd that is not added */
936   idx = find_index (set->fds, fd);
937   if (idx >= 0) {
938 #ifdef G_OS_WIN32
939     gst_poll_free_winsock_event (set, idx);
940     g_array_remove_index_fast (set->events, idx);
941 #endif
942
943     /* remove the fd at index, we use _remove_index_fast, which copies the last
944      * element of the array to the freed index */
945     g_array_remove_index_fast (set->fds, idx);
946
947     /* mark fd as removed by setting the index to -1 */
948     fd->idx = -1;
949     MARK_REBUILD (set);
950   } else {
951     GST_WARNING ("%p: couldn't find fd !", set);
952   }
953
954   g_mutex_unlock (&set->lock);
955
956   return idx >= 0;
957 }
958
959 /**
960  * gst_poll_fd_ctl_write:
961  * @set: a file descriptor set.
962  * @fd: a file descriptor.
963  * @active: a new status.
964  *
965  * Control whether the descriptor @fd in @set will be monitored for
966  * writability.
967  *
968  * Returns: %TRUE if the descriptor was successfully updated.
969  */
970 gboolean
971 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
972 {
973   gint idx;
974
975   g_return_val_if_fail (set != NULL, FALSE);
976   g_return_val_if_fail (fd != NULL, FALSE);
977   g_return_val_if_fail (fd->fd >= 0, FALSE);
978
979   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
980       fd->fd, fd->idx, active);
981
982   g_mutex_lock (&set->lock);
983
984   idx = find_index (set->fds, fd);
985   if (idx >= 0) {
986 #ifndef G_OS_WIN32
987     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
988
989     if (active)
990       pfd->events |= POLLOUT;
991     else
992       pfd->events &= ~POLLOUT;
993
994     GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
995 #else
996     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
997         active);
998 #endif
999     MARK_REBUILD (set);
1000   } else {
1001     GST_WARNING ("%p: couldn't find fd !", set);
1002   }
1003
1004   g_mutex_unlock (&set->lock);
1005
1006   return idx >= 0;
1007 }
1008
1009 static gboolean
1010 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1011 {
1012   gint idx;
1013
1014   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1015       fd->fd, fd->idx, active);
1016
1017   idx = find_index (set->fds, fd);
1018
1019   if (idx >= 0) {
1020 #ifndef G_OS_WIN32
1021     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1022
1023     if (active)
1024       pfd->events |= POLLIN;
1025     else
1026       pfd->events &= ~POLLIN;
1027 #else
1028     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1029 #endif
1030     MARK_REBUILD (set);
1031   } else {
1032     GST_WARNING ("%p: couldn't find fd !", set);
1033   }
1034
1035   return idx >= 0;
1036 }
1037
1038 /**
1039  * gst_poll_fd_ctl_read:
1040  * @set: a file descriptor set.
1041  * @fd: a file descriptor.
1042  * @active: a new status.
1043  *
1044  * Control whether the descriptor @fd in @set will be monitored for
1045  * readability.
1046  *
1047  * Returns: %TRUE if the descriptor was successfully updated.
1048  */
1049 gboolean
1050 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1051 {
1052   gboolean ret;
1053
1054   g_return_val_if_fail (set != NULL, FALSE);
1055   g_return_val_if_fail (fd != NULL, FALSE);
1056   g_return_val_if_fail (fd->fd >= 0, FALSE);
1057
1058   g_mutex_lock (&set->lock);
1059
1060   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1061
1062   g_mutex_unlock (&set->lock);
1063
1064   return ret;
1065 }
1066
1067 /**
1068  * gst_poll_fd_ctl_pri:
1069  * @set: a file descriptor set.
1070  * @fd: a file descriptor.
1071  * @active: a new status.
1072  *
1073  * Control whether the descriptor @fd in @set will be monitored for
1074  * exceptional conditions (POLLPRI).
1075  *
1076  * Not available on Windows.
1077  *
1078  * Returns: %TRUE if the descriptor was successfully updated.
1079  * Since: 1.16
1080  */
1081 gboolean
1082 gst_poll_fd_ctl_pri (GstPoll * set, GstPollFD * fd, gboolean active)
1083 {
1084 #ifdef G_OS_WIN32
1085   return FALSE;
1086 #else
1087   gint idx;
1088
1089   g_return_val_if_fail (set != NULL, FALSE);
1090   g_return_val_if_fail (fd != NULL, FALSE);
1091   g_return_val_if_fail (fd->fd >= 0, FALSE);
1092
1093   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1094       fd->fd, fd->idx, active);
1095
1096   g_mutex_lock (&set->lock);
1097
1098   idx = find_index (set->fds, fd);
1099   if (idx >= 0) {
1100     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1101
1102     if (active)
1103       pfd->events |= POLLPRI;
1104     else
1105       pfd->events &= ~POLLPRI;
1106
1107     GST_LOG ("%p: pfd->events now %d (POLLPRI:%d)", set, pfd->events, POLLOUT);
1108     MARK_REBUILD (set);
1109   } else {
1110     GST_WARNING ("%p: couldn't find fd !", set);
1111   }
1112
1113   g_mutex_unlock (&set->lock);
1114
1115   return idx >= 0;
1116 #endif
1117 }
1118
1119 /**
1120  * gst_poll_fd_ignored:
1121  * @set: a file descriptor set.
1122  * @fd: a file descriptor.
1123  *
1124  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1125  * the same result for @fd as last time. This function must be called if no
1126  * operation (read/write/recv/send/etc.) will be performed on @fd before
1127  * the next call to gst_poll_wait().
1128  *
1129  * The reason why this is needed is because the underlying implementation
1130  * might not allow querying the fd more than once between calls to one of
1131  * the re-enabling operations.
1132  */
1133 void
1134 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1135 {
1136 #ifdef G_OS_WIN32
1137   gint idx;
1138
1139   g_return_if_fail (set != NULL);
1140   g_return_if_fail (fd != NULL);
1141   g_return_if_fail (fd->fd >= 0);
1142
1143   g_mutex_lock (&set->lock);
1144
1145   idx = find_index (set->fds, fd);
1146   if (idx >= 0) {
1147     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1148
1149     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1150     MARK_REBUILD (set);
1151   }
1152
1153   g_mutex_unlock (&set->lock);
1154 #endif
1155 }
1156
1157 /**
1158  * gst_poll_fd_has_closed:
1159  * @set: a file descriptor set.
1160  * @fd: a file descriptor.
1161  *
1162  * Check if @fd in @set has closed the connection.
1163  *
1164  * Returns: %TRUE if the connection was closed.
1165  */
1166 gboolean
1167 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1168 {
1169   gboolean res = FALSE;
1170   gint idx;
1171
1172   g_return_val_if_fail (set != NULL, FALSE);
1173   g_return_val_if_fail (fd != NULL, FALSE);
1174   g_return_val_if_fail (fd->fd >= 0, FALSE);
1175
1176   g_mutex_lock (&((GstPoll *) set)->lock);
1177
1178   idx = find_index (set->active_fds, fd);
1179   if (idx >= 0) {
1180 #ifndef G_OS_WIN32
1181     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1182
1183     res = (pfd->revents & POLLHUP) != 0;
1184 #else
1185     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1186
1187     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1188 #endif
1189   } else {
1190     GST_WARNING ("%p: couldn't find fd !", set);
1191   }
1192   g_mutex_unlock (&((GstPoll *) set)->lock);
1193
1194   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1195
1196   return res;
1197 }
1198
1199 /**
1200  * gst_poll_fd_has_error:
1201  * @set: a file descriptor set.
1202  * @fd: a file descriptor.
1203  *
1204  * Check if @fd in @set has an error.
1205  *
1206  * Returns: %TRUE if the descriptor has an error.
1207  */
1208 gboolean
1209 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1210 {
1211   gboolean res = FALSE;
1212   gint idx;
1213
1214   g_return_val_if_fail (set != NULL, FALSE);
1215   g_return_val_if_fail (fd != NULL, FALSE);
1216   g_return_val_if_fail (fd->fd >= 0, FALSE);
1217
1218   g_mutex_lock (&((GstPoll *) set)->lock);
1219
1220   idx = find_index (set->active_fds, fd);
1221   if (idx >= 0) {
1222 #ifndef G_OS_WIN32
1223     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1224
1225     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1226 #else
1227     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1228
1229     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1230         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1231         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1232         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1233         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1234 #endif
1235   } else {
1236     GST_WARNING ("%p: couldn't find fd !", set);
1237   }
1238   g_mutex_unlock (&((GstPoll *) set)->lock);
1239
1240   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1241
1242   return res;
1243 }
1244
1245 static gboolean
1246 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1247 {
1248   gboolean res = FALSE;
1249   gint idx;
1250
1251   idx = find_index (set->active_fds, fd);
1252   if (idx >= 0) {
1253 #ifndef G_OS_WIN32
1254     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1255
1256     res = (pfd->revents & POLLIN) != 0;
1257 #else
1258     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1259
1260     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1261 #endif
1262   } else {
1263     GST_WARNING ("%p: couldn't find fd !", set);
1264   }
1265   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1266
1267   return res;
1268 }
1269
1270 /**
1271  * gst_poll_fd_can_read:
1272  * @set: a file descriptor set.
1273  * @fd: a file descriptor.
1274  *
1275  * Check if @fd in @set has data to be read.
1276  *
1277  * Returns: %TRUE if the descriptor has data to be read.
1278  */
1279 gboolean
1280 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1281 {
1282   gboolean res = FALSE;
1283
1284   g_return_val_if_fail (set != NULL, FALSE);
1285   g_return_val_if_fail (fd != NULL, FALSE);
1286   g_return_val_if_fail (fd->fd >= 0, FALSE);
1287
1288   g_mutex_lock (&((GstPoll *) set)->lock);
1289
1290   res = gst_poll_fd_can_read_unlocked (set, fd);
1291
1292   g_mutex_unlock (&((GstPoll *) set)->lock);
1293
1294   return res;
1295 }
1296
1297 /**
1298  * gst_poll_fd_can_write:
1299  * @set: a file descriptor set.
1300  * @fd: a file descriptor.
1301  *
1302  * Check if @fd in @set can be used for writing.
1303  *
1304  * Returns: %TRUE if the descriptor can be used for writing.
1305  */
1306 gboolean
1307 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1308 {
1309   gboolean res = FALSE;
1310   gint idx;
1311
1312   g_return_val_if_fail (set != NULL, FALSE);
1313   g_return_val_if_fail (fd != NULL, FALSE);
1314   g_return_val_if_fail (fd->fd >= 0, FALSE);
1315
1316   g_mutex_lock (&((GstPoll *) set)->lock);
1317
1318   idx = find_index (set->active_fds, fd);
1319   if (idx >= 0) {
1320 #ifndef G_OS_WIN32
1321     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1322
1323     res = (pfd->revents & POLLOUT) != 0;
1324 #else
1325     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1326
1327     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1328 #endif
1329   } else {
1330     GST_WARNING ("%p: couldn't find fd !", set);
1331   }
1332   g_mutex_unlock (&((GstPoll *) set)->lock);
1333
1334   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1335
1336   return res;
1337 }
1338
1339 /**
1340  * gst_poll_fd_has_pri:
1341  * @set: a file descriptor set.
1342  * @fd: a file descriptor.
1343  *
1344  * Check if @fd in @set has an exceptional condition (POLLPRI).
1345  *
1346  * Not available on Windows.
1347  *
1348  * Returns: %TRUE if the descriptor has an exceptional condition.
1349  * Since: 1.16
1350  */
1351 gboolean
1352 gst_poll_fd_has_pri (const GstPoll * set, GstPollFD * fd)
1353 {
1354 #ifdef G_OS_WIN32
1355   return FALSE;
1356 #else
1357   gboolean res = FALSE;
1358   gint idx;
1359
1360   g_return_val_if_fail (set != NULL, FALSE);
1361   g_return_val_if_fail (fd != NULL, FALSE);
1362   g_return_val_if_fail (fd->fd >= 0, FALSE);
1363
1364   g_mutex_lock (&((GstPoll *) set)->lock);
1365
1366   idx = find_index (set->active_fds, fd);
1367   if (idx >= 0) {
1368     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1369
1370     res = (pfd->revents & POLLPRI) != 0;
1371   } else {
1372     GST_WARNING ("%p: couldn't find fd !", set);
1373   }
1374   g_mutex_unlock (&((GstPoll *) set)->lock);
1375
1376   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1377
1378   return res;
1379 #endif
1380 }
1381
1382 /**
1383  * gst_poll_wait:
1384  * @set: a #GstPoll.
1385  * @timeout: a timeout in nanoseconds.
1386  *
1387  * Wait for activity on the file descriptors in @set. This function waits up to
1388  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1389  *
1390  * For #GstPoll objects created with gst_poll_new(), this function can only be
1391  * called from a single thread at a time.  If called from multiple threads,
1392  * -1 will be returned with errno set to EPERM.
1393  *
1394  * This is not true for timer #GstPoll objects created with
1395  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1396  * simultaneously.
1397  *
1398  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1399  * activity was detected after @timeout. If an error occurs, -1 is returned
1400  * and errno is set.
1401  */
1402 gint
1403 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1404 {
1405   gboolean restarting;
1406   gboolean is_timer;
1407   int res;
1408   gint old_waiting;
1409
1410   g_return_val_if_fail (set != NULL, -1);
1411
1412   GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1413
1414   is_timer = set->timer;
1415
1416   /* add one more waiter */
1417   old_waiting = INC_WAITING (set);
1418
1419   /* we cannot wait from multiple threads unless we are a timer */
1420   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1421     goto already_waiting;
1422
1423   /* flushing, exit immediately */
1424   if (G_UNLIKELY (IS_FLUSHING (set)))
1425     goto flushing;
1426
1427   do {
1428     GstPollMode mode;
1429
1430     res = -1;
1431     restarting = FALSE;
1432
1433     mode = choose_mode (set, timeout);
1434
1435     if (TEST_REBUILD (set)) {
1436       g_mutex_lock (&set->lock);
1437 #ifndef G_OS_WIN32
1438       g_array_set_size (set->active_fds, set->fds->len);
1439       memcpy (set->active_fds->data, set->fds->data,
1440           set->fds->len * sizeof (struct pollfd));
1441 #else
1442       if (!gst_poll_prepare_winsock_active_sets (set))
1443         goto winsock_error;
1444 #endif
1445       g_mutex_unlock (&set->lock);
1446     }
1447
1448     switch (mode) {
1449       case GST_POLL_MODE_AUTO:
1450         g_assert_not_reached ();
1451         break;
1452       case GST_POLL_MODE_PPOLL:
1453       {
1454 #ifdef HAVE_PPOLL
1455         struct timespec ts;
1456         struct timespec *tsptr;
1457
1458         if (timeout != GST_CLOCK_TIME_NONE) {
1459           GST_TIME_TO_TIMESPEC (timeout, ts);
1460           tsptr = &ts;
1461         } else {
1462           tsptr = NULL;
1463         }
1464
1465         res =
1466             ppoll ((struct pollfd *) set->active_fds->data,
1467             set->active_fds->len, tsptr, NULL);
1468 #else
1469         g_assert_not_reached ();
1470         errno = ENOSYS;
1471 #endif
1472         break;
1473       }
1474       case GST_POLL_MODE_POLL:
1475       {
1476 #ifdef HAVE_POLL
1477         gint t;
1478
1479         if (timeout != GST_CLOCK_TIME_NONE) {
1480           t = GST_TIME_AS_MSECONDS (timeout);
1481         } else {
1482           t = -1;
1483         }
1484
1485         res =
1486             poll ((struct pollfd *) set->active_fds->data,
1487             set->active_fds->len, t);
1488 #else
1489         g_assert_not_reached ();
1490         errno = ENOSYS;
1491 #endif
1492         break;
1493       }
1494       case GST_POLL_MODE_PSELECT:
1495 #ifndef HAVE_PSELECT
1496       {
1497         g_assert_not_reached ();
1498         errno = ENOSYS;
1499         break;
1500       }
1501 #endif
1502       case GST_POLL_MODE_SELECT:
1503       {
1504 #ifndef G_OS_WIN32
1505         fd_set readfds;
1506         fd_set writefds;
1507         fd_set errorfds;
1508         gint max_fd;
1509
1510         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1511
1512         if (mode == GST_POLL_MODE_SELECT) {
1513           struct timeval tv;
1514           struct timeval *tvptr;
1515
1516           if (timeout != GST_CLOCK_TIME_NONE) {
1517             GST_TIME_TO_TIMEVAL (timeout, tv);
1518             tvptr = &tv;
1519           } else {
1520             tvptr = NULL;
1521           }
1522
1523           GST_DEBUG ("%p: Calling select", set);
1524           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1525           GST_DEBUG ("%p: After select, res:%d", set, res);
1526         } else {
1527 #ifdef HAVE_PSELECT
1528           struct timespec ts;
1529           struct timespec *tsptr;
1530
1531           if (timeout != GST_CLOCK_TIME_NONE) {
1532             GST_TIME_TO_TIMESPEC (timeout, ts);
1533             tsptr = &ts;
1534           } else {
1535             tsptr = NULL;
1536           }
1537
1538           GST_DEBUG ("%p: Calling pselect", set);
1539           res =
1540               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1541           GST_DEBUG ("%p: After pselect, res:%d", set, res);
1542 #endif
1543         }
1544
1545         if (res >= 0) {
1546           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1547         }
1548 #else /* G_OS_WIN32 */
1549         g_assert_not_reached ();
1550         errno = ENOSYS;
1551 #endif
1552         break;
1553       }
1554       case GST_POLL_MODE_WINDOWS:
1555       {
1556 #ifdef G_OS_WIN32
1557         gint ignore_count = set->active_fds_ignored->len;
1558         DWORD t, wait_ret;
1559
1560         if (G_LIKELY (ignore_count == 0)) {
1561           if (timeout != GST_CLOCK_TIME_NONE)
1562             t = GST_TIME_AS_MSECONDS (timeout);
1563           else
1564             t = INFINITE;
1565         } else {
1566           /* already one or more ignored fds, so we quickly sweep the others */
1567           t = 0;
1568         }
1569
1570         if (set->active_events->len != 0) {
1571           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1572               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1573         } else {
1574           wait_ret = WSA_WAIT_FAILED;
1575           WSASetLastError (WSA_INVALID_PARAMETER);
1576         }
1577
1578         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1579           res = 0;
1580         } else if (wait_ret == WSA_WAIT_FAILED) {
1581           res = -1;
1582           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1583         } else {
1584           /* the first entry is the wakeup event */
1585           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1586             res = gst_poll_collect_winsock_events (set);
1587           } else {
1588             res = 1;            /* wakeup event */
1589           }
1590         }
1591 #else
1592         g_assert_not_reached ();
1593         errno = ENOSYS;
1594 #endif
1595         break;
1596       }
1597     }
1598
1599     if (!is_timer) {
1600       /* Applications needs to clear the control socket themselves for timer
1601        * polls.
1602        * For other polls, we need to clear the control socket. If there was only
1603        * one socket with activity and it was the control socket, we need to
1604        * restart */
1605       if (release_all_wakeup (set) > 0 && res == 1)
1606         restarting = TRUE;
1607     }
1608
1609     /* we got woken up and we are flushing, we need to stop */
1610     if (G_UNLIKELY (IS_FLUSHING (set)))
1611       goto flushing;
1612
1613   } while (G_UNLIKELY (restarting));
1614
1615   DEC_WAITING (set);
1616
1617   return res;
1618
1619   /* ERRORS */
1620 already_waiting:
1621   {
1622     GST_LOG ("%p: we are already waiting", set);
1623     DEC_WAITING (set);
1624     errno = EPERM;
1625     return -1;
1626   }
1627 flushing:
1628   {
1629     GST_LOG ("%p: we are flushing", set);
1630     DEC_WAITING (set);
1631     errno = EBUSY;
1632     return -1;
1633   }
1634 #ifdef G_OS_WIN32
1635 winsock_error:
1636   {
1637     GST_LOG ("%p: winsock error", set);
1638     g_mutex_unlock (&set->lock);
1639     DEC_WAITING (set);
1640     return -1;
1641   }
1642 #endif
1643 }
1644
1645 /**
1646  * gst_poll_set_controllable:
1647  * @set: a #GstPoll.
1648  * @controllable: new controllable state.
1649  *
1650  * When @controllable is %TRUE, this function ensures that future calls to
1651  * gst_poll_wait() will be affected by gst_poll_restart() and
1652  * gst_poll_set_flushing().
1653  *
1654  * This function only works for non-timer #GstPoll objects created with
1655  * gst_poll_new().
1656  *
1657  * Returns: %TRUE if the controllability of @set could be updated.
1658  */
1659 gboolean
1660 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1661 {
1662   g_return_val_if_fail (set != NULL, FALSE);
1663   g_return_val_if_fail (!set->timer, FALSE);
1664
1665   GST_LOG ("%p: controllable : %d", set, controllable);
1666
1667   set->controllable = controllable;
1668
1669   return TRUE;
1670 }
1671
1672 /**
1673  * gst_poll_restart:
1674  * @set: a #GstPoll.
1675  *
1676  * Restart any gst_poll_wait() that is in progress. This function is typically
1677  * used after adding or removing descriptors to @set.
1678  *
1679  * If @set is not controllable, then this call will have no effect.
1680  *
1681  * This function only works for non-timer #GstPoll objects created with
1682  * gst_poll_new().
1683  */
1684 void
1685 gst_poll_restart (GstPoll * set)
1686 {
1687   g_return_if_fail (set != NULL);
1688   g_return_if_fail (!set->timer);
1689
1690   if (set->controllable && GET_WAITING (set) > 0) {
1691     /* we are controllable and waiting, wake up the waiter. The socket will be
1692      * cleared by the _wait() thread and the poll will be restarted */
1693     raise_wakeup (set);
1694   }
1695 }
1696
1697 /**
1698  * gst_poll_set_flushing:
1699  * @set: a #GstPoll.
1700  * @flushing: new flushing state.
1701  *
1702  * When @flushing is %TRUE, this function ensures that current and future calls
1703  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1704  *
1705  * Unsetting the flushing state will restore normal operation of @set.
1706  *
1707  * This function only works for non-timer #GstPoll objects created with
1708  * gst_poll_new().
1709  */
1710 void
1711 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1712 {
1713   g_return_if_fail (set != NULL);
1714   g_return_if_fail (!set->timer);
1715
1716   GST_LOG ("%p: flushing: %d", set, flushing);
1717
1718   /* update the new state first */
1719   SET_FLUSHING (set, flushing);
1720
1721   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1722     /* we are flushing, controllable and waiting, wake up the waiter. When we
1723      * stop the flushing operation we don't clear the wakeup fd here, this will
1724      * happen in the _wait() thread. */
1725     raise_wakeup (set);
1726   }
1727 }
1728
1729 /**
1730  * gst_poll_write_control:
1731  * @set: a #GstPoll.
1732  *
1733  * Write a byte to the control socket of the controllable @set.
1734  * This function is mostly useful for timer #GstPoll objects created with
1735  * gst_poll_new_timer().
1736  *
1737  * It will make any current and future gst_poll_wait() function return with
1738  * 1, meaning the control socket is set. After an equal amount of calls to
1739  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1740  * block again until their timeout expired.
1741  *
1742  * This function only works for timer #GstPoll objects created with
1743  * gst_poll_new_timer().
1744  *
1745  * Returns: %TRUE on success. %FALSE when when the byte could not be written.
1746  * errno contains the detailed error code but will never be EAGAIN, EINTR or
1747  * EWOULDBLOCK. %FALSE always signals a critical error.
1748  */
1749 gboolean
1750 gst_poll_write_control (GstPoll * set)
1751 {
1752   gboolean res;
1753
1754   g_return_val_if_fail (set != NULL, FALSE);
1755   g_return_val_if_fail (set->timer, FALSE);
1756
1757   res = raise_wakeup (set);
1758
1759   return res;
1760 }
1761
1762 /**
1763  * gst_poll_read_control:
1764  * @set: a #GstPoll.
1765  *
1766  * Read a byte from the control socket of the controllable @set.
1767  *
1768  * This function only works for timer #GstPoll objects created with
1769  * gst_poll_new_timer().
1770  *
1771  * Returns: %TRUE on success. %FALSE when when there was no byte to read or
1772  * reading the byte failed. If there was no byte to read, and only then, errno
1773  * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
1774  * critical error.
1775  */
1776 gboolean
1777 gst_poll_read_control (GstPoll * set)
1778 {
1779   gboolean res;
1780
1781   g_return_val_if_fail (set != NULL, FALSE);
1782   g_return_val_if_fail (set->timer, FALSE);
1783
1784   res = release_wakeup (set);
1785
1786   return res;
1787 }