b2b1b681c1c70f4ee95bea6d67b9416bdc21d31e
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancellable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 #ifdef G_OS_WIN32
88 #  ifndef EWOULDBLOCK
89 #  define EWOULDBLOCK EAGAIN    /* This is just to placate gcc */
90 #  endif
91 #endif /* G_OS_WIN32 */
92
93 /* OS/X needs this because of bad headers */
94 #include <string.h>
95
96 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
97  * so we prefer our own poll emulation.
98  */
99 #if defined(BROKEN_POLL)
100 #undef HAVE_POLL
101 #endif
102
103 #include "gstpoll.h"
104
105 #define GST_CAT_DEFAULT GST_CAT_POLL
106
107 #ifdef G_OS_WIN32
108 typedef struct _WinsockFd WinsockFd;
109
110 struct _WinsockFd
111 {
112   gint fd;
113   glong event_mask;
114   WSANETWORKEVENTS events;
115   glong ignored_event_mask;
116 };
117 #endif
118
119 typedef enum
120 {
121   GST_POLL_MODE_AUTO,
122   GST_POLL_MODE_SELECT,
123   GST_POLL_MODE_PSELECT,
124   GST_POLL_MODE_POLL,
125   GST_POLL_MODE_PPOLL,
126   GST_POLL_MODE_WINDOWS
127 } GstPollMode;
128
129 struct _GstPoll
130 {
131   GstPollMode mode;
132
133   GMutex lock;
134   /* array of fds, always written to and read from with lock */
135   GArray *fds;
136   /* array of active fds, only written to from the waiting thread with the
137    * lock and read from with the lock or without the lock from the waiting
138    * thread */
139   GArray *active_fds;
140
141 #ifndef G_OS_WIN32
142   GstPollFD control_read_fd;
143   GstPollFD control_write_fd;
144 #else
145   GArray *active_fds_ignored;
146   GArray *events;
147   GArray *active_events;
148
149   HANDLE wakeup_event;
150 #endif
151
152   gboolean controllable;
153   volatile gint waiting;
154   volatile gint control_pending;
155   volatile gint flushing;
156   gboolean timer;
157   volatile gint rebuild;
158 };
159
160 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
161     gboolean active);
162 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
163
164 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
165 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
166
167 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
168 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
169 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
170
171 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
172 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
173
174 #ifndef G_OS_WIN32
175
176 static gboolean
177 wake_event (GstPoll * set)
178 {
179   ssize_t num_written;
180   while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
181     if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
182       g_critical ("%p: failed to wake event: %s", set, strerror (errno));
183       return FALSE;
184     }
185   }
186   return TRUE;
187 }
188
189 static gboolean
190 release_event (GstPoll * set)
191 {
192   gchar buf[1] = { '\0' };
193   ssize_t num_read;
194   while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
195     if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
196       g_critical ("%p: failed to release event: %s", set, strerror (errno));
197       return FALSE;
198     }
199   }
200   return TRUE;
201 }
202
203 #else
204
205 static void
206 format_last_error (gchar * buf, size_t buf_len)
207 {
208   DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
209   LPCVOID src = NULL;
210   DWORD lang = 0;
211   DWORD id;
212   id = GetLastError ();
213   FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
214   SetLastError (id);
215 }
216
217 static gboolean
218 wake_event (GstPoll * set)
219 {
220   SetLastError (0);
221   errno = 0;
222   if (!SetEvent (set->wakeup_event)) {
223     gchar msg[1024] = "<unknown>";
224     format_last_error (msg, sizeof (msg));
225     g_critical ("%p: failed to set wakup_event: %s", set, msg);
226     errno = EBADF;
227     return FALSE;
228   }
229
230   return TRUE;
231 }
232
233 static gboolean
234 release_event (GstPoll * set)
235 {
236   DWORD status;
237   SetLastError (0);
238   errno = 0;
239
240   status = WaitForSingleObject (set->wakeup_event, INFINITE);
241   if (status) {
242     const gchar *reason = "unknown";
243     gchar msg[1024] = "<unknown>";
244     switch (status) {
245       case WAIT_ABANDONED:
246         reason = "WAIT_ABANDONED";
247         break;
248       case WAIT_TIMEOUT:
249         reason = "WAIT_TIMEOUT";
250         break;
251       case WAIT_FAILED:
252         format_last_error (msg, sizeof (msg));
253         reason = msg;
254         break;
255       default:
256         reason = "other";
257         break;
258     }
259     g_critical ("%p: failed to block on wakup_event: %s", set, reason);
260     errno = EBADF;
261     return FALSE;
262   }
263
264   if (!ResetEvent (set->wakeup_event)) {
265     gchar msg[1024] = "<unknown>";
266     format_last_error (msg, sizeof (msg));
267     g_critical ("%p: failed to reset wakup_event: %s", set, msg);
268     errno = EBADF;
269     return FALSE;
270   }
271
272   return TRUE;
273 }
274
275 #endif
276
277 /* the poll/select call is also performed on a control socket, that way
278  * we can send special commands to control it */
279 static inline gboolean
280 raise_wakeup (GstPoll * set)
281 {
282   gboolean result = TRUE;
283
284   /* makes testing control_pending and WAKE_EVENT() atomic. */
285   g_mutex_lock (&set->lock);
286
287   if (set->control_pending == 0) {
288     /* raise when nothing pending */
289     GST_LOG ("%p: raise", set);
290     result = wake_event (set);
291   }
292
293   if (result) {
294     set->control_pending++;
295   }
296
297   g_mutex_unlock (&set->lock);
298
299   return result;
300 }
301
302 static inline gboolean
303 release_wakeup (GstPoll * set)
304 {
305   gboolean result = FALSE;
306
307   /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
308   g_mutex_lock (&set->lock);
309
310   if (set->control_pending > 0) {
311     /* release, only if this was the last pending. */
312     if (set->control_pending == 1) {
313       GST_LOG ("%p: release", set);
314       result = release_event (set);
315     } else {
316       result = TRUE;
317     }
318
319     if (result) {
320       set->control_pending--;
321     }
322   } else {
323     errno = EWOULDBLOCK;
324   }
325
326   g_mutex_unlock (&set->lock);
327
328   return result;
329 }
330
331 static inline gint
332 release_all_wakeup (GstPoll * set)
333 {
334   gint old;
335
336   /* makes testing control_pending and RELEASE_EVENT() atomic. */
337   g_mutex_lock (&set->lock);
338
339   if ((old = set->control_pending) > 0) {
340     GST_LOG ("%p: releasing %d", set, old);
341     if (release_event (set)) {
342       set->control_pending = 0;
343     } else {
344       old = 0;
345     }
346   }
347
348   g_mutex_unlock (&set->lock);
349
350   return old;
351 }
352
353 static gint
354 find_index (GArray * array, GstPollFD * fd)
355 {
356 #ifndef G_OS_WIN32
357   struct pollfd *ifd;
358 #else
359   WinsockFd *ifd;
360 #endif
361   guint i;
362
363   /* start by assuming the index found in the fd is still valid */
364   if (fd->idx >= 0 && fd->idx < array->len) {
365 #ifndef G_OS_WIN32
366     ifd = &g_array_index (array, struct pollfd, fd->idx);
367 #else
368     ifd = &g_array_index (array, WinsockFd, fd->idx);
369 #endif
370
371     if (ifd->fd == fd->fd) {
372       return fd->idx;
373     }
374   }
375
376   /* the pollfd array has changed and we need to lookup the fd again */
377   for (i = 0; i < array->len; i++) {
378 #ifndef G_OS_WIN32
379     ifd = &g_array_index (array, struct pollfd, i);
380 #else
381     ifd = &g_array_index (array, WinsockFd, i);
382 #endif
383
384     if (ifd->fd == fd->fd) {
385       fd->idx = (gint) i;
386       return fd->idx;
387     }
388   }
389
390   fd->idx = -1;
391   return fd->idx;
392 }
393
394 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
395 /* check if all file descriptors will fit in an fd_set */
396 static gboolean
397 selectable_fds (GstPoll * set)
398 {
399   guint i;
400
401   g_mutex_lock (&set->lock);
402   for (i = 0; i < set->fds->len; i++) {
403     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
404
405     if (pfd->fd >= FD_SETSIZE)
406       goto too_many;
407   }
408   g_mutex_unlock (&set->lock);
409
410   return TRUE;
411
412 too_many:
413   {
414     g_mutex_unlock (&set->lock);
415     return FALSE;
416   }
417 }
418
419 /* check if the timeout will convert to a timeout value used for poll()
420  * without a loss of precision
421  */
422 static gboolean
423 pollable_timeout (GstClockTime timeout)
424 {
425   if (timeout == GST_CLOCK_TIME_NONE)
426     return TRUE;
427
428   /* not a nice multiple of milliseconds */
429   if (timeout % 1000000)
430     return FALSE;
431
432   return TRUE;
433 }
434 #endif
435
436 static GstPollMode
437 choose_mode (GstPoll * set, GstClockTime timeout)
438 {
439   GstPollMode mode;
440
441   if (set->mode == GST_POLL_MODE_AUTO) {
442 #ifdef HAVE_PPOLL
443     mode = GST_POLL_MODE_PPOLL;
444 #elif defined(HAVE_POLL)
445     if (!selectable_fds (set) || pollable_timeout (timeout)) {
446       mode = GST_POLL_MODE_POLL;
447     } else {
448 #ifdef HAVE_PSELECT
449       mode = GST_POLL_MODE_PSELECT;
450 #else
451       mode = GST_POLL_MODE_SELECT;
452 #endif
453     }
454 #elif defined(HAVE_PSELECT)
455     mode = GST_POLL_MODE_PSELECT;
456 #else
457     mode = GST_POLL_MODE_SELECT;
458 #endif
459   } else {
460     mode = set->mode;
461   }
462   return mode;
463 }
464
465 #ifndef G_OS_WIN32
466 static gint
467 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
468     fd_set * errorfds)
469 {
470   gint max_fd = -1;
471   guint i;
472
473   FD_ZERO (readfds);
474   FD_ZERO (writefds);
475   FD_ZERO (errorfds);
476
477   g_mutex_lock (&set->lock);
478
479   for (i = 0; i < set->active_fds->len; i++) {
480     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
481
482     if (pfd->fd < FD_SETSIZE) {
483       if (pfd->events & POLLIN)
484         FD_SET (pfd->fd, readfds);
485       if (pfd->events & POLLOUT)
486         FD_SET (pfd->fd, writefds);
487       if (pfd->events)
488         FD_SET (pfd->fd, errorfds);
489       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
490         max_fd = pfd->fd;
491     }
492   }
493
494   g_mutex_unlock (&set->lock);
495
496   return max_fd;
497 }
498
499 static void
500 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
501     fd_set * errorfds)
502 {
503   guint i;
504
505   g_mutex_lock (&set->lock);
506
507   for (i = 0; i < set->active_fds->len; i++) {
508     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
509
510     if (pfd->fd < FD_SETSIZE) {
511       pfd->revents = 0;
512       if (FD_ISSET (pfd->fd, readfds))
513         pfd->revents |= POLLIN;
514       if (FD_ISSET (pfd->fd, writefds))
515         pfd->revents |= POLLOUT;
516       if (FD_ISSET (pfd->fd, errorfds))
517         pfd->revents |= POLLERR;
518     }
519   }
520
521   g_mutex_unlock (&set->lock);
522 }
523 #else /* G_OS_WIN32 */
524 /*
525  * Translate errors thrown by the Winsock API used by GstPoll:
526  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
527  */
528 static gint
529 gst_poll_winsock_error_to_errno (DWORD last_error)
530 {
531   switch (last_error) {
532     case WSA_INVALID_HANDLE:
533     case WSAEINVAL:
534     case WSAENOTSOCK:
535       return EBADF;
536
537     case WSA_NOT_ENOUGH_MEMORY:
538       return ENOMEM;
539
540       /*
541        * Anything else, including:
542        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
543        *   WSANOTINITIALISED
544        */
545     default:
546       return EINVAL;
547   }
548 }
549
550 static void
551 gst_poll_free_winsock_event (GstPoll * set, gint idx)
552 {
553   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
554   HANDLE event = g_array_index (set->events, HANDLE, idx);
555
556   WSAEventSelect (wfd->fd, event, 0);
557   CloseHandle (event);
558 }
559
560 static void
561 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
562     gboolean active)
563 {
564   WinsockFd *wfd;
565
566   wfd = &g_array_index (set->fds, WinsockFd, idx);
567
568   if (active)
569     wfd->event_mask |= flags;
570   else
571     wfd->event_mask &= ~flags;
572
573   /* reset ignored state if the new mask doesn't overlap at all */
574   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
575     wfd->ignored_event_mask = 0;
576 }
577
578 static gboolean
579 gst_poll_prepare_winsock_active_sets (GstPoll * set)
580 {
581   guint i;
582
583   g_array_set_size (set->active_fds, 0);
584   g_array_set_size (set->active_fds_ignored, 0);
585   g_array_set_size (set->active_events, 0);
586   g_array_append_val (set->active_events, set->wakeup_event);
587
588   for (i = 0; i < set->fds->len; i++) {
589     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
590     HANDLE event = g_array_index (set->events, HANDLE, i);
591
592     if (wfd->ignored_event_mask == 0) {
593       gint ret;
594
595       g_array_append_val (set->active_fds, *wfd);
596       g_array_append_val (set->active_events, event);
597
598       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
599       if (G_UNLIKELY (ret != 0)) {
600         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
601         return FALSE;
602       }
603     } else {
604       g_array_append_val (set->active_fds_ignored, wfd);
605     }
606   }
607
608   return TRUE;
609 }
610
611 static gint
612 gst_poll_collect_winsock_events (GstPoll * set)
613 {
614   gint res, i;
615
616   /*
617    * We need to check which events are signaled, and call
618    * WSAEnumNetworkEvents for those that are, which resets
619    * the event and clears the internal network event records.
620    */
621   res = 0;
622   for (i = 0; i < set->active_fds->len; i++) {
623     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
624     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
625     DWORD wait_ret;
626
627     wait_ret = WaitForSingleObject (event, 0);
628     if (wait_ret == WAIT_OBJECT_0) {
629       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
630
631       if (G_UNLIKELY (enum_ret != 0)) {
632         res = -1;
633         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
634         break;
635       }
636
637       res++;
638     } else {
639       /* clear any previously stored result */
640       memset (&wfd->events, 0, sizeof (wfd->events));
641     }
642   }
643
644   /* If all went well we also need to reset the ignored fds. */
645   if (res >= 0) {
646     res += set->active_fds_ignored->len;
647
648     for (i = 0; i < set->active_fds_ignored->len; i++) {
649       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
650
651       wfd->ignored_event_mask = 0;
652     }
653
654     g_array_set_size (set->active_fds_ignored, 0);
655   }
656
657   return res;
658 }
659 #endif
660
661 /**
662  * gst_poll_new: (skip)
663  * @controllable: whether it should be possible to control a wait.
664  *
665  * Create a new file descriptor set. If @controllable, it
666  * is possible to restart or flush a call to gst_poll_wait() with
667  * gst_poll_restart() and gst_poll_set_flushing() respectively.
668  *
669  * Free-function: gst_poll_free
670  *
671  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
672  *     case of an error.  Free with gst_poll_free().
673  */
674 GstPoll *
675 gst_poll_new (gboolean controllable)
676 {
677   GstPoll *nset;
678
679   nset = g_slice_new0 (GstPoll);
680   GST_DEBUG ("%p: new controllable : %d", nset, controllable);
681   g_mutex_init (&nset->lock);
682 #ifndef G_OS_WIN32
683   nset->mode = GST_POLL_MODE_AUTO;
684   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
685   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
686   nset->control_read_fd.fd = -1;
687   nset->control_write_fd.fd = -1;
688   {
689     gint control_sock[2];
690
691     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
692       goto no_socket_pair;
693
694     nset->control_read_fd.fd = control_sock[0];
695     nset->control_write_fd.fd = control_sock[1];
696
697     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
698     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
699   }
700 #else
701   nset->mode = GST_POLL_MODE_WINDOWS;
702   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
703   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
704   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
705   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
706   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
707
708   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
709 #endif
710
711   /* ensure (re)build, though already sneakily set in non-windows case */
712   MARK_REBUILD (nset);
713
714   nset->controllable = controllable;
715   nset->control_pending = 0;
716
717   return nset;
718
719   /* ERRORS */
720 #ifndef G_OS_WIN32
721 no_socket_pair:
722   {
723     GST_WARNING ("%p: can't create socket pair !", nset);
724     gst_poll_free (nset);
725     return NULL;
726   }
727 #endif
728 }
729
730 /**
731  * gst_poll_new_timer: (skip)
732  *
733  * Create a new poll object that can be used for scheduling cancellable
734  * timeouts.
735  *
736  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
737  * performed from different threads. 
738  *
739  * Free-function: gst_poll_free
740  *
741  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
742  *     case of an error.  Free with gst_poll_free().
743  */
744 GstPoll *
745 gst_poll_new_timer (void)
746 {
747   GstPoll *poll;
748
749   /* make a new controllable poll set */
750   if (!(poll = gst_poll_new (TRUE)))
751     goto done;
752
753   /* we are a timer */
754   poll->timer = TRUE;
755
756 done:
757   return poll;
758 }
759
760 /**
761  * gst_poll_free:
762  * @set: (transfer full): a file descriptor set.
763  *
764  * Free a file descriptor set.
765  */
766 void
767 gst_poll_free (GstPoll * set)
768 {
769   g_return_if_fail (set != NULL);
770
771   GST_DEBUG ("%p: freeing", set);
772
773 #ifndef G_OS_WIN32
774   if (set->control_write_fd.fd >= 0)
775     close (set->control_write_fd.fd);
776   if (set->control_read_fd.fd >= 0)
777     close (set->control_read_fd.fd);
778 #else
779   CloseHandle (set->wakeup_event);
780
781   {
782     guint i;
783
784     for (i = 0; i < set->events->len; i++)
785       gst_poll_free_winsock_event (set, i);
786   }
787
788   g_array_free (set->active_events, TRUE);
789   g_array_free (set->events, TRUE);
790   g_array_free (set->active_fds_ignored, TRUE);
791 #endif
792
793   g_array_free (set->active_fds, TRUE);
794   g_array_free (set->fds, TRUE);
795   g_mutex_clear (&set->lock);
796   g_slice_free (GstPoll, set);
797 }
798
799 /**
800  * gst_poll_get_read_gpollfd:
801  * @set: a #GstPoll
802  * @fd: a #GPollFD
803  *
804  * Get a GPollFD for the reading part of the control socket. This is useful when
805  * integrating with a GSource and GMainLoop.
806  */
807 void
808 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
809 {
810   g_return_if_fail (set != NULL);
811   g_return_if_fail (fd != NULL);
812
813 #ifndef G_OS_WIN32
814   fd->fd = set->control_read_fd.fd;
815 #else
816 #if GLIB_SIZEOF_VOID_P == 8
817   fd->fd = (gint64) set->wakeup_event;
818 #else
819   fd->fd = (gint) set->wakeup_event;
820 #endif
821 #endif
822   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
823   fd->revents = 0;
824 }
825
826 /**
827  * gst_poll_fd_init:
828  * @fd: a #GstPollFD
829  *
830  * Initializes @fd. Alternatively you can initialize it with
831  * #GST_POLL_FD_INIT.
832  */
833 void
834 gst_poll_fd_init (GstPollFD * fd)
835 {
836   g_return_if_fail (fd != NULL);
837
838   fd->fd = -1;
839   fd->idx = -1;
840 }
841
842 static gboolean
843 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
844 {
845   gint idx;
846
847   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
848
849   idx = find_index (set->fds, fd);
850   if (idx < 0) {
851 #ifndef G_OS_WIN32
852     struct pollfd nfd;
853
854     nfd.fd = fd->fd;
855     nfd.events = POLLERR | POLLNVAL | POLLHUP;
856     nfd.revents = 0;
857
858     g_array_append_val (set->fds, nfd);
859
860     fd->idx = set->fds->len - 1;
861 #else
862     WinsockFd wfd;
863     HANDLE event;
864
865     wfd.fd = fd->fd;
866     wfd.event_mask = FD_CLOSE;
867     memset (&wfd.events, 0, sizeof (wfd.events));
868     wfd.ignored_event_mask = 0;
869     event = WSACreateEvent ();
870
871     g_array_append_val (set->fds, wfd);
872     g_array_append_val (set->events, event);
873
874     fd->idx = set->fds->len - 1;
875 #endif
876     MARK_REBUILD (set);
877   } else {
878     GST_WARNING ("%p: fd already added !", set);
879   }
880
881   return TRUE;
882 }
883
884 /**
885  * gst_poll_add_fd:
886  * @set: a file descriptor set.
887  * @fd: a file descriptor.
888  *
889  * Add a file descriptor to the file descriptor set.
890  *
891  * Returns: %TRUE if the file descriptor was successfully added to the set.
892  */
893 gboolean
894 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
895 {
896   gboolean ret;
897
898   g_return_val_if_fail (set != NULL, FALSE);
899   g_return_val_if_fail (fd != NULL, FALSE);
900   g_return_val_if_fail (fd->fd >= 0, FALSE);
901
902   g_mutex_lock (&set->lock);
903
904   ret = gst_poll_add_fd_unlocked (set, fd);
905
906   g_mutex_unlock (&set->lock);
907
908   return ret;
909 }
910
911 /**
912  * gst_poll_remove_fd:
913  * @set: a file descriptor set.
914  * @fd: a file descriptor.
915  *
916  * Remove a file descriptor from the file descriptor set.
917  *
918  * Returns: %TRUE if the file descriptor was successfully removed from the set.
919  */
920 gboolean
921 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
922 {
923   gint idx;
924
925   g_return_val_if_fail (set != NULL, FALSE);
926   g_return_val_if_fail (fd != NULL, FALSE);
927   g_return_val_if_fail (fd->fd >= 0, FALSE);
928
929
930   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
931
932   g_mutex_lock (&set->lock);
933
934   /* get the index, -1 is an fd that is not added */
935   idx = find_index (set->fds, fd);
936   if (idx >= 0) {
937 #ifdef G_OS_WIN32
938     gst_poll_free_winsock_event (set, idx);
939     g_array_remove_index_fast (set->events, idx);
940 #endif
941
942     /* remove the fd at index, we use _remove_index_fast, which copies the last
943      * element of the array to the freed index */
944     g_array_remove_index_fast (set->fds, idx);
945
946     /* mark fd as removed by setting the index to -1 */
947     fd->idx = -1;
948     MARK_REBUILD (set);
949   } else {
950     GST_WARNING ("%p: couldn't find fd !", set);
951   }
952
953   g_mutex_unlock (&set->lock);
954
955   return idx >= 0;
956 }
957
958 /**
959  * gst_poll_fd_ctl_write:
960  * @set: a file descriptor set.
961  * @fd: a file descriptor.
962  * @active: a new status.
963  *
964  * Control whether the descriptor @fd in @set will be monitored for
965  * writability.
966  *
967  * Returns: %TRUE if the descriptor was successfully updated.
968  */
969 gboolean
970 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
971 {
972   gint idx;
973
974   g_return_val_if_fail (set != NULL, FALSE);
975   g_return_val_if_fail (fd != NULL, FALSE);
976   g_return_val_if_fail (fd->fd >= 0, FALSE);
977
978   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
979       fd->fd, fd->idx, active);
980
981   g_mutex_lock (&set->lock);
982
983   idx = find_index (set->fds, fd);
984   if (idx >= 0) {
985 #ifndef G_OS_WIN32
986     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
987
988     if (active)
989       pfd->events |= POLLOUT;
990     else
991       pfd->events &= ~POLLOUT;
992
993     GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
994 #else
995     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
996         active);
997 #endif
998     MARK_REBUILD (set);
999   } else {
1000     GST_WARNING ("%p: couldn't find fd !", set);
1001   }
1002
1003   g_mutex_unlock (&set->lock);
1004
1005   return idx >= 0;
1006 }
1007
1008 static gboolean
1009 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1010 {
1011   gint idx;
1012
1013   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1014       fd->fd, fd->idx, active);
1015
1016   idx = find_index (set->fds, fd);
1017
1018   if (idx >= 0) {
1019 #ifndef G_OS_WIN32
1020     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1021
1022     if (active)
1023       pfd->events |= (POLLIN | POLLPRI);
1024     else
1025       pfd->events &= ~(POLLIN | POLLPRI);
1026 #else
1027     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1028 #endif
1029     MARK_REBUILD (set);
1030   } else {
1031     GST_WARNING ("%p: couldn't find fd !", set);
1032   }
1033
1034   return idx >= 0;
1035 }
1036
1037 /**
1038  * gst_poll_fd_ctl_read:
1039  * @set: a file descriptor set.
1040  * @fd: a file descriptor.
1041  * @active: a new status.
1042  *
1043  * Control whether the descriptor @fd in @set will be monitored for
1044  * readability.
1045  *
1046  * Returns: %TRUE if the descriptor was successfully updated.
1047  */
1048 gboolean
1049 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1050 {
1051   gboolean ret;
1052
1053   g_return_val_if_fail (set != NULL, FALSE);
1054   g_return_val_if_fail (fd != NULL, FALSE);
1055   g_return_val_if_fail (fd->fd >= 0, FALSE);
1056
1057   g_mutex_lock (&set->lock);
1058
1059   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1060
1061   g_mutex_unlock (&set->lock);
1062
1063   return ret;
1064 }
1065
1066 /**
1067  * gst_poll_fd_ignored:
1068  * @set: a file descriptor set.
1069  * @fd: a file descriptor.
1070  *
1071  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1072  * the same result for @fd as last time. This function must be called if no
1073  * operation (read/write/recv/send/etc.) will be performed on @fd before
1074  * the next call to gst_poll_wait().
1075  *
1076  * The reason why this is needed is because the underlying implementation
1077  * might not allow querying the fd more than once between calls to one of
1078  * the re-enabling operations.
1079  */
1080 void
1081 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1082 {
1083 #ifdef G_OS_WIN32
1084   gint idx;
1085
1086   g_return_if_fail (set != NULL);
1087   g_return_if_fail (fd != NULL);
1088   g_return_if_fail (fd->fd >= 0);
1089
1090   g_mutex_lock (&set->lock);
1091
1092   idx = find_index (set->fds, fd);
1093   if (idx >= 0) {
1094     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1095
1096     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1097     MARK_REBUILD (set);
1098   }
1099
1100   g_mutex_unlock (&set->lock);
1101 #endif
1102 }
1103
1104 /**
1105  * gst_poll_fd_has_closed:
1106  * @set: a file descriptor set.
1107  * @fd: a file descriptor.
1108  *
1109  * Check if @fd in @set has closed the connection.
1110  *
1111  * Returns: %TRUE if the connection was closed.
1112  */
1113 gboolean
1114 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1115 {
1116   gboolean res = FALSE;
1117   gint idx;
1118
1119   g_return_val_if_fail (set != NULL, FALSE);
1120   g_return_val_if_fail (fd != NULL, FALSE);
1121   g_return_val_if_fail (fd->fd >= 0, FALSE);
1122
1123   g_mutex_lock (&((GstPoll *) set)->lock);
1124
1125   idx = find_index (set->active_fds, fd);
1126   if (idx >= 0) {
1127 #ifndef G_OS_WIN32
1128     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1129
1130     res = (pfd->revents & POLLHUP) != 0;
1131 #else
1132     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1133
1134     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1135 #endif
1136   } else {
1137     GST_WARNING ("%p: couldn't find fd !", set);
1138   }
1139   g_mutex_unlock (&((GstPoll *) set)->lock);
1140
1141   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1142
1143   return res;
1144 }
1145
1146 /**
1147  * gst_poll_fd_has_error:
1148  * @set: a file descriptor set.
1149  * @fd: a file descriptor.
1150  *
1151  * Check if @fd in @set has an error.
1152  *
1153  * Returns: %TRUE if the descriptor has an error.
1154  */
1155 gboolean
1156 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1157 {
1158   gboolean res = FALSE;
1159   gint idx;
1160
1161   g_return_val_if_fail (set != NULL, FALSE);
1162   g_return_val_if_fail (fd != NULL, FALSE);
1163   g_return_val_if_fail (fd->fd >= 0, FALSE);
1164
1165   g_mutex_lock (&((GstPoll *) set)->lock);
1166
1167   idx = find_index (set->active_fds, fd);
1168   if (idx >= 0) {
1169 #ifndef G_OS_WIN32
1170     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1171
1172     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1173 #else
1174     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1175
1176     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1177         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1178         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1179         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1180         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1181 #endif
1182   } else {
1183     GST_WARNING ("%p: couldn't find fd !", set);
1184   }
1185   g_mutex_unlock (&((GstPoll *) set)->lock);
1186
1187   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1188
1189   return res;
1190 }
1191
1192 static gboolean
1193 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1194 {
1195   gboolean res = FALSE;
1196   gint idx;
1197
1198   idx = find_index (set->active_fds, fd);
1199   if (idx >= 0) {
1200 #ifndef G_OS_WIN32
1201     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1202
1203     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1204 #else
1205     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1206
1207     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1208 #endif
1209   } else {
1210     GST_WARNING ("%p: couldn't find fd !", set);
1211   }
1212   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1213
1214   return res;
1215 }
1216
1217 /**
1218  * gst_poll_fd_can_read:
1219  * @set: a file descriptor set.
1220  * @fd: a file descriptor.
1221  *
1222  * Check if @fd in @set has data to be read.
1223  *
1224  * Returns: %TRUE if the descriptor has data to be read.
1225  */
1226 gboolean
1227 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1228 {
1229   gboolean res = FALSE;
1230
1231   g_return_val_if_fail (set != NULL, FALSE);
1232   g_return_val_if_fail (fd != NULL, FALSE);
1233   g_return_val_if_fail (fd->fd >= 0, FALSE);
1234
1235   g_mutex_lock (&((GstPoll *) set)->lock);
1236
1237   res = gst_poll_fd_can_read_unlocked (set, fd);
1238
1239   g_mutex_unlock (&((GstPoll *) set)->lock);
1240
1241   return res;
1242 }
1243
1244 /**
1245  * gst_poll_fd_can_write:
1246  * @set: a file descriptor set.
1247  * @fd: a file descriptor.
1248  *
1249  * Check if @fd in @set can be used for writing.
1250  *
1251  * Returns: %TRUE if the descriptor can be used for writing.
1252  */
1253 gboolean
1254 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1255 {
1256   gboolean res = FALSE;
1257   gint idx;
1258
1259   g_return_val_if_fail (set != NULL, FALSE);
1260   g_return_val_if_fail (fd != NULL, FALSE);
1261   g_return_val_if_fail (fd->fd >= 0, FALSE);
1262
1263   g_mutex_lock (&((GstPoll *) set)->lock);
1264
1265   idx = find_index (set->active_fds, fd);
1266   if (idx >= 0) {
1267 #ifndef G_OS_WIN32
1268     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1269
1270     res = (pfd->revents & POLLOUT) != 0;
1271 #else
1272     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1273
1274     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1275 #endif
1276   } else {
1277     GST_WARNING ("%p: couldn't find fd !", set);
1278   }
1279   g_mutex_unlock (&((GstPoll *) set)->lock);
1280
1281   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1282
1283   return res;
1284 }
1285
1286 /**
1287  * gst_poll_wait:
1288  * @set: a #GstPoll.
1289  * @timeout: a timeout in nanoseconds.
1290  *
1291  * Wait for activity on the file descriptors in @set. This function waits up to
1292  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1293  *
1294  * For #GstPoll objects created with gst_poll_new(), this function can only be
1295  * called from a single thread at a time.  If called from multiple threads,
1296  * -1 will be returned with errno set to EPERM.
1297  *
1298  * This is not true for timer #GstPoll objects created with
1299  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1300  * simultaneously.
1301  *
1302  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1303  * activity was detected after @timeout. If an error occurs, -1 is returned
1304  * and errno is set.
1305  */
1306 gint
1307 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1308 {
1309   gboolean restarting;
1310   gboolean is_timer;
1311   int res;
1312   gint old_waiting;
1313
1314   g_return_val_if_fail (set != NULL, -1);
1315
1316   GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1317
1318   is_timer = set->timer;
1319
1320   /* add one more waiter */
1321   old_waiting = INC_WAITING (set);
1322
1323   /* we cannot wait from multiple threads unless we are a timer */
1324   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1325     goto already_waiting;
1326
1327   /* flushing, exit immediately */
1328   if (G_UNLIKELY (IS_FLUSHING (set)))
1329     goto flushing;
1330
1331   do {
1332     GstPollMode mode;
1333
1334     res = -1;
1335     restarting = FALSE;
1336
1337     mode = choose_mode (set, timeout);
1338
1339     if (TEST_REBUILD (set)) {
1340       g_mutex_lock (&set->lock);
1341 #ifndef G_OS_WIN32
1342       g_array_set_size (set->active_fds, set->fds->len);
1343       memcpy (set->active_fds->data, set->fds->data,
1344           set->fds->len * sizeof (struct pollfd));
1345 #else
1346       if (!gst_poll_prepare_winsock_active_sets (set))
1347         goto winsock_error;
1348 #endif
1349       g_mutex_unlock (&set->lock);
1350     }
1351
1352     switch (mode) {
1353       case GST_POLL_MODE_AUTO:
1354         g_assert_not_reached ();
1355         break;
1356       case GST_POLL_MODE_PPOLL:
1357       {
1358 #ifdef HAVE_PPOLL
1359         struct timespec ts;
1360         struct timespec *tsptr;
1361
1362         if (timeout != GST_CLOCK_TIME_NONE) {
1363           GST_TIME_TO_TIMESPEC (timeout, ts);
1364           tsptr = &ts;
1365         } else {
1366           tsptr = NULL;
1367         }
1368
1369         res =
1370             ppoll ((struct pollfd *) set->active_fds->data,
1371             set->active_fds->len, tsptr, NULL);
1372 #else
1373         g_assert_not_reached ();
1374         errno = ENOSYS;
1375 #endif
1376         break;
1377       }
1378       case GST_POLL_MODE_POLL:
1379       {
1380 #ifdef HAVE_POLL
1381         gint t;
1382
1383         if (timeout != GST_CLOCK_TIME_NONE) {
1384           t = GST_TIME_AS_MSECONDS (timeout);
1385         } else {
1386           t = -1;
1387         }
1388
1389         res =
1390             poll ((struct pollfd *) set->active_fds->data,
1391             set->active_fds->len, t);
1392 #else
1393         g_assert_not_reached ();
1394         errno = ENOSYS;
1395 #endif
1396         break;
1397       }
1398       case GST_POLL_MODE_PSELECT:
1399 #ifndef HAVE_PSELECT
1400       {
1401         g_assert_not_reached ();
1402         errno = ENOSYS;
1403         break;
1404       }
1405 #endif
1406       case GST_POLL_MODE_SELECT:
1407       {
1408 #ifndef G_OS_WIN32
1409         fd_set readfds;
1410         fd_set writefds;
1411         fd_set errorfds;
1412         gint max_fd;
1413
1414         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1415
1416         if (mode == GST_POLL_MODE_SELECT) {
1417           struct timeval tv;
1418           struct timeval *tvptr;
1419
1420           if (timeout != GST_CLOCK_TIME_NONE) {
1421             GST_TIME_TO_TIMEVAL (timeout, tv);
1422             tvptr = &tv;
1423           } else {
1424             tvptr = NULL;
1425           }
1426
1427           GST_DEBUG ("%p: Calling select", set);
1428           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1429           GST_DEBUG ("%p: After select, res:%d", set, res);
1430         } else {
1431 #ifdef HAVE_PSELECT
1432           struct timespec ts;
1433           struct timespec *tsptr;
1434
1435           if (timeout != GST_CLOCK_TIME_NONE) {
1436             GST_TIME_TO_TIMESPEC (timeout, ts);
1437             tsptr = &ts;
1438           } else {
1439             tsptr = NULL;
1440           }
1441
1442           GST_DEBUG ("%p: Calling pselect", set);
1443           res =
1444               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1445           GST_DEBUG ("%p: After pselect, res:%d", set, res);
1446 #endif
1447         }
1448
1449         if (res >= 0) {
1450           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1451         }
1452 #else /* G_OS_WIN32 */
1453         g_assert_not_reached ();
1454         errno = ENOSYS;
1455 #endif
1456         break;
1457       }
1458       case GST_POLL_MODE_WINDOWS:
1459       {
1460 #ifdef G_OS_WIN32
1461         gint ignore_count = set->active_fds_ignored->len;
1462         DWORD t, wait_ret;
1463
1464         if (G_LIKELY (ignore_count == 0)) {
1465           if (timeout != GST_CLOCK_TIME_NONE)
1466             t = GST_TIME_AS_MSECONDS (timeout);
1467           else
1468             t = INFINITE;
1469         } else {
1470           /* already one or more ignored fds, so we quickly sweep the others */
1471           t = 0;
1472         }
1473
1474         if (set->active_events->len != 0) {
1475           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1476               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1477         } else {
1478           wait_ret = WSA_WAIT_FAILED;
1479           WSASetLastError (WSA_INVALID_PARAMETER);
1480         }
1481
1482         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1483           res = 0;
1484         } else if (wait_ret == WSA_WAIT_FAILED) {
1485           res = -1;
1486           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1487         } else {
1488           /* the first entry is the wakeup event */
1489           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1490             res = gst_poll_collect_winsock_events (set);
1491           } else {
1492             res = 1;            /* wakeup event */
1493           }
1494         }
1495 #else
1496         g_assert_not_reached ();
1497         errno = ENOSYS;
1498 #endif
1499         break;
1500       }
1501     }
1502
1503     if (!is_timer) {
1504       /* Applications needs to clear the control socket themselves for timer
1505        * polls.
1506        * For other polls, we need to clear the control socket. If there was only
1507        * one socket with activity and it was the control socket, we need to
1508        * restart */
1509       if (release_all_wakeup (set) > 0 && res == 1)
1510         restarting = TRUE;
1511     }
1512
1513     /* we got woken up and we are flushing, we need to stop */
1514     if (G_UNLIKELY (IS_FLUSHING (set)))
1515       goto flushing;
1516
1517   } while (G_UNLIKELY (restarting));
1518
1519   DEC_WAITING (set);
1520
1521   return res;
1522
1523   /* ERRORS */
1524 already_waiting:
1525   {
1526     GST_LOG ("%p: we are already waiting", set);
1527     DEC_WAITING (set);
1528     errno = EPERM;
1529     return -1;
1530   }
1531 flushing:
1532   {
1533     GST_LOG ("%p: we are flushing", set);
1534     DEC_WAITING (set);
1535     errno = EBUSY;
1536     return -1;
1537   }
1538 #ifdef G_OS_WIN32
1539 winsock_error:
1540   {
1541     GST_LOG ("%p: winsock error", set);
1542     g_mutex_unlock (&set->lock);
1543     DEC_WAITING (set);
1544     return -1;
1545   }
1546 #endif
1547 }
1548
1549 /**
1550  * gst_poll_set_controllable:
1551  * @set: a #GstPoll.
1552  * @controllable: new controllable state.
1553  *
1554  * When @controllable is %TRUE, this function ensures that future calls to
1555  * gst_poll_wait() will be affected by gst_poll_restart() and
1556  * gst_poll_set_flushing().
1557  *
1558  * This function only works for non-timer #GstPoll objects created with
1559  * gst_poll_new().
1560  *
1561  * Returns: %TRUE if the controllability of @set could be updated.
1562  */
1563 gboolean
1564 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1565 {
1566   g_return_val_if_fail (set != NULL, FALSE);
1567   g_return_val_if_fail (!set->timer, FALSE);
1568
1569   GST_LOG ("%p: controllable : %d", set, controllable);
1570
1571   set->controllable = controllable;
1572
1573   return TRUE;
1574 }
1575
1576 /**
1577  * gst_poll_restart:
1578  * @set: a #GstPoll.
1579  *
1580  * Restart any gst_poll_wait() that is in progress. This function is typically
1581  * used after adding or removing descriptors to @set.
1582  *
1583  * If @set is not controllable, then this call will have no effect.
1584  *
1585  * This function only works for non-timer #GstPoll objects created with
1586  * gst_poll_new().
1587  */
1588 void
1589 gst_poll_restart (GstPoll * set)
1590 {
1591   g_return_if_fail (set != NULL);
1592   g_return_if_fail (!set->timer);
1593
1594   if (set->controllable && GET_WAITING (set) > 0) {
1595     /* we are controllable and waiting, wake up the waiter. The socket will be
1596      * cleared by the _wait() thread and the poll will be restarted */
1597     raise_wakeup (set);
1598   }
1599 }
1600
1601 /**
1602  * gst_poll_set_flushing:
1603  * @set: a #GstPoll.
1604  * @flushing: new flushing state.
1605  *
1606  * When @flushing is %TRUE, this function ensures that current and future calls
1607  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1608  *
1609  * Unsetting the flushing state will restore normal operation of @set.
1610  *
1611  * This function only works for non-timer #GstPoll objects created with
1612  * gst_poll_new().
1613  */
1614 void
1615 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1616 {
1617   g_return_if_fail (set != NULL);
1618   g_return_if_fail (!set->timer);
1619
1620   GST_LOG ("%p: flushing: %d", set, flushing);
1621
1622   /* update the new state first */
1623   SET_FLUSHING (set, flushing);
1624
1625   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1626     /* we are flushing, controllable and waiting, wake up the waiter. When we
1627      * stop the flushing operation we don't clear the wakeup fd here, this will
1628      * happen in the _wait() thread. */
1629     raise_wakeup (set);
1630   }
1631 }
1632
1633 /**
1634  * gst_poll_write_control:
1635  * @set: a #GstPoll.
1636  *
1637  * Write a byte to the control socket of the controllable @set.
1638  * This function is mostly useful for timer #GstPoll objects created with
1639  * gst_poll_new_timer(). 
1640  *
1641  * It will make any current and future gst_poll_wait() function return with
1642  * 1, meaning the control socket is set. After an equal amount of calls to
1643  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1644  * block again until their timeout expired.
1645  *
1646  * This function only works for timer #GstPoll objects created with
1647  * gst_poll_new_timer().
1648  *
1649  * Returns: %TRUE on success. %FALSE when when the byte could not be written.
1650  * errno contains the detailed error code but will never be EAGAIN, EINTR or
1651  * EWOULDBLOCK. %FALSE always signals a critical error.
1652  */
1653 gboolean
1654 gst_poll_write_control (GstPoll * set)
1655 {
1656   gboolean res;
1657
1658   g_return_val_if_fail (set != NULL, FALSE);
1659   g_return_val_if_fail (set->timer, FALSE);
1660
1661   res = raise_wakeup (set);
1662
1663   return res;
1664 }
1665
1666 /**
1667  * gst_poll_read_control:
1668  * @set: a #GstPoll.
1669  *
1670  * Read a byte from the control socket of the controllable @set.
1671  *
1672  * This function only works for timer #GstPoll objects created with
1673  * gst_poll_new_timer().
1674  *
1675  * Returns: %TRUE on success. %FALSE when when there was no byte to read or
1676  * reading the byte failed. If there was no byte to read, and only then, errno
1677  * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
1678  * critical error.
1679  */
1680 gboolean
1681 gst_poll_read_control (GstPoll * set)
1682 {
1683   gboolean res;
1684
1685   g_return_val_if_fail (set != NULL, FALSE);
1686   g_return_val_if_fail (set->timer, FALSE);
1687
1688   res = release_wakeup (set);
1689
1690   return res;
1691 }