poll: #define EWOULDBLOCK to EAGAIN if it's not defined on Windows
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancellable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 #ifdef G_OS_WIN32
88 #  ifndef EWOULDBLOCK
89 #  define EWOULDBLOCK EAGAIN    /* This is just to placate gcc */
90 #  endif
91 #endif /* G_OS_WIN32 */
92
93 /* OS/X needs this because of bad headers */
94 #include <string.h>
95
96 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
97  * so we prefer our own poll emulation.
98  */
99 #if defined(BROKEN_POLL)
100 #undef HAVE_POLL
101 #endif
102
103 #include "gstpoll.h"
104
105 #define GST_CAT_DEFAULT GST_CAT_POLL
106
107 #ifdef G_OS_WIN32
108 typedef struct _WinsockFd WinsockFd;
109
110 struct _WinsockFd
111 {
112   gint fd;
113   glong event_mask;
114   WSANETWORKEVENTS events;
115   glong ignored_event_mask;
116 };
117 #endif
118
119 typedef enum
120 {
121   GST_POLL_MODE_AUTO,
122   GST_POLL_MODE_SELECT,
123   GST_POLL_MODE_PSELECT,
124   GST_POLL_MODE_POLL,
125   GST_POLL_MODE_PPOLL,
126   GST_POLL_MODE_WINDOWS
127 } GstPollMode;
128
129 struct _GstPoll
130 {
131   GstPollMode mode;
132
133   GMutex lock;
134   /* array of fds, always written to and read from with lock */
135   GArray *fds;
136   /* array of active fds, only written to from the waiting thread with the
137    * lock and read from with the lock or without the lock from the waiting
138    * thread */
139   GArray *active_fds;
140
141 #ifndef G_OS_WIN32
142   GstPollFD control_read_fd;
143   GstPollFD control_write_fd;
144 #else
145   GArray *active_fds_ignored;
146   GArray *events;
147   GArray *active_events;
148
149   HANDLE wakeup_event;
150 #endif
151
152   gboolean controllable;
153   volatile gint waiting;
154   volatile gint control_pending;
155   volatile gint flushing;
156   gboolean timer;
157   volatile gint rebuild;
158 };
159
160 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
161     gboolean active);
162 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
163
164 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
165 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
166
167 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
168 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
169 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
170
171 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
172 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
173
174 #ifndef G_OS_WIN32
175
176 static gboolean
177 wake_event (GstPoll * set)
178 {
179   ssize_t num_written;
180   while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
181     if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
182       g_critical ("%p: failed to wake event: %s", set, strerror (errno));
183       return FALSE;
184     }
185   }
186   return TRUE;
187 }
188
189 static gboolean
190 release_event (GstPoll * set)
191 {
192   gchar buf[1] = { '\0' };
193   ssize_t num_read;
194   while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
195     if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
196       g_critical ("%p: failed to release event: %s", set, strerror (errno));
197       return FALSE;
198     }
199   }
200   return TRUE;
201 }
202
203 #else
204
205 static void
206 format_last_error (gchar * buf, size_t buf_len)
207 {
208   DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
209   LPCVOID src = NULL;
210   DWORD lang = 0;
211   DWORD id;
212   id = GetLastError ();
213   FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
214   SetLastError (id);
215 }
216
217 static gboolean
218 wake_event (GstPoll * set)
219 {
220   SetLastError (0);
221   errno = 0;
222   if (!SetEvent (set->wakeup_event)) {
223     gchar msg[1024] = "<unknown>";
224     format_last_error (msg, sizeof (msg));
225     g_critical ("%p: failed to set wakup_event: %s", set, msg);
226     errno = EBADF;
227     return FALSE;
228   }
229
230   return TRUE;
231 }
232
233 static gboolean
234 release_event (GstPoll * set)
235 {
236   DWORD status;
237   SetLastError (0);
238   errno = 0;
239   if (status = WaitForSingleObject (set->wakeup_event, INFINITE)) {
240     const gchar *reason = "unknown";
241     gchar msg[1024] = "<unknown>";
242     switch (status) {
243       case WAIT_ABANDONED:
244         reason = "WAIT_ABANDONED";
245         break;
246       case WAIT_TIMEOUT:
247         reason = "WAIT_TIMEOUT";
248         break;
249       case WAIT_FAILED:
250         format_last_error (msg, sizeof (msg));
251         reason = msg;
252         break;
253       default:
254         reason = "other";
255         break;
256     }
257     g_critical ("%p: failed to block on wakup_event: %s", set, reason);
258     errno = EBADF;
259     return FALSE;
260   }
261
262   if (!ResetEvent (set->wakeup_event)) {
263     gchar msg[1024] = "<unknown>";
264     format_last_error (msg, sizeof (msg));
265     g_critical ("%p: failed to reset wakup_event: %s", set, msg);
266     errno = EBADF;
267     return FALSE;
268   }
269
270   return TRUE;
271 }
272
273 #endif
274
275 /* the poll/select call is also performed on a control socket, that way
276  * we can send special commands to control it */
277 static inline gboolean
278 raise_wakeup (GstPoll * set)
279 {
280   gboolean result = TRUE;
281
282   /* makes testing control_pending and WAKE_EVENT() atomic. */
283   g_mutex_lock (&set->lock);
284
285   if (set->control_pending == 0) {
286     /* raise when nothing pending */
287     GST_LOG ("%p: raise", set);
288     result = wake_event (set);
289   }
290
291   if (result) {
292     set->control_pending++;
293   }
294
295   g_mutex_unlock (&set->lock);
296
297   return result;
298 }
299
300 static inline gboolean
301 release_wakeup (GstPoll * set)
302 {
303   gboolean result = FALSE;
304
305   /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
306   g_mutex_lock (&set->lock);
307
308   if (set->control_pending > 0) {
309     /* release, only if this was the last pending. */
310     if (set->control_pending == 1) {
311       GST_LOG ("%p: release", set);
312       result = release_event (set);
313     } else {
314       result = TRUE;
315     }
316
317     if (result) {
318       set->control_pending--;
319     }
320   } else {
321     errno = EWOULDBLOCK;
322   }
323
324   g_mutex_unlock (&set->lock);
325
326   return result;
327 }
328
329 static inline gint
330 release_all_wakeup (GstPoll * set)
331 {
332   gint old;
333
334   /* makes testing control_pending and RELEASE_EVENT() atomic. */
335   g_mutex_lock (&set->lock);
336
337   if ((old = set->control_pending) > 0) {
338     GST_LOG ("%p: releasing %d", set, old);
339     if (release_event (set)) {
340       set->control_pending = 0;
341     } else {
342       old = 0;
343     }
344   }
345
346   g_mutex_unlock (&set->lock);
347
348   return old;
349 }
350
351 static gint
352 find_index (GArray * array, GstPollFD * fd)
353 {
354 #ifndef G_OS_WIN32
355   struct pollfd *ifd;
356 #else
357   WinsockFd *ifd;
358 #endif
359   guint i;
360
361   /* start by assuming the index found in the fd is still valid */
362   if (fd->idx >= 0 && fd->idx < array->len) {
363 #ifndef G_OS_WIN32
364     ifd = &g_array_index (array, struct pollfd, fd->idx);
365 #else
366     ifd = &g_array_index (array, WinsockFd, fd->idx);
367 #endif
368
369     if (ifd->fd == fd->fd) {
370       return fd->idx;
371     }
372   }
373
374   /* the pollfd array has changed and we need to lookup the fd again */
375   for (i = 0; i < array->len; i++) {
376 #ifndef G_OS_WIN32
377     ifd = &g_array_index (array, struct pollfd, i);
378 #else
379     ifd = &g_array_index (array, WinsockFd, i);
380 #endif
381
382     if (ifd->fd == fd->fd) {
383       fd->idx = (gint) i;
384       return fd->idx;
385     }
386   }
387
388   fd->idx = -1;
389   return fd->idx;
390 }
391
392 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
393 /* check if all file descriptors will fit in an fd_set */
394 static gboolean
395 selectable_fds (GstPoll * set)
396 {
397   guint i;
398
399   g_mutex_lock (&set->lock);
400   for (i = 0; i < set->fds->len; i++) {
401     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
402
403     if (pfd->fd >= FD_SETSIZE)
404       goto too_many;
405   }
406   g_mutex_unlock (&set->lock);
407
408   return TRUE;
409
410 too_many:
411   {
412     g_mutex_unlock (&set->lock);
413     return FALSE;
414   }
415 }
416
417 /* check if the timeout will convert to a timeout value used for poll()
418  * without a loss of precision
419  */
420 static gboolean
421 pollable_timeout (GstClockTime timeout)
422 {
423   if (timeout == GST_CLOCK_TIME_NONE)
424     return TRUE;
425
426   /* not a nice multiple of milliseconds */
427   if (timeout % 1000000)
428     return FALSE;
429
430   return TRUE;
431 }
432 #endif
433
434 static GstPollMode
435 choose_mode (GstPoll * set, GstClockTime timeout)
436 {
437   GstPollMode mode;
438
439   if (set->mode == GST_POLL_MODE_AUTO) {
440 #ifdef HAVE_PPOLL
441     mode = GST_POLL_MODE_PPOLL;
442 #elif defined(HAVE_POLL)
443     if (!selectable_fds (set) || pollable_timeout (timeout)) {
444       mode = GST_POLL_MODE_POLL;
445     } else {
446 #ifdef HAVE_PSELECT
447       mode = GST_POLL_MODE_PSELECT;
448 #else
449       mode = GST_POLL_MODE_SELECT;
450 #endif
451     }
452 #elif defined(HAVE_PSELECT)
453     mode = GST_POLL_MODE_PSELECT;
454 #else
455     mode = GST_POLL_MODE_SELECT;
456 #endif
457   } else {
458     mode = set->mode;
459   }
460   return mode;
461 }
462
463 #ifndef G_OS_WIN32
464 static gint
465 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
466     fd_set * errorfds)
467 {
468   gint max_fd = -1;
469   guint i;
470
471   FD_ZERO (readfds);
472   FD_ZERO (writefds);
473   FD_ZERO (errorfds);
474
475   g_mutex_lock (&set->lock);
476
477   for (i = 0; i < set->active_fds->len; i++) {
478     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
479
480     if (pfd->fd < FD_SETSIZE) {
481       if (pfd->events & POLLIN)
482         FD_SET (pfd->fd, readfds);
483       if (pfd->events & POLLOUT)
484         FD_SET (pfd->fd, writefds);
485       if (pfd->events)
486         FD_SET (pfd->fd, errorfds);
487       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
488         max_fd = pfd->fd;
489     }
490   }
491
492   g_mutex_unlock (&set->lock);
493
494   return max_fd;
495 }
496
497 static void
498 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
499     fd_set * errorfds)
500 {
501   guint i;
502
503   g_mutex_lock (&set->lock);
504
505   for (i = 0; i < set->active_fds->len; i++) {
506     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
507
508     if (pfd->fd < FD_SETSIZE) {
509       pfd->revents = 0;
510       if (FD_ISSET (pfd->fd, readfds))
511         pfd->revents |= POLLIN;
512       if (FD_ISSET (pfd->fd, writefds))
513         pfd->revents |= POLLOUT;
514       if (FD_ISSET (pfd->fd, errorfds))
515         pfd->revents |= POLLERR;
516     }
517   }
518
519   g_mutex_unlock (&set->lock);
520 }
521 #else /* G_OS_WIN32 */
522 /*
523  * Translate errors thrown by the Winsock API used by GstPoll:
524  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
525  */
526 static gint
527 gst_poll_winsock_error_to_errno (DWORD last_error)
528 {
529   switch (last_error) {
530     case WSA_INVALID_HANDLE:
531     case WSAEINVAL:
532     case WSAENOTSOCK:
533       return EBADF;
534
535     case WSA_NOT_ENOUGH_MEMORY:
536       return ENOMEM;
537
538       /*
539        * Anything else, including:
540        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
541        *   WSANOTINITIALISED
542        */
543     default:
544       return EINVAL;
545   }
546 }
547
548 static void
549 gst_poll_free_winsock_event (GstPoll * set, gint idx)
550 {
551   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
552   HANDLE event = g_array_index (set->events, HANDLE, idx);
553
554   WSAEventSelect (wfd->fd, event, 0);
555   CloseHandle (event);
556 }
557
558 static void
559 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
560     gboolean active)
561 {
562   WinsockFd *wfd;
563
564   wfd = &g_array_index (set->fds, WinsockFd, idx);
565
566   if (active)
567     wfd->event_mask |= flags;
568   else
569     wfd->event_mask &= ~flags;
570
571   /* reset ignored state if the new mask doesn't overlap at all */
572   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
573     wfd->ignored_event_mask = 0;
574 }
575
576 static gboolean
577 gst_poll_prepare_winsock_active_sets (GstPoll * set)
578 {
579   guint i;
580
581   g_array_set_size (set->active_fds, 0);
582   g_array_set_size (set->active_fds_ignored, 0);
583   g_array_set_size (set->active_events, 0);
584   g_array_append_val (set->active_events, set->wakeup_event);
585
586   for (i = 0; i < set->fds->len; i++) {
587     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
588     HANDLE event = g_array_index (set->events, HANDLE, i);
589
590     if (wfd->ignored_event_mask == 0) {
591       gint ret;
592
593       g_array_append_val (set->active_fds, *wfd);
594       g_array_append_val (set->active_events, event);
595
596       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
597       if (G_UNLIKELY (ret != 0)) {
598         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
599         return FALSE;
600       }
601     } else {
602       g_array_append_val (set->active_fds_ignored, wfd);
603     }
604   }
605
606   return TRUE;
607 }
608
609 static gint
610 gst_poll_collect_winsock_events (GstPoll * set)
611 {
612   gint res, i;
613
614   /*
615    * We need to check which events are signaled, and call
616    * WSAEnumNetworkEvents for those that are, which resets
617    * the event and clears the internal network event records.
618    */
619   res = 0;
620   for (i = 0; i < set->active_fds->len; i++) {
621     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
622     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
623     DWORD wait_ret;
624
625     wait_ret = WaitForSingleObject (event, 0);
626     if (wait_ret == WAIT_OBJECT_0) {
627       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
628
629       if (G_UNLIKELY (enum_ret != 0)) {
630         res = -1;
631         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
632         break;
633       }
634
635       res++;
636     } else {
637       /* clear any previously stored result */
638       memset (&wfd->events, 0, sizeof (wfd->events));
639     }
640   }
641
642   /* If all went well we also need to reset the ignored fds. */
643   if (res >= 0) {
644     res += set->active_fds_ignored->len;
645
646     for (i = 0; i < set->active_fds_ignored->len; i++) {
647       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
648
649       wfd->ignored_event_mask = 0;
650     }
651
652     g_array_set_size (set->active_fds_ignored, 0);
653   }
654
655   return res;
656 }
657 #endif
658
659 /**
660  * gst_poll_new: (skip)
661  * @controllable: whether it should be possible to control a wait.
662  *
663  * Create a new file descriptor set. If @controllable, it
664  * is possible to restart or flush a call to gst_poll_wait() with
665  * gst_poll_restart() and gst_poll_set_flushing() respectively.
666  *
667  * Free-function: gst_poll_free
668  *
669  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
670  *     case of an error.  Free with gst_poll_free().
671  */
672 GstPoll *
673 gst_poll_new (gboolean controllable)
674 {
675   GstPoll *nset;
676
677   nset = g_slice_new0 (GstPoll);
678   GST_DEBUG ("%p: new controllable : %d", nset, controllable);
679   g_mutex_init (&nset->lock);
680 #ifndef G_OS_WIN32
681   nset->mode = GST_POLL_MODE_AUTO;
682   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
683   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
684   nset->control_read_fd.fd = -1;
685   nset->control_write_fd.fd = -1;
686   {
687     gint control_sock[2];
688
689     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
690       goto no_socket_pair;
691
692     nset->control_read_fd.fd = control_sock[0];
693     nset->control_write_fd.fd = control_sock[1];
694
695     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
696     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
697   }
698 #else
699   nset->mode = GST_POLL_MODE_WINDOWS;
700   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
701   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
702   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
703   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
704   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
705
706   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
707 #endif
708
709   /* ensure (re)build, though already sneakily set in non-windows case */
710   MARK_REBUILD (nset);
711
712   nset->controllable = controllable;
713   nset->control_pending = 0;
714
715   return nset;
716
717   /* ERRORS */
718 #ifndef G_OS_WIN32
719 no_socket_pair:
720   {
721     GST_WARNING ("%p: can't create socket pair !", nset);
722     gst_poll_free (nset);
723     return NULL;
724   }
725 #endif
726 }
727
728 /**
729  * gst_poll_new_timer: (skip)
730  *
731  * Create a new poll object that can be used for scheduling cancellable
732  * timeouts.
733  *
734  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
735  * performed from different threads. 
736  *
737  * Free-function: gst_poll_free
738  *
739  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
740  *     case of an error.  Free with gst_poll_free().
741  */
742 GstPoll *
743 gst_poll_new_timer (void)
744 {
745   GstPoll *poll;
746
747   /* make a new controllable poll set */
748   if (!(poll = gst_poll_new (TRUE)))
749     goto done;
750
751   /* we are a timer */
752   poll->timer = TRUE;
753
754 done:
755   return poll;
756 }
757
758 /**
759  * gst_poll_free:
760  * @set: (transfer full): a file descriptor set.
761  *
762  * Free a file descriptor set.
763  */
764 void
765 gst_poll_free (GstPoll * set)
766 {
767   g_return_if_fail (set != NULL);
768
769   GST_DEBUG ("%p: freeing", set);
770
771 #ifndef G_OS_WIN32
772   if (set->control_write_fd.fd >= 0)
773     close (set->control_write_fd.fd);
774   if (set->control_read_fd.fd >= 0)
775     close (set->control_read_fd.fd);
776 #else
777   CloseHandle (set->wakeup_event);
778
779   {
780     guint i;
781
782     for (i = 0; i < set->events->len; i++)
783       gst_poll_free_winsock_event (set, i);
784   }
785
786   g_array_free (set->active_events, TRUE);
787   g_array_free (set->events, TRUE);
788   g_array_free (set->active_fds_ignored, TRUE);
789 #endif
790
791   g_array_free (set->active_fds, TRUE);
792   g_array_free (set->fds, TRUE);
793   g_mutex_clear (&set->lock);
794   g_slice_free (GstPoll, set);
795 }
796
797 /**
798  * gst_poll_get_read_gpollfd:
799  * @set: a #GstPoll
800  * @fd: a #GPollFD
801  *
802  * Get a GPollFD for the reading part of the control socket. This is useful when
803  * integrating with a GSource and GMainLoop.
804  */
805 void
806 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
807 {
808   g_return_if_fail (set != NULL);
809   g_return_if_fail (fd != NULL);
810
811 #ifndef G_OS_WIN32
812   fd->fd = set->control_read_fd.fd;
813 #else
814 #if GLIB_SIZEOF_VOID_P == 8
815   fd->fd = (gint64) set->wakeup_event;
816 #else
817   fd->fd = (gint) set->wakeup_event;
818 #endif
819 #endif
820   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
821   fd->revents = 0;
822 }
823
824 /**
825  * gst_poll_fd_init:
826  * @fd: a #GstPollFD
827  *
828  * Initializes @fd. Alternatively you can initialize it with
829  * #GST_POLL_FD_INIT.
830  */
831 void
832 gst_poll_fd_init (GstPollFD * fd)
833 {
834   g_return_if_fail (fd != NULL);
835
836   fd->fd = -1;
837   fd->idx = -1;
838 }
839
840 static gboolean
841 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
842 {
843   gint idx;
844
845   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
846
847   idx = find_index (set->fds, fd);
848   if (idx < 0) {
849 #ifndef G_OS_WIN32
850     struct pollfd nfd;
851
852     nfd.fd = fd->fd;
853     nfd.events = POLLERR | POLLNVAL | POLLHUP;
854     nfd.revents = 0;
855
856     g_array_append_val (set->fds, nfd);
857
858     fd->idx = set->fds->len - 1;
859 #else
860     WinsockFd wfd;
861     HANDLE event;
862
863     wfd.fd = fd->fd;
864     wfd.event_mask = FD_CLOSE;
865     memset (&wfd.events, 0, sizeof (wfd.events));
866     wfd.ignored_event_mask = 0;
867     event = WSACreateEvent ();
868
869     g_array_append_val (set->fds, wfd);
870     g_array_append_val (set->events, event);
871
872     fd->idx = set->fds->len - 1;
873 #endif
874     MARK_REBUILD (set);
875   } else {
876     GST_WARNING ("%p: fd already added !", set);
877   }
878
879   return TRUE;
880 }
881
882 /**
883  * gst_poll_add_fd:
884  * @set: a file descriptor set.
885  * @fd: a file descriptor.
886  *
887  * Add a file descriptor to the file descriptor set.
888  *
889  * Returns: %TRUE if the file descriptor was successfully added to the set.
890  */
891 gboolean
892 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
893 {
894   gboolean ret;
895
896   g_return_val_if_fail (set != NULL, FALSE);
897   g_return_val_if_fail (fd != NULL, FALSE);
898   g_return_val_if_fail (fd->fd >= 0, FALSE);
899
900   g_mutex_lock (&set->lock);
901
902   ret = gst_poll_add_fd_unlocked (set, fd);
903
904   g_mutex_unlock (&set->lock);
905
906   return ret;
907 }
908
909 /**
910  * gst_poll_remove_fd:
911  * @set: a file descriptor set.
912  * @fd: a file descriptor.
913  *
914  * Remove a file descriptor from the file descriptor set.
915  *
916  * Returns: %TRUE if the file descriptor was successfully removed from the set.
917  */
918 gboolean
919 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
920 {
921   gint idx;
922
923   g_return_val_if_fail (set != NULL, FALSE);
924   g_return_val_if_fail (fd != NULL, FALSE);
925   g_return_val_if_fail (fd->fd >= 0, FALSE);
926
927
928   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
929
930   g_mutex_lock (&set->lock);
931
932   /* get the index, -1 is an fd that is not added */
933   idx = find_index (set->fds, fd);
934   if (idx >= 0) {
935 #ifdef G_OS_WIN32
936     gst_poll_free_winsock_event (set, idx);
937     g_array_remove_index_fast (set->events, idx);
938 #endif
939
940     /* remove the fd at index, we use _remove_index_fast, which copies the last
941      * element of the array to the freed index */
942     g_array_remove_index_fast (set->fds, idx);
943
944     /* mark fd as removed by setting the index to -1 */
945     fd->idx = -1;
946     MARK_REBUILD (set);
947   } else {
948     GST_WARNING ("%p: couldn't find fd !", set);
949   }
950
951   g_mutex_unlock (&set->lock);
952
953   return idx >= 0;
954 }
955
956 /**
957  * gst_poll_fd_ctl_write:
958  * @set: a file descriptor set.
959  * @fd: a file descriptor.
960  * @active: a new status.
961  *
962  * Control whether the descriptor @fd in @set will be monitored for
963  * writability.
964  *
965  * Returns: %TRUE if the descriptor was successfully updated.
966  */
967 gboolean
968 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
969 {
970   gint idx;
971
972   g_return_val_if_fail (set != NULL, FALSE);
973   g_return_val_if_fail (fd != NULL, FALSE);
974   g_return_val_if_fail (fd->fd >= 0, FALSE);
975
976   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
977       fd->fd, fd->idx, active);
978
979   g_mutex_lock (&set->lock);
980
981   idx = find_index (set->fds, fd);
982   if (idx >= 0) {
983 #ifndef G_OS_WIN32
984     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
985
986     if (active)
987       pfd->events |= POLLOUT;
988     else
989       pfd->events &= ~POLLOUT;
990
991     GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
992 #else
993     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
994         active);
995 #endif
996     MARK_REBUILD (set);
997   } else {
998     GST_WARNING ("%p: couldn't find fd !", set);
999   }
1000
1001   g_mutex_unlock (&set->lock);
1002
1003   return idx >= 0;
1004 }
1005
1006 static gboolean
1007 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1008 {
1009   gint idx;
1010
1011   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1012       fd->fd, fd->idx, active);
1013
1014   idx = find_index (set->fds, fd);
1015
1016   if (idx >= 0) {
1017 #ifndef G_OS_WIN32
1018     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1019
1020     if (active)
1021       pfd->events |= (POLLIN | POLLPRI);
1022     else
1023       pfd->events &= ~(POLLIN | POLLPRI);
1024 #else
1025     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1026 #endif
1027     MARK_REBUILD (set);
1028   } else {
1029     GST_WARNING ("%p: couldn't find fd !", set);
1030   }
1031
1032   return idx >= 0;
1033 }
1034
1035 /**
1036  * gst_poll_fd_ctl_read:
1037  * @set: a file descriptor set.
1038  * @fd: a file descriptor.
1039  * @active: a new status.
1040  *
1041  * Control whether the descriptor @fd in @set will be monitored for
1042  * readability.
1043  *
1044  * Returns: %TRUE if the descriptor was successfully updated.
1045  */
1046 gboolean
1047 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1048 {
1049   gboolean ret;
1050
1051   g_return_val_if_fail (set != NULL, FALSE);
1052   g_return_val_if_fail (fd != NULL, FALSE);
1053   g_return_val_if_fail (fd->fd >= 0, FALSE);
1054
1055   g_mutex_lock (&set->lock);
1056
1057   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1058
1059   g_mutex_unlock (&set->lock);
1060
1061   return ret;
1062 }
1063
1064 /**
1065  * gst_poll_fd_ignored:
1066  * @set: a file descriptor set.
1067  * @fd: a file descriptor.
1068  *
1069  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1070  * the same result for @fd as last time. This function must be called if no
1071  * operation (read/write/recv/send/etc.) will be performed on @fd before
1072  * the next call to gst_poll_wait().
1073  *
1074  * The reason why this is needed is because the underlying implementation
1075  * might not allow querying the fd more than once between calls to one of
1076  * the re-enabling operations.
1077  */
1078 void
1079 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1080 {
1081 #ifdef G_OS_WIN32
1082   gint idx;
1083
1084   g_return_if_fail (set != NULL);
1085   g_return_if_fail (fd != NULL);
1086   g_return_if_fail (fd->fd >= 0);
1087
1088   g_mutex_lock (&set->lock);
1089
1090   idx = find_index (set->fds, fd);
1091   if (idx >= 0) {
1092     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1093
1094     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1095     MARK_REBUILD (set);
1096   }
1097
1098   g_mutex_unlock (&set->lock);
1099 #endif
1100 }
1101
1102 /**
1103  * gst_poll_fd_has_closed:
1104  * @set: a file descriptor set.
1105  * @fd: a file descriptor.
1106  *
1107  * Check if @fd in @set has closed the connection.
1108  *
1109  * Returns: %TRUE if the connection was closed.
1110  */
1111 gboolean
1112 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1113 {
1114   gboolean res = FALSE;
1115   gint idx;
1116
1117   g_return_val_if_fail (set != NULL, FALSE);
1118   g_return_val_if_fail (fd != NULL, FALSE);
1119   g_return_val_if_fail (fd->fd >= 0, FALSE);
1120
1121   g_mutex_lock (&((GstPoll *) set)->lock);
1122
1123   idx = find_index (set->active_fds, fd);
1124   if (idx >= 0) {
1125 #ifndef G_OS_WIN32
1126     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1127
1128     res = (pfd->revents & POLLHUP) != 0;
1129 #else
1130     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1131
1132     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1133 #endif
1134   } else {
1135     GST_WARNING ("%p: couldn't find fd !", set);
1136   }
1137   g_mutex_unlock (&((GstPoll *) set)->lock);
1138
1139   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1140
1141   return res;
1142 }
1143
1144 /**
1145  * gst_poll_fd_has_error:
1146  * @set: a file descriptor set.
1147  * @fd: a file descriptor.
1148  *
1149  * Check if @fd in @set has an error.
1150  *
1151  * Returns: %TRUE if the descriptor has an error.
1152  */
1153 gboolean
1154 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1155 {
1156   gboolean res = FALSE;
1157   gint idx;
1158
1159   g_return_val_if_fail (set != NULL, FALSE);
1160   g_return_val_if_fail (fd != NULL, FALSE);
1161   g_return_val_if_fail (fd->fd >= 0, FALSE);
1162
1163   g_mutex_lock (&((GstPoll *) set)->lock);
1164
1165   idx = find_index (set->active_fds, fd);
1166   if (idx >= 0) {
1167 #ifndef G_OS_WIN32
1168     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1169
1170     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1171 #else
1172     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1173
1174     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1175         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1176         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1177         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1178         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1179 #endif
1180   } else {
1181     GST_WARNING ("%p: couldn't find fd !", set);
1182   }
1183   g_mutex_unlock (&((GstPoll *) set)->lock);
1184
1185   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1186
1187   return res;
1188 }
1189
1190 static gboolean
1191 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1192 {
1193   gboolean res = FALSE;
1194   gint idx;
1195
1196   idx = find_index (set->active_fds, fd);
1197   if (idx >= 0) {
1198 #ifndef G_OS_WIN32
1199     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1200
1201     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1202 #else
1203     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1204
1205     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1206 #endif
1207   } else {
1208     GST_WARNING ("%p: couldn't find fd !", set);
1209   }
1210   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1211
1212   return res;
1213 }
1214
1215 /**
1216  * gst_poll_fd_can_read:
1217  * @set: a file descriptor set.
1218  * @fd: a file descriptor.
1219  *
1220  * Check if @fd in @set has data to be read.
1221  *
1222  * Returns: %TRUE if the descriptor has data to be read.
1223  */
1224 gboolean
1225 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1226 {
1227   gboolean res = FALSE;
1228
1229   g_return_val_if_fail (set != NULL, FALSE);
1230   g_return_val_if_fail (fd != NULL, FALSE);
1231   g_return_val_if_fail (fd->fd >= 0, FALSE);
1232
1233   g_mutex_lock (&((GstPoll *) set)->lock);
1234
1235   res = gst_poll_fd_can_read_unlocked (set, fd);
1236
1237   g_mutex_unlock (&((GstPoll *) set)->lock);
1238
1239   return res;
1240 }
1241
1242 /**
1243  * gst_poll_fd_can_write:
1244  * @set: a file descriptor set.
1245  * @fd: a file descriptor.
1246  *
1247  * Check if @fd in @set can be used for writing.
1248  *
1249  * Returns: %TRUE if the descriptor can be used for writing.
1250  */
1251 gboolean
1252 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1253 {
1254   gboolean res = FALSE;
1255   gint idx;
1256
1257   g_return_val_if_fail (set != NULL, FALSE);
1258   g_return_val_if_fail (fd != NULL, FALSE);
1259   g_return_val_if_fail (fd->fd >= 0, FALSE);
1260
1261   g_mutex_lock (&((GstPoll *) set)->lock);
1262
1263   idx = find_index (set->active_fds, fd);
1264   if (idx >= 0) {
1265 #ifndef G_OS_WIN32
1266     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1267
1268     res = (pfd->revents & POLLOUT) != 0;
1269 #else
1270     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1271
1272     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1273 #endif
1274   } else {
1275     GST_WARNING ("%p: couldn't find fd !", set);
1276   }
1277   g_mutex_unlock (&((GstPoll *) set)->lock);
1278
1279   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1280
1281   return res;
1282 }
1283
1284 /**
1285  * gst_poll_wait:
1286  * @set: a #GstPoll.
1287  * @timeout: a timeout in nanoseconds.
1288  *
1289  * Wait for activity on the file descriptors in @set. This function waits up to
1290  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1291  *
1292  * For #GstPoll objects created with gst_poll_new(), this function can only be
1293  * called from a single thread at a time.  If called from multiple threads,
1294  * -1 will be returned with errno set to EPERM.
1295  *
1296  * This is not true for timer #GstPoll objects created with
1297  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1298  * simultaneously.
1299  *
1300  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1301  * activity was detected after @timeout. If an error occurs, -1 is returned
1302  * and errno is set.
1303  */
1304 gint
1305 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1306 {
1307   gboolean restarting;
1308   gboolean is_timer;
1309   int res;
1310   gint old_waiting;
1311
1312   g_return_val_if_fail (set != NULL, -1);
1313
1314   GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1315
1316   is_timer = set->timer;
1317
1318   /* add one more waiter */
1319   old_waiting = INC_WAITING (set);
1320
1321   /* we cannot wait from multiple threads unless we are a timer */
1322   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1323     goto already_waiting;
1324
1325   /* flushing, exit immediately */
1326   if (G_UNLIKELY (IS_FLUSHING (set)))
1327     goto flushing;
1328
1329   do {
1330     GstPollMode mode;
1331
1332     res = -1;
1333     restarting = FALSE;
1334
1335     mode = choose_mode (set, timeout);
1336
1337     if (TEST_REBUILD (set)) {
1338       g_mutex_lock (&set->lock);
1339 #ifndef G_OS_WIN32
1340       g_array_set_size (set->active_fds, set->fds->len);
1341       memcpy (set->active_fds->data, set->fds->data,
1342           set->fds->len * sizeof (struct pollfd));
1343 #else
1344       if (!gst_poll_prepare_winsock_active_sets (set))
1345         goto winsock_error;
1346 #endif
1347       g_mutex_unlock (&set->lock);
1348     }
1349
1350     switch (mode) {
1351       case GST_POLL_MODE_AUTO:
1352         g_assert_not_reached ();
1353         break;
1354       case GST_POLL_MODE_PPOLL:
1355       {
1356 #ifdef HAVE_PPOLL
1357         struct timespec ts;
1358         struct timespec *tsptr;
1359
1360         if (timeout != GST_CLOCK_TIME_NONE) {
1361           GST_TIME_TO_TIMESPEC (timeout, ts);
1362           tsptr = &ts;
1363         } else {
1364           tsptr = NULL;
1365         }
1366
1367         res =
1368             ppoll ((struct pollfd *) set->active_fds->data,
1369             set->active_fds->len, tsptr, NULL);
1370 #else
1371         g_assert_not_reached ();
1372         errno = ENOSYS;
1373 #endif
1374         break;
1375       }
1376       case GST_POLL_MODE_POLL:
1377       {
1378 #ifdef HAVE_POLL
1379         gint t;
1380
1381         if (timeout != GST_CLOCK_TIME_NONE) {
1382           t = GST_TIME_AS_MSECONDS (timeout);
1383         } else {
1384           t = -1;
1385         }
1386
1387         res =
1388             poll ((struct pollfd *) set->active_fds->data,
1389             set->active_fds->len, t);
1390 #else
1391         g_assert_not_reached ();
1392         errno = ENOSYS;
1393 #endif
1394         break;
1395       }
1396       case GST_POLL_MODE_PSELECT:
1397 #ifndef HAVE_PSELECT
1398       {
1399         g_assert_not_reached ();
1400         errno = ENOSYS;
1401         break;
1402       }
1403 #endif
1404       case GST_POLL_MODE_SELECT:
1405       {
1406 #ifndef G_OS_WIN32
1407         fd_set readfds;
1408         fd_set writefds;
1409         fd_set errorfds;
1410         gint max_fd;
1411
1412         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1413
1414         if (mode == GST_POLL_MODE_SELECT) {
1415           struct timeval tv;
1416           struct timeval *tvptr;
1417
1418           if (timeout != GST_CLOCK_TIME_NONE) {
1419             GST_TIME_TO_TIMEVAL (timeout, tv);
1420             tvptr = &tv;
1421           } else {
1422             tvptr = NULL;
1423           }
1424
1425           GST_DEBUG ("%p: Calling select", set);
1426           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1427           GST_DEBUG ("%p: After select, res:%d", set, res);
1428         } else {
1429 #ifdef HAVE_PSELECT
1430           struct timespec ts;
1431           struct timespec *tsptr;
1432
1433           if (timeout != GST_CLOCK_TIME_NONE) {
1434             GST_TIME_TO_TIMESPEC (timeout, ts);
1435             tsptr = &ts;
1436           } else {
1437             tsptr = NULL;
1438           }
1439
1440           GST_DEBUG ("%p: Calling pselect", set);
1441           res =
1442               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1443           GST_DEBUG ("%p: After pselect, res:%d", set, res);
1444 #endif
1445         }
1446
1447         if (res >= 0) {
1448           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1449         }
1450 #else /* G_OS_WIN32 */
1451         g_assert_not_reached ();
1452         errno = ENOSYS;
1453 #endif
1454         break;
1455       }
1456       case GST_POLL_MODE_WINDOWS:
1457       {
1458 #ifdef G_OS_WIN32
1459         gint ignore_count = set->active_fds_ignored->len;
1460         DWORD t, wait_ret;
1461
1462         if (G_LIKELY (ignore_count == 0)) {
1463           if (timeout != GST_CLOCK_TIME_NONE)
1464             t = GST_TIME_AS_MSECONDS (timeout);
1465           else
1466             t = INFINITE;
1467         } else {
1468           /* already one or more ignored fds, so we quickly sweep the others */
1469           t = 0;
1470         }
1471
1472         if (set->active_events->len != 0) {
1473           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1474               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1475         } else {
1476           wait_ret = WSA_WAIT_FAILED;
1477           WSASetLastError (WSA_INVALID_PARAMETER);
1478         }
1479
1480         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1481           res = 0;
1482         } else if (wait_ret == WSA_WAIT_FAILED) {
1483           res = -1;
1484           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1485         } else {
1486           /* the first entry is the wakeup event */
1487           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1488             res = gst_poll_collect_winsock_events (set);
1489           } else {
1490             res = 1;            /* wakeup event */
1491           }
1492         }
1493 #else
1494         g_assert_not_reached ();
1495         errno = ENOSYS;
1496 #endif
1497         break;
1498       }
1499     }
1500
1501     if (!is_timer) {
1502       /* Applications needs to clear the control socket themselves for timer
1503        * polls.
1504        * For other polls, we need to clear the control socket. If there was only
1505        * one socket with activity and it was the control socket, we need to
1506        * restart */
1507       if (release_all_wakeup (set) > 0 && res == 1)
1508         restarting = TRUE;
1509     }
1510
1511     /* we got woken up and we are flushing, we need to stop */
1512     if (G_UNLIKELY (IS_FLUSHING (set)))
1513       goto flushing;
1514
1515   } while (G_UNLIKELY (restarting));
1516
1517   DEC_WAITING (set);
1518
1519   return res;
1520
1521   /* ERRORS */
1522 already_waiting:
1523   {
1524     GST_LOG ("%p: we are already waiting", set);
1525     DEC_WAITING (set);
1526     errno = EPERM;
1527     return -1;
1528   }
1529 flushing:
1530   {
1531     GST_LOG ("%p: we are flushing", set);
1532     DEC_WAITING (set);
1533     errno = EBUSY;
1534     return -1;
1535   }
1536 #ifdef G_OS_WIN32
1537 winsock_error:
1538   {
1539     GST_LOG ("%p: winsock error", set);
1540     g_mutex_unlock (&set->lock);
1541     DEC_WAITING (set);
1542     return -1;
1543   }
1544 #endif
1545 }
1546
1547 /**
1548  * gst_poll_set_controllable:
1549  * @set: a #GstPoll.
1550  * @controllable: new controllable state.
1551  *
1552  * When @controllable is %TRUE, this function ensures that future calls to
1553  * gst_poll_wait() will be affected by gst_poll_restart() and
1554  * gst_poll_set_flushing().
1555  *
1556  * This function only works for non-timer #GstPoll objects created with
1557  * gst_poll_new().
1558  *
1559  * Returns: %TRUE if the controllability of @set could be updated.
1560  */
1561 gboolean
1562 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1563 {
1564   g_return_val_if_fail (set != NULL, FALSE);
1565   g_return_val_if_fail (!set->timer, FALSE);
1566
1567   GST_LOG ("%p: controllable : %d", set, controllable);
1568
1569   set->controllable = controllable;
1570
1571   return TRUE;
1572 }
1573
1574 /**
1575  * gst_poll_restart:
1576  * @set: a #GstPoll.
1577  *
1578  * Restart any gst_poll_wait() that is in progress. This function is typically
1579  * used after adding or removing descriptors to @set.
1580  *
1581  * If @set is not controllable, then this call will have no effect.
1582  *
1583  * This function only works for non-timer #GstPoll objects created with
1584  * gst_poll_new().
1585  */
1586 void
1587 gst_poll_restart (GstPoll * set)
1588 {
1589   g_return_if_fail (set != NULL);
1590   g_return_if_fail (!set->timer);
1591
1592   if (set->controllable && GET_WAITING (set) > 0) {
1593     /* we are controllable and waiting, wake up the waiter. The socket will be
1594      * cleared by the _wait() thread and the poll will be restarted */
1595     raise_wakeup (set);
1596   }
1597 }
1598
1599 /**
1600  * gst_poll_set_flushing:
1601  * @set: a #GstPoll.
1602  * @flushing: new flushing state.
1603  *
1604  * When @flushing is %TRUE, this function ensures that current and future calls
1605  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1606  *
1607  * Unsetting the flushing state will restore normal operation of @set.
1608  *
1609  * This function only works for non-timer #GstPoll objects created with
1610  * gst_poll_new().
1611  */
1612 void
1613 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1614 {
1615   g_return_if_fail (set != NULL);
1616   g_return_if_fail (!set->timer);
1617
1618   GST_LOG ("%p: flushing: %d", set, flushing);
1619
1620   /* update the new state first */
1621   SET_FLUSHING (set, flushing);
1622
1623   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1624     /* we are flushing, controllable and waiting, wake up the waiter. When we
1625      * stop the flushing operation we don't clear the wakeup fd here, this will
1626      * happen in the _wait() thread. */
1627     raise_wakeup (set);
1628   }
1629 }
1630
1631 /**
1632  * gst_poll_write_control:
1633  * @set: a #GstPoll.
1634  *
1635  * Write a byte to the control socket of the controllable @set.
1636  * This function is mostly useful for timer #GstPoll objects created with
1637  * gst_poll_new_timer(). 
1638  *
1639  * It will make any current and future gst_poll_wait() function return with
1640  * 1, meaning the control socket is set. After an equal amount of calls to
1641  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1642  * block again until their timeout expired.
1643  *
1644  * This function only works for timer #GstPoll objects created with
1645  * gst_poll_new_timer().
1646  *
1647  * Returns: %TRUE on success. %FALSE when when the byte could not be written.
1648  * errno contains the detailed error code but will never be EAGAIN, EINTR or
1649  * EWOULDBLOCK. %FALSE always signals a critical error.
1650  */
1651 gboolean
1652 gst_poll_write_control (GstPoll * set)
1653 {
1654   gboolean res;
1655
1656   g_return_val_if_fail (set != NULL, FALSE);
1657   g_return_val_if_fail (set->timer, FALSE);
1658
1659   res = raise_wakeup (set);
1660
1661   return res;
1662 }
1663
1664 /**
1665  * gst_poll_read_control:
1666  * @set: a #GstPoll.
1667  *
1668  * Read a byte from the control socket of the controllable @set.
1669  *
1670  * This function only works for timer #GstPoll objects created with
1671  * gst_poll_new_timer().
1672  *
1673  * Returns: %TRUE on success. %FALSE when when there was no byte to read or
1674  * reading the byte failed. If there was no byte to read, and only then, errno
1675  * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
1676  * critical error.
1677  */
1678 gboolean
1679 gst_poll_read_control (GstPoll * set)
1680 {
1681   gboolean res;
1682
1683   g_return_val_if_fail (set != NULL, FALSE);
1684   g_return_val_if_fail (set->timer, FALSE);
1685
1686   res = release_wakeup (set);
1687
1688   return res;
1689 }