poll: Fix various race conditions with read_control() and write_control()
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancellable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 /* OS/X needs this because of bad headers */
88 #include <string.h>
89
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91  * so we prefer our own poll emulation.
92  */
93 #if defined(BROKEN_POLL)
94 #undef HAVE_POLL
95 #endif
96
97 #include "gstpoll.h"
98
99 #define GST_CAT_DEFAULT GST_CAT_POLL
100
101 #ifdef G_OS_WIN32
102 typedef struct _WinsockFd WinsockFd;
103
104 struct _WinsockFd
105 {
106   gint fd;
107   glong event_mask;
108   WSANETWORKEVENTS events;
109   glong ignored_event_mask;
110 };
111 #endif
112
113 typedef enum
114 {
115   GST_POLL_MODE_AUTO,
116   GST_POLL_MODE_SELECT,
117   GST_POLL_MODE_PSELECT,
118   GST_POLL_MODE_POLL,
119   GST_POLL_MODE_PPOLL,
120   GST_POLL_MODE_WINDOWS
121 } GstPollMode;
122
123 struct _GstPoll
124 {
125   GstPollMode mode;
126
127   GMutex lock;
128   /* array of fds, always written to and read from with lock */
129   GArray *fds;
130   /* array of active fds, only written to from the waiting thread with the
131    * lock and read from with the lock or without the lock from the waiting
132    * thread */
133   GArray *active_fds;
134
135 #ifndef G_OS_WIN32
136   GstPollFD control_read_fd;
137   GstPollFD control_write_fd;
138 #else
139   GArray *active_fds_ignored;
140   GArray *events;
141   GArray *active_events;
142
143   HANDLE wakeup_event;
144 #endif
145
146   gboolean controllable;
147   volatile gint waiting;
148   volatile gint control_pending;
149   volatile gint flushing;
150   gboolean timer;
151   volatile gint rebuild;
152 };
153
154 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
155     gboolean active);
156 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
157
158 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
159 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
160
161 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
162 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
163 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
164
165 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
166 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
167
168 #ifndef G_OS_WIN32
169
170 static gboolean
171 wake_event (GstPoll * set)
172 {
173   ssize_t num_written;
174   while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
175     if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
176       g_critical ("%p: failed to wake event: %s", set, strerror (errno));
177       return FALSE;
178     }
179   }
180   return TRUE;
181 }
182
183 static gboolean
184 release_event (GstPoll * set)
185 {
186   gchar buf[1] = { '\0' };
187   ssize_t num_read;
188   while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
189     if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
190       g_critical ("%p: failed to release event: %s", set, strerror (errno));
191       return FALSE;
192     }
193   }
194   return TRUE;
195 }
196
197 #else
198
199 static void
200 format_last_error (gchar * buf, size_t buf_len)
201 {
202   DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
203   LPCVOID src = NULL;
204   DWORD lang = 0;
205   DWORD id;
206   id = GetLastError ();
207   FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
208   SetLastError (id);
209 }
210
211 static gboolean
212 wake_event (GstPoll * set)
213 {
214   SetLastError (0);
215   errno = 0;
216   if (!SetEvent (set->wakeup_event)) {
217     gchar msg[1024] = "<unknown>";
218     format_last_error (msg, sizeof (msg));
219     g_critical ("%p: failed to set wakup_event: %s", set, msg);
220     errno = EBADF;
221     return FALSE;
222   }
223
224   return TRUE;
225 }
226
227 static gboolean
228 release_event (GstPoll * set)
229 {
230   DWORD status;
231   SetLastError (0);
232   errno = 0;
233   if (status = WaitForSingleObject (set->wakeup_event, INFINITE)) {
234     const gchar *reason = "unknown";
235     gchar msg[1024] = "<unknown>";
236     switch (status) {
237       case WAIT_ABANDONED:
238         reason = "WAIT_ABANDONED";
239         break;
240       case WAIT_TIMEOUT:
241         reason = "WAIT_TIMEOUT";
242         break;
243       case WAIT_FAILED:
244         format_last_error (msg, sizeof (msg));
245         reason = msg;
246         break;
247       default:
248         reason = "other";
249         break;
250     }
251     g_critical ("%p: failed to block on wakup_event: %s", set, reason);
252     errno = EBADF;
253     return FALSE;
254   }
255
256   if (!ResetEvent (set->wakeup_event)) {
257     gchar msg[1024] = "<unknown>";
258     format_last_error (msg, sizeof (msg));
259     g_critical ("%p: failed to reset wakup_event: %s", set, msg);
260     errno = EBADF;
261     return FALSE;
262   }
263
264   return TRUE;
265 }
266
267 #endif
268
269 /* the poll/select call is also performed on a control socket, that way
270  * we can send special commands to control it */
271 static inline gboolean
272 raise_wakeup (GstPoll * set)
273 {
274   gboolean result = TRUE;
275
276   /* makes testing control_pending and WAKE_EVENT() atomic. */
277   g_mutex_lock (&set->lock);
278
279   if (set->control_pending == 0) {
280     /* raise when nothing pending */
281     GST_LOG ("%p: raise", set);
282     result = wake_event (set);
283   }
284
285   if (result) {
286     set->control_pending++;
287   }
288
289   g_mutex_unlock (&set->lock);
290
291   return result;
292 }
293
294 static inline gboolean
295 release_wakeup (GstPoll * set)
296 {
297   gboolean result = FALSE;
298
299   /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
300   g_mutex_lock (&set->lock);
301
302   if (set->control_pending > 0) {
303     /* release, only if this was the last pending. */
304     if (set->control_pending == 1) {
305       GST_LOG ("%p: release", set);
306       result = release_event (set);
307     } else {
308       result = TRUE;
309     }
310
311     if (result) {
312       set->control_pending--;
313     }
314   } else {
315     errno = EWOULDBLOCK;
316   }
317
318   g_mutex_unlock (&set->lock);
319
320   return result;
321 }
322
323 static inline gint
324 release_all_wakeup (GstPoll * set)
325 {
326   gint old;
327
328   /* makes testing control_pending and RELEASE_EVENT() atomic. */
329   g_mutex_lock (&set->lock);
330
331   if ((old = set->control_pending) > 0) {
332     GST_LOG ("%p: releasing %d", set, old);
333     if (release_event (set)) {
334       set->control_pending = 0;
335     } else {
336       old = 0;
337     }
338   }
339
340   g_mutex_unlock (&set->lock);
341
342   return old;
343 }
344
345 static gint
346 find_index (GArray * array, GstPollFD * fd)
347 {
348 #ifndef G_OS_WIN32
349   struct pollfd *ifd;
350 #else
351   WinsockFd *ifd;
352 #endif
353   guint i;
354
355   /* start by assuming the index found in the fd is still valid */
356   if (fd->idx >= 0 && fd->idx < array->len) {
357 #ifndef G_OS_WIN32
358     ifd = &g_array_index (array, struct pollfd, fd->idx);
359 #else
360     ifd = &g_array_index (array, WinsockFd, fd->idx);
361 #endif
362
363     if (ifd->fd == fd->fd) {
364       return fd->idx;
365     }
366   }
367
368   /* the pollfd array has changed and we need to lookup the fd again */
369   for (i = 0; i < array->len; i++) {
370 #ifndef G_OS_WIN32
371     ifd = &g_array_index (array, struct pollfd, i);
372 #else
373     ifd = &g_array_index (array, WinsockFd, i);
374 #endif
375
376     if (ifd->fd == fd->fd) {
377       fd->idx = (gint) i;
378       return fd->idx;
379     }
380   }
381
382   fd->idx = -1;
383   return fd->idx;
384 }
385
386 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
387 /* check if all file descriptors will fit in an fd_set */
388 static gboolean
389 selectable_fds (GstPoll * set)
390 {
391   guint i;
392
393   g_mutex_lock (&set->lock);
394   for (i = 0; i < set->fds->len; i++) {
395     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
396
397     if (pfd->fd >= FD_SETSIZE)
398       goto too_many;
399   }
400   g_mutex_unlock (&set->lock);
401
402   return TRUE;
403
404 too_many:
405   {
406     g_mutex_unlock (&set->lock);
407     return FALSE;
408   }
409 }
410
411 /* check if the timeout will convert to a timeout value used for poll()
412  * without a loss of precision
413  */
414 static gboolean
415 pollable_timeout (GstClockTime timeout)
416 {
417   if (timeout == GST_CLOCK_TIME_NONE)
418     return TRUE;
419
420   /* not a nice multiple of milliseconds */
421   if (timeout % 1000000)
422     return FALSE;
423
424   return TRUE;
425 }
426 #endif
427
428 static GstPollMode
429 choose_mode (GstPoll * set, GstClockTime timeout)
430 {
431   GstPollMode mode;
432
433   if (set->mode == GST_POLL_MODE_AUTO) {
434 #ifdef HAVE_PPOLL
435     mode = GST_POLL_MODE_PPOLL;
436 #elif defined(HAVE_POLL)
437     if (!selectable_fds (set) || pollable_timeout (timeout)) {
438       mode = GST_POLL_MODE_POLL;
439     } else {
440 #ifdef HAVE_PSELECT
441       mode = GST_POLL_MODE_PSELECT;
442 #else
443       mode = GST_POLL_MODE_SELECT;
444 #endif
445     }
446 #elif defined(HAVE_PSELECT)
447     mode = GST_POLL_MODE_PSELECT;
448 #else
449     mode = GST_POLL_MODE_SELECT;
450 #endif
451   } else {
452     mode = set->mode;
453   }
454   return mode;
455 }
456
457 #ifndef G_OS_WIN32
458 static gint
459 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
460     fd_set * errorfds)
461 {
462   gint max_fd = -1;
463   guint i;
464
465   FD_ZERO (readfds);
466   FD_ZERO (writefds);
467   FD_ZERO (errorfds);
468
469   g_mutex_lock (&set->lock);
470
471   for (i = 0; i < set->active_fds->len; i++) {
472     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
473
474     if (pfd->fd < FD_SETSIZE) {
475       if (pfd->events & POLLIN)
476         FD_SET (pfd->fd, readfds);
477       if (pfd->events & POLLOUT)
478         FD_SET (pfd->fd, writefds);
479       if (pfd->events)
480         FD_SET (pfd->fd, errorfds);
481       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
482         max_fd = pfd->fd;
483     }
484   }
485
486   g_mutex_unlock (&set->lock);
487
488   return max_fd;
489 }
490
491 static void
492 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
493     fd_set * errorfds)
494 {
495   guint i;
496
497   g_mutex_lock (&set->lock);
498
499   for (i = 0; i < set->active_fds->len; i++) {
500     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
501
502     if (pfd->fd < FD_SETSIZE) {
503       pfd->revents = 0;
504       if (FD_ISSET (pfd->fd, readfds))
505         pfd->revents |= POLLIN;
506       if (FD_ISSET (pfd->fd, writefds))
507         pfd->revents |= POLLOUT;
508       if (FD_ISSET (pfd->fd, errorfds))
509         pfd->revents |= POLLERR;
510     }
511   }
512
513   g_mutex_unlock (&set->lock);
514 }
515 #else /* G_OS_WIN32 */
516 /*
517  * Translate errors thrown by the Winsock API used by GstPoll:
518  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
519  */
520 static gint
521 gst_poll_winsock_error_to_errno (DWORD last_error)
522 {
523   switch (last_error) {
524     case WSA_INVALID_HANDLE:
525     case WSAEINVAL:
526     case WSAENOTSOCK:
527       return EBADF;
528
529     case WSA_NOT_ENOUGH_MEMORY:
530       return ENOMEM;
531
532       /*
533        * Anything else, including:
534        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
535        *   WSANOTINITIALISED
536        */
537     default:
538       return EINVAL;
539   }
540 }
541
542 static void
543 gst_poll_free_winsock_event (GstPoll * set, gint idx)
544 {
545   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
546   HANDLE event = g_array_index (set->events, HANDLE, idx);
547
548   WSAEventSelect (wfd->fd, event, 0);
549   CloseHandle (event);
550 }
551
552 static void
553 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
554     gboolean active)
555 {
556   WinsockFd *wfd;
557
558   wfd = &g_array_index (set->fds, WinsockFd, idx);
559
560   if (active)
561     wfd->event_mask |= flags;
562   else
563     wfd->event_mask &= ~flags;
564
565   /* reset ignored state if the new mask doesn't overlap at all */
566   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
567     wfd->ignored_event_mask = 0;
568 }
569
570 static gboolean
571 gst_poll_prepare_winsock_active_sets (GstPoll * set)
572 {
573   guint i;
574
575   g_array_set_size (set->active_fds, 0);
576   g_array_set_size (set->active_fds_ignored, 0);
577   g_array_set_size (set->active_events, 0);
578   g_array_append_val (set->active_events, set->wakeup_event);
579
580   for (i = 0; i < set->fds->len; i++) {
581     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
582     HANDLE event = g_array_index (set->events, HANDLE, i);
583
584     if (wfd->ignored_event_mask == 0) {
585       gint ret;
586
587       g_array_append_val (set->active_fds, *wfd);
588       g_array_append_val (set->active_events, event);
589
590       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
591       if (G_UNLIKELY (ret != 0)) {
592         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
593         return FALSE;
594       }
595     } else {
596       g_array_append_val (set->active_fds_ignored, wfd);
597     }
598   }
599
600   return TRUE;
601 }
602
603 static gint
604 gst_poll_collect_winsock_events (GstPoll * set)
605 {
606   gint res, i;
607
608   /*
609    * We need to check which events are signaled, and call
610    * WSAEnumNetworkEvents for those that are, which resets
611    * the event and clears the internal network event records.
612    */
613   res = 0;
614   for (i = 0; i < set->active_fds->len; i++) {
615     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
616     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
617     DWORD wait_ret;
618
619     wait_ret = WaitForSingleObject (event, 0);
620     if (wait_ret == WAIT_OBJECT_0) {
621       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
622
623       if (G_UNLIKELY (enum_ret != 0)) {
624         res = -1;
625         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
626         break;
627       }
628
629       res++;
630     } else {
631       /* clear any previously stored result */
632       memset (&wfd->events, 0, sizeof (wfd->events));
633     }
634   }
635
636   /* If all went well we also need to reset the ignored fds. */
637   if (res >= 0) {
638     res += set->active_fds_ignored->len;
639
640     for (i = 0; i < set->active_fds_ignored->len; i++) {
641       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
642
643       wfd->ignored_event_mask = 0;
644     }
645
646     g_array_set_size (set->active_fds_ignored, 0);
647   }
648
649   return res;
650 }
651 #endif
652
653 /**
654  * gst_poll_new: (skip)
655  * @controllable: whether it should be possible to control a wait.
656  *
657  * Create a new file descriptor set. If @controllable, it
658  * is possible to restart or flush a call to gst_poll_wait() with
659  * gst_poll_restart() and gst_poll_set_flushing() respectively.
660  *
661  * Free-function: gst_poll_free
662  *
663  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
664  *     case of an error.  Free with gst_poll_free().
665  */
666 GstPoll *
667 gst_poll_new (gboolean controllable)
668 {
669   GstPoll *nset;
670
671   nset = g_slice_new0 (GstPoll);
672   GST_DEBUG ("%p: new controllable : %d", nset, controllable);
673   g_mutex_init (&nset->lock);
674 #ifndef G_OS_WIN32
675   nset->mode = GST_POLL_MODE_AUTO;
676   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
677   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
678   nset->control_read_fd.fd = -1;
679   nset->control_write_fd.fd = -1;
680   {
681     gint control_sock[2];
682
683     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
684       goto no_socket_pair;
685
686     nset->control_read_fd.fd = control_sock[0];
687     nset->control_write_fd.fd = control_sock[1];
688
689     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
690     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
691   }
692 #else
693   nset->mode = GST_POLL_MODE_WINDOWS;
694   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
695   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
696   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
697   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
698   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
699
700   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
701 #endif
702
703   /* ensure (re)build, though already sneakily set in non-windows case */
704   MARK_REBUILD (nset);
705
706   nset->controllable = controllable;
707   nset->control_pending = 0;
708
709   return nset;
710
711   /* ERRORS */
712 #ifndef G_OS_WIN32
713 no_socket_pair:
714   {
715     GST_WARNING ("%p: can't create socket pair !", nset);
716     gst_poll_free (nset);
717     return NULL;
718   }
719 #endif
720 }
721
722 /**
723  * gst_poll_new_timer: (skip)
724  *
725  * Create a new poll object that can be used for scheduling cancellable
726  * timeouts.
727  *
728  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
729  * performed from different threads. 
730  *
731  * Free-function: gst_poll_free
732  *
733  * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
734  *     case of an error.  Free with gst_poll_free().
735  */
736 GstPoll *
737 gst_poll_new_timer (void)
738 {
739   GstPoll *poll;
740
741   /* make a new controllable poll set */
742   if (!(poll = gst_poll_new (TRUE)))
743     goto done;
744
745   /* we are a timer */
746   poll->timer = TRUE;
747
748 done:
749   return poll;
750 }
751
752 /**
753  * gst_poll_free:
754  * @set: (transfer full): a file descriptor set.
755  *
756  * Free a file descriptor set.
757  */
758 void
759 gst_poll_free (GstPoll * set)
760 {
761   g_return_if_fail (set != NULL);
762
763   GST_DEBUG ("%p: freeing", set);
764
765 #ifndef G_OS_WIN32
766   if (set->control_write_fd.fd >= 0)
767     close (set->control_write_fd.fd);
768   if (set->control_read_fd.fd >= 0)
769     close (set->control_read_fd.fd);
770 #else
771   CloseHandle (set->wakeup_event);
772
773   {
774     guint i;
775
776     for (i = 0; i < set->events->len; i++)
777       gst_poll_free_winsock_event (set, i);
778   }
779
780   g_array_free (set->active_events, TRUE);
781   g_array_free (set->events, TRUE);
782   g_array_free (set->active_fds_ignored, TRUE);
783 #endif
784
785   g_array_free (set->active_fds, TRUE);
786   g_array_free (set->fds, TRUE);
787   g_mutex_clear (&set->lock);
788   g_slice_free (GstPoll, set);
789 }
790
791 /**
792  * gst_poll_get_read_gpollfd:
793  * @set: a #GstPoll
794  * @fd: a #GPollFD
795  *
796  * Get a GPollFD for the reading part of the control socket. This is useful when
797  * integrating with a GSource and GMainLoop.
798  */
799 void
800 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
801 {
802   g_return_if_fail (set != NULL);
803   g_return_if_fail (fd != NULL);
804
805 #ifndef G_OS_WIN32
806   fd->fd = set->control_read_fd.fd;
807 #else
808 #if GLIB_SIZEOF_VOID_P == 8
809   fd->fd = (gint64) set->wakeup_event;
810 #else
811   fd->fd = (gint) set->wakeup_event;
812 #endif
813 #endif
814   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
815   fd->revents = 0;
816 }
817
818 /**
819  * gst_poll_fd_init:
820  * @fd: a #GstPollFD
821  *
822  * Initializes @fd. Alternatively you can initialize it with
823  * #GST_POLL_FD_INIT.
824  */
825 void
826 gst_poll_fd_init (GstPollFD * fd)
827 {
828   g_return_if_fail (fd != NULL);
829
830   fd->fd = -1;
831   fd->idx = -1;
832 }
833
834 static gboolean
835 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
836 {
837   gint idx;
838
839   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
840
841   idx = find_index (set->fds, fd);
842   if (idx < 0) {
843 #ifndef G_OS_WIN32
844     struct pollfd nfd;
845
846     nfd.fd = fd->fd;
847     nfd.events = POLLERR | POLLNVAL | POLLHUP;
848     nfd.revents = 0;
849
850     g_array_append_val (set->fds, nfd);
851
852     fd->idx = set->fds->len - 1;
853 #else
854     WinsockFd wfd;
855     HANDLE event;
856
857     wfd.fd = fd->fd;
858     wfd.event_mask = FD_CLOSE;
859     memset (&wfd.events, 0, sizeof (wfd.events));
860     wfd.ignored_event_mask = 0;
861     event = WSACreateEvent ();
862
863     g_array_append_val (set->fds, wfd);
864     g_array_append_val (set->events, event);
865
866     fd->idx = set->fds->len - 1;
867 #endif
868     MARK_REBUILD (set);
869   } else {
870     GST_WARNING ("%p: fd already added !", set);
871   }
872
873   return TRUE;
874 }
875
876 /**
877  * gst_poll_add_fd:
878  * @set: a file descriptor set.
879  * @fd: a file descriptor.
880  *
881  * Add a file descriptor to the file descriptor set.
882  *
883  * Returns: %TRUE if the file descriptor was successfully added to the set.
884  */
885 gboolean
886 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
887 {
888   gboolean ret;
889
890   g_return_val_if_fail (set != NULL, FALSE);
891   g_return_val_if_fail (fd != NULL, FALSE);
892   g_return_val_if_fail (fd->fd >= 0, FALSE);
893
894   g_mutex_lock (&set->lock);
895
896   ret = gst_poll_add_fd_unlocked (set, fd);
897
898   g_mutex_unlock (&set->lock);
899
900   return ret;
901 }
902
903 /**
904  * gst_poll_remove_fd:
905  * @set: a file descriptor set.
906  * @fd: a file descriptor.
907  *
908  * Remove a file descriptor from the file descriptor set.
909  *
910  * Returns: %TRUE if the file descriptor was successfully removed from the set.
911  */
912 gboolean
913 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
914 {
915   gint idx;
916
917   g_return_val_if_fail (set != NULL, FALSE);
918   g_return_val_if_fail (fd != NULL, FALSE);
919   g_return_val_if_fail (fd->fd >= 0, FALSE);
920
921
922   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
923
924   g_mutex_lock (&set->lock);
925
926   /* get the index, -1 is an fd that is not added */
927   idx = find_index (set->fds, fd);
928   if (idx >= 0) {
929 #ifdef G_OS_WIN32
930     gst_poll_free_winsock_event (set, idx);
931     g_array_remove_index_fast (set->events, idx);
932 #endif
933
934     /* remove the fd at index, we use _remove_index_fast, which copies the last
935      * element of the array to the freed index */
936     g_array_remove_index_fast (set->fds, idx);
937
938     /* mark fd as removed by setting the index to -1 */
939     fd->idx = -1;
940     MARK_REBUILD (set);
941   } else {
942     GST_WARNING ("%p: couldn't find fd !", set);
943   }
944
945   g_mutex_unlock (&set->lock);
946
947   return idx >= 0;
948 }
949
950 /**
951  * gst_poll_fd_ctl_write:
952  * @set: a file descriptor set.
953  * @fd: a file descriptor.
954  * @active: a new status.
955  *
956  * Control whether the descriptor @fd in @set will be monitored for
957  * writability.
958  *
959  * Returns: %TRUE if the descriptor was successfully updated.
960  */
961 gboolean
962 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
963 {
964   gint idx;
965
966   g_return_val_if_fail (set != NULL, FALSE);
967   g_return_val_if_fail (fd != NULL, FALSE);
968   g_return_val_if_fail (fd->fd >= 0, FALSE);
969
970   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
971       fd->fd, fd->idx, active);
972
973   g_mutex_lock (&set->lock);
974
975   idx = find_index (set->fds, fd);
976   if (idx >= 0) {
977 #ifndef G_OS_WIN32
978     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
979
980     if (active)
981       pfd->events |= POLLOUT;
982     else
983       pfd->events &= ~POLLOUT;
984
985     GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
986 #else
987     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
988         active);
989 #endif
990     MARK_REBUILD (set);
991   } else {
992     GST_WARNING ("%p: couldn't find fd !", set);
993   }
994
995   g_mutex_unlock (&set->lock);
996
997   return idx >= 0;
998 }
999
1000 static gboolean
1001 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1002 {
1003   gint idx;
1004
1005   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1006       fd->fd, fd->idx, active);
1007
1008   idx = find_index (set->fds, fd);
1009
1010   if (idx >= 0) {
1011 #ifndef G_OS_WIN32
1012     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1013
1014     if (active)
1015       pfd->events |= (POLLIN | POLLPRI);
1016     else
1017       pfd->events &= ~(POLLIN | POLLPRI);
1018 #else
1019     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1020 #endif
1021     MARK_REBUILD (set);
1022   } else {
1023     GST_WARNING ("%p: couldn't find fd !", set);
1024   }
1025
1026   return idx >= 0;
1027 }
1028
1029 /**
1030  * gst_poll_fd_ctl_read:
1031  * @set: a file descriptor set.
1032  * @fd: a file descriptor.
1033  * @active: a new status.
1034  *
1035  * Control whether the descriptor @fd in @set will be monitored for
1036  * readability.
1037  *
1038  * Returns: %TRUE if the descriptor was successfully updated.
1039  */
1040 gboolean
1041 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1042 {
1043   gboolean ret;
1044
1045   g_return_val_if_fail (set != NULL, FALSE);
1046   g_return_val_if_fail (fd != NULL, FALSE);
1047   g_return_val_if_fail (fd->fd >= 0, FALSE);
1048
1049   g_mutex_lock (&set->lock);
1050
1051   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1052
1053   g_mutex_unlock (&set->lock);
1054
1055   return ret;
1056 }
1057
1058 /**
1059  * gst_poll_fd_ignored:
1060  * @set: a file descriptor set.
1061  * @fd: a file descriptor.
1062  *
1063  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1064  * the same result for @fd as last time. This function must be called if no
1065  * operation (read/write/recv/send/etc.) will be performed on @fd before
1066  * the next call to gst_poll_wait().
1067  *
1068  * The reason why this is needed is because the underlying implementation
1069  * might not allow querying the fd more than once between calls to one of
1070  * the re-enabling operations.
1071  */
1072 void
1073 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1074 {
1075 #ifdef G_OS_WIN32
1076   gint idx;
1077
1078   g_return_if_fail (set != NULL);
1079   g_return_if_fail (fd != NULL);
1080   g_return_if_fail (fd->fd >= 0);
1081
1082   g_mutex_lock (&set->lock);
1083
1084   idx = find_index (set->fds, fd);
1085   if (idx >= 0) {
1086     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1087
1088     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1089     MARK_REBUILD (set);
1090   }
1091
1092   g_mutex_unlock (&set->lock);
1093 #endif
1094 }
1095
1096 /**
1097  * gst_poll_fd_has_closed:
1098  * @set: a file descriptor set.
1099  * @fd: a file descriptor.
1100  *
1101  * Check if @fd in @set has closed the connection.
1102  *
1103  * Returns: %TRUE if the connection was closed.
1104  */
1105 gboolean
1106 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1107 {
1108   gboolean res = FALSE;
1109   gint idx;
1110
1111   g_return_val_if_fail (set != NULL, FALSE);
1112   g_return_val_if_fail (fd != NULL, FALSE);
1113   g_return_val_if_fail (fd->fd >= 0, FALSE);
1114
1115   g_mutex_lock (&((GstPoll *) set)->lock);
1116
1117   idx = find_index (set->active_fds, fd);
1118   if (idx >= 0) {
1119 #ifndef G_OS_WIN32
1120     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1121
1122     res = (pfd->revents & POLLHUP) != 0;
1123 #else
1124     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1125
1126     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1127 #endif
1128   } else {
1129     GST_WARNING ("%p: couldn't find fd !", set);
1130   }
1131   g_mutex_unlock (&((GstPoll *) set)->lock);
1132
1133   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1134
1135   return res;
1136 }
1137
1138 /**
1139  * gst_poll_fd_has_error:
1140  * @set: a file descriptor set.
1141  * @fd: a file descriptor.
1142  *
1143  * Check if @fd in @set has an error.
1144  *
1145  * Returns: %TRUE if the descriptor has an error.
1146  */
1147 gboolean
1148 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1149 {
1150   gboolean res = FALSE;
1151   gint idx;
1152
1153   g_return_val_if_fail (set != NULL, FALSE);
1154   g_return_val_if_fail (fd != NULL, FALSE);
1155   g_return_val_if_fail (fd->fd >= 0, FALSE);
1156
1157   g_mutex_lock (&((GstPoll *) set)->lock);
1158
1159   idx = find_index (set->active_fds, fd);
1160   if (idx >= 0) {
1161 #ifndef G_OS_WIN32
1162     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1163
1164     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1165 #else
1166     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1167
1168     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1169         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1170         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1171         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1172         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1173 #endif
1174   } else {
1175     GST_WARNING ("%p: couldn't find fd !", set);
1176   }
1177   g_mutex_unlock (&((GstPoll *) set)->lock);
1178
1179   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1180
1181   return res;
1182 }
1183
1184 static gboolean
1185 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1186 {
1187   gboolean res = FALSE;
1188   gint idx;
1189
1190   idx = find_index (set->active_fds, fd);
1191   if (idx >= 0) {
1192 #ifndef G_OS_WIN32
1193     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1194
1195     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1196 #else
1197     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1198
1199     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1200 #endif
1201   } else {
1202     GST_WARNING ("%p: couldn't find fd !", set);
1203   }
1204   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1205
1206   return res;
1207 }
1208
1209 /**
1210  * gst_poll_fd_can_read:
1211  * @set: a file descriptor set.
1212  * @fd: a file descriptor.
1213  *
1214  * Check if @fd in @set has data to be read.
1215  *
1216  * Returns: %TRUE if the descriptor has data to be read.
1217  */
1218 gboolean
1219 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1220 {
1221   gboolean res = FALSE;
1222
1223   g_return_val_if_fail (set != NULL, FALSE);
1224   g_return_val_if_fail (fd != NULL, FALSE);
1225   g_return_val_if_fail (fd->fd >= 0, FALSE);
1226
1227   g_mutex_lock (&((GstPoll *) set)->lock);
1228
1229   res = gst_poll_fd_can_read_unlocked (set, fd);
1230
1231   g_mutex_unlock (&((GstPoll *) set)->lock);
1232
1233   return res;
1234 }
1235
1236 /**
1237  * gst_poll_fd_can_write:
1238  * @set: a file descriptor set.
1239  * @fd: a file descriptor.
1240  *
1241  * Check if @fd in @set can be used for writing.
1242  *
1243  * Returns: %TRUE if the descriptor can be used for writing.
1244  */
1245 gboolean
1246 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1247 {
1248   gboolean res = FALSE;
1249   gint idx;
1250
1251   g_return_val_if_fail (set != NULL, FALSE);
1252   g_return_val_if_fail (fd != NULL, FALSE);
1253   g_return_val_if_fail (fd->fd >= 0, FALSE);
1254
1255   g_mutex_lock (&((GstPoll *) set)->lock);
1256
1257   idx = find_index (set->active_fds, fd);
1258   if (idx >= 0) {
1259 #ifndef G_OS_WIN32
1260     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1261
1262     res = (pfd->revents & POLLOUT) != 0;
1263 #else
1264     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1265
1266     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1267 #endif
1268   } else {
1269     GST_WARNING ("%p: couldn't find fd !", set);
1270   }
1271   g_mutex_unlock (&((GstPoll *) set)->lock);
1272
1273   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1274
1275   return res;
1276 }
1277
1278 /**
1279  * gst_poll_wait:
1280  * @set: a #GstPoll.
1281  * @timeout: a timeout in nanoseconds.
1282  *
1283  * Wait for activity on the file descriptors in @set. This function waits up to
1284  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1285  *
1286  * For #GstPoll objects created with gst_poll_new(), this function can only be
1287  * called from a single thread at a time.  If called from multiple threads,
1288  * -1 will be returned with errno set to EPERM.
1289  *
1290  * This is not true for timer #GstPoll objects created with
1291  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1292  * simultaneously.
1293  *
1294  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1295  * activity was detected after @timeout. If an error occurs, -1 is returned
1296  * and errno is set.
1297  */
1298 gint
1299 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1300 {
1301   gboolean restarting;
1302   gboolean is_timer;
1303   int res;
1304   gint old_waiting;
1305
1306   g_return_val_if_fail (set != NULL, -1);
1307
1308   GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1309
1310   is_timer = set->timer;
1311
1312   /* add one more waiter */
1313   old_waiting = INC_WAITING (set);
1314
1315   /* we cannot wait from multiple threads unless we are a timer */
1316   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1317     goto already_waiting;
1318
1319   /* flushing, exit immediately */
1320   if (G_UNLIKELY (IS_FLUSHING (set)))
1321     goto flushing;
1322
1323   do {
1324     GstPollMode mode;
1325
1326     res = -1;
1327     restarting = FALSE;
1328
1329     mode = choose_mode (set, timeout);
1330
1331     if (TEST_REBUILD (set)) {
1332       g_mutex_lock (&set->lock);
1333 #ifndef G_OS_WIN32
1334       g_array_set_size (set->active_fds, set->fds->len);
1335       memcpy (set->active_fds->data, set->fds->data,
1336           set->fds->len * sizeof (struct pollfd));
1337 #else
1338       if (!gst_poll_prepare_winsock_active_sets (set))
1339         goto winsock_error;
1340 #endif
1341       g_mutex_unlock (&set->lock);
1342     }
1343
1344     switch (mode) {
1345       case GST_POLL_MODE_AUTO:
1346         g_assert_not_reached ();
1347         break;
1348       case GST_POLL_MODE_PPOLL:
1349       {
1350 #ifdef HAVE_PPOLL
1351         struct timespec ts;
1352         struct timespec *tsptr;
1353
1354         if (timeout != GST_CLOCK_TIME_NONE) {
1355           GST_TIME_TO_TIMESPEC (timeout, ts);
1356           tsptr = &ts;
1357         } else {
1358           tsptr = NULL;
1359         }
1360
1361         res =
1362             ppoll ((struct pollfd *) set->active_fds->data,
1363             set->active_fds->len, tsptr, NULL);
1364 #else
1365         g_assert_not_reached ();
1366         errno = ENOSYS;
1367 #endif
1368         break;
1369       }
1370       case GST_POLL_MODE_POLL:
1371       {
1372 #ifdef HAVE_POLL
1373         gint t;
1374
1375         if (timeout != GST_CLOCK_TIME_NONE) {
1376           t = GST_TIME_AS_MSECONDS (timeout);
1377         } else {
1378           t = -1;
1379         }
1380
1381         res =
1382             poll ((struct pollfd *) set->active_fds->data,
1383             set->active_fds->len, t);
1384 #else
1385         g_assert_not_reached ();
1386         errno = ENOSYS;
1387 #endif
1388         break;
1389       }
1390       case GST_POLL_MODE_PSELECT:
1391 #ifndef HAVE_PSELECT
1392       {
1393         g_assert_not_reached ();
1394         errno = ENOSYS;
1395         break;
1396       }
1397 #endif
1398       case GST_POLL_MODE_SELECT:
1399       {
1400 #ifndef G_OS_WIN32
1401         fd_set readfds;
1402         fd_set writefds;
1403         fd_set errorfds;
1404         gint max_fd;
1405
1406         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1407
1408         if (mode == GST_POLL_MODE_SELECT) {
1409           struct timeval tv;
1410           struct timeval *tvptr;
1411
1412           if (timeout != GST_CLOCK_TIME_NONE) {
1413             GST_TIME_TO_TIMEVAL (timeout, tv);
1414             tvptr = &tv;
1415           } else {
1416             tvptr = NULL;
1417           }
1418
1419           GST_DEBUG ("%p: Calling select", set);
1420           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1421           GST_DEBUG ("%p: After select, res:%d", set, res);
1422         } else {
1423 #ifdef HAVE_PSELECT
1424           struct timespec ts;
1425           struct timespec *tsptr;
1426
1427           if (timeout != GST_CLOCK_TIME_NONE) {
1428             GST_TIME_TO_TIMESPEC (timeout, ts);
1429             tsptr = &ts;
1430           } else {
1431             tsptr = NULL;
1432           }
1433
1434           GST_DEBUG ("%p: Calling pselect", set);
1435           res =
1436               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1437           GST_DEBUG ("%p: After pselect, res:%d", set, res);
1438 #endif
1439         }
1440
1441         if (res >= 0) {
1442           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1443         }
1444 #else /* G_OS_WIN32 */
1445         g_assert_not_reached ();
1446         errno = ENOSYS;
1447 #endif
1448         break;
1449       }
1450       case GST_POLL_MODE_WINDOWS:
1451       {
1452 #ifdef G_OS_WIN32
1453         gint ignore_count = set->active_fds_ignored->len;
1454         DWORD t, wait_ret;
1455
1456         if (G_LIKELY (ignore_count == 0)) {
1457           if (timeout != GST_CLOCK_TIME_NONE)
1458             t = GST_TIME_AS_MSECONDS (timeout);
1459           else
1460             t = INFINITE;
1461         } else {
1462           /* already one or more ignored fds, so we quickly sweep the others */
1463           t = 0;
1464         }
1465
1466         if (set->active_events->len != 0) {
1467           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1468               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1469         } else {
1470           wait_ret = WSA_WAIT_FAILED;
1471           WSASetLastError (WSA_INVALID_PARAMETER);
1472         }
1473
1474         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1475           res = 0;
1476         } else if (wait_ret == WSA_WAIT_FAILED) {
1477           res = -1;
1478           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1479         } else {
1480           /* the first entry is the wakeup event */
1481           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1482             res = gst_poll_collect_winsock_events (set);
1483           } else {
1484             res = 1;            /* wakeup event */
1485           }
1486         }
1487 #else
1488         g_assert_not_reached ();
1489         errno = ENOSYS;
1490 #endif
1491         break;
1492       }
1493     }
1494
1495     if (!is_timer) {
1496       /* Applications needs to clear the control socket themselves for timer
1497        * polls.
1498        * For other polls, we need to clear the control socket. If there was only
1499        * one socket with activity and it was the control socket, we need to
1500        * restart */
1501       if (release_all_wakeup (set) > 0 && res == 1)
1502         restarting = TRUE;
1503     }
1504
1505     /* we got woken up and we are flushing, we need to stop */
1506     if (G_UNLIKELY (IS_FLUSHING (set)))
1507       goto flushing;
1508
1509   } while (G_UNLIKELY (restarting));
1510
1511   DEC_WAITING (set);
1512
1513   return res;
1514
1515   /* ERRORS */
1516 already_waiting:
1517   {
1518     GST_LOG ("%p: we are already waiting", set);
1519     DEC_WAITING (set);
1520     errno = EPERM;
1521     return -1;
1522   }
1523 flushing:
1524   {
1525     GST_LOG ("%p: we are flushing", set);
1526     DEC_WAITING (set);
1527     errno = EBUSY;
1528     return -1;
1529   }
1530 #ifdef G_OS_WIN32
1531 winsock_error:
1532   {
1533     GST_LOG ("%p: winsock error", set);
1534     g_mutex_unlock (&set->lock);
1535     DEC_WAITING (set);
1536     return -1;
1537   }
1538 #endif
1539 }
1540
1541 /**
1542  * gst_poll_set_controllable:
1543  * @set: a #GstPoll.
1544  * @controllable: new controllable state.
1545  *
1546  * When @controllable is %TRUE, this function ensures that future calls to
1547  * gst_poll_wait() will be affected by gst_poll_restart() and
1548  * gst_poll_set_flushing().
1549  *
1550  * Returns: %TRUE if the controllability of @set could be updated.
1551  */
1552 gboolean
1553 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1554 {
1555   g_return_val_if_fail (set != NULL, FALSE);
1556
1557   GST_LOG ("%p: controllable : %d", set, controllable);
1558
1559   set->controllable = controllable;
1560
1561   return TRUE;
1562 }
1563
1564 /**
1565  * gst_poll_restart:
1566  * @set: a #GstPoll.
1567  *
1568  * Restart any gst_poll_wait() that is in progress. This function is typically
1569  * used after adding or removing descriptors to @set.
1570  *
1571  * If @set is not controllable, then this call will have no effect.
1572  */
1573 void
1574 gst_poll_restart (GstPoll * set)
1575 {
1576   g_return_if_fail (set != NULL);
1577
1578   if (set->controllable && GET_WAITING (set) > 0) {
1579     /* we are controllable and waiting, wake up the waiter. The socket will be
1580      * cleared by the _wait() thread and the poll will be restarted */
1581     raise_wakeup (set);
1582   }
1583 }
1584
1585 /**
1586  * gst_poll_set_flushing:
1587  * @set: a #GstPoll.
1588  * @flushing: new flushing state.
1589  *
1590  * When @flushing is %TRUE, this function ensures that current and future calls
1591  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1592  *
1593  * Unsetting the flushing state will restore normal operation of @set.
1594  */
1595 void
1596 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1597 {
1598   g_return_if_fail (set != NULL);
1599
1600   GST_LOG ("%p: flushing: %d", set, flushing);
1601
1602   /* update the new state first */
1603   SET_FLUSHING (set, flushing);
1604
1605   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1606     /* we are flushing, controllable and waiting, wake up the waiter. When we
1607      * stop the flushing operation we don't clear the wakeup fd here, this will
1608      * happen in the _wait() thread. */
1609     raise_wakeup (set);
1610   }
1611 }
1612
1613 /**
1614  * gst_poll_write_control:
1615  * @set: a #GstPoll.
1616  *
1617  * Write a byte to the control socket of the controllable @set.
1618  * This function is mostly useful for timer #GstPoll objects created with
1619  * gst_poll_new_timer(). 
1620  *
1621  * It will make any current and future gst_poll_wait() function return with
1622  * 1, meaning the control socket is set. After an equal amount of calls to
1623  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1624  * block again until their timeout expired.
1625  *
1626  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1627  * byte could not be written.
1628  */
1629 gboolean
1630 gst_poll_write_control (GstPoll * set)
1631 {
1632   gboolean res;
1633
1634   g_return_val_if_fail (set != NULL, FALSE);
1635   g_return_val_if_fail (set->timer, FALSE);
1636
1637   res = raise_wakeup (set);
1638
1639   return res;
1640 }
1641
1642 /**
1643  * gst_poll_read_control:
1644  * @set: a #GstPoll.
1645  *
1646  * Read a byte from the control socket of the controllable @set.
1647  * This function is mostly useful for timer #GstPoll objects created with
1648  * gst_poll_new_timer(). 
1649  *
1650  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1651  * was no byte to read.
1652  */
1653 gboolean
1654 gst_poll_read_control (GstPoll * set)
1655 {
1656   gboolean res;
1657
1658   g_return_val_if_fail (set != NULL, FALSE);
1659   g_return_val_if_fail (set->timer, FALSE);
1660
1661   res = release_wakeup (set);
1662
1663   return res;
1664 }