gst: Fix compiler warnings on mingw-w64
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 /* OS/X needs this because of bad headers */
88 #include <string.h>
89
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91  * so we prefer our own poll emulation.
92  */
93 #if defined(BROKEN_POLL)
94 #undef HAVE_POLL
95 #endif
96
97 #include "gstpoll.h"
98
99 #define GST_CAT_DEFAULT GST_CAT_POLL
100
101 #ifdef G_OS_WIN32
102 typedef struct _WinsockFd WinsockFd;
103
104 struct _WinsockFd
105 {
106   gint fd;
107   glong event_mask;
108   WSANETWORKEVENTS events;
109   glong ignored_event_mask;
110 };
111 #endif
112
113 typedef enum
114 {
115   GST_POLL_MODE_AUTO,
116   GST_POLL_MODE_SELECT,
117   GST_POLL_MODE_PSELECT,
118   GST_POLL_MODE_POLL,
119   GST_POLL_MODE_PPOLL,
120   GST_POLL_MODE_WINDOWS
121 } GstPollMode;
122
123 struct _GstPoll
124 {
125   GstPollMode mode;
126
127   GMutex lock;
128   /* array of fds, always written to and read from with lock */
129   GArray *fds;
130   /* array of active fds, only written to from the waiting thread with the
131    * lock and read from with the lock or without the lock from the waiting
132    * thread */
133   GArray *active_fds;
134
135 #ifndef G_OS_WIN32
136   gchar buf[1];
137   GstPollFD control_read_fd;
138   GstPollFD control_write_fd;
139 #else
140   GArray *active_fds_ignored;
141   GArray *events;
142   GArray *active_events;
143
144   HANDLE wakeup_event;
145 #endif
146
147   gboolean controllable;
148   volatile gint waiting;
149   volatile gint control_pending;
150   volatile gint flushing;
151   gboolean timer;
152   volatile gint rebuild;
153 };
154
155 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
156     gboolean active);
157 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
158
159 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
160 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
161
162 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
163 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
164 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
165
166 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
167 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
168
169 #ifndef G_OS_WIN32
170 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
171 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
172 #else
173 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
174 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
175 #endif
176
177 /* the poll/select call is also performed on a control socket, that way
178  * we can send special commands to control it */
179 static inline gboolean
180 raise_wakeup (GstPoll * set)
181 {
182   gboolean result = TRUE;
183
184   if (g_atomic_int_add (&set->control_pending, 1) == 0) {
185     /* raise when nothing pending */
186     GST_LOG ("%p: raise", set);
187     result = WAKE_EVENT (set);
188   }
189   return result;
190 }
191
192 /* note how bad things can happen when the 2 threads both raise and release the
193  * wakeup. This is however not a problem because you must always pair a raise
194  * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198   gboolean result = TRUE;
199
200   if (g_atomic_int_dec_and_test (&set->control_pending)) {
201     GST_LOG ("%p: release", set);
202     result = RELEASE_EVENT (set);
203   }
204   return result;
205 }
206
207 static inline gint
208 release_all_wakeup (GstPoll * set)
209 {
210   gint old;
211
212   while (TRUE) {
213     if (!(old = g_atomic_int_get (&set->control_pending)))
214       /* nothing pending, just exit */
215       break;
216
217     /* try to remove all pending control messages */
218     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
219       /* we managed to remove all messages, read the control socket */
220       if (RELEASE_EVENT (set))
221         break;
222       else
223         /* retry again until we read it successfully */
224         g_atomic_int_add (&set->control_pending, 1);
225     }
226   }
227   return old;
228 }
229
230 static gint
231 find_index (GArray * array, GstPollFD * fd)
232 {
233 #ifndef G_OS_WIN32
234   struct pollfd *ifd;
235 #else
236   WinsockFd *ifd;
237 #endif
238   guint i;
239
240   /* start by assuming the index found in the fd is still valid */
241   if (fd->idx >= 0 && fd->idx < array->len) {
242 #ifndef G_OS_WIN32
243     ifd = &g_array_index (array, struct pollfd, fd->idx);
244 #else
245     ifd = &g_array_index (array, WinsockFd, fd->idx);
246 #endif
247
248     if (ifd->fd == fd->fd) {
249       return fd->idx;
250     }
251   }
252
253   /* the pollfd array has changed and we need to lookup the fd again */
254   for (i = 0; i < array->len; i++) {
255 #ifndef G_OS_WIN32
256     ifd = &g_array_index (array, struct pollfd, i);
257 #else
258     ifd = &g_array_index (array, WinsockFd, i);
259 #endif
260
261     if (ifd->fd == fd->fd) {
262       fd->idx = (gint) i;
263       return fd->idx;
264     }
265   }
266
267   fd->idx = -1;
268   return fd->idx;
269 }
270
271 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
272 /* check if all file descriptors will fit in an fd_set */
273 static gboolean
274 selectable_fds (const GstPoll * set)
275 {
276   guint i;
277
278   g_mutex_lock (&set->lock);
279   for (i = 0; i < set->fds->len; i++) {
280     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
281
282     if (pfd->fd >= FD_SETSIZE)
283       goto too_many;
284   }
285   g_mutex_unlock (&set->lock);
286
287   return TRUE;
288
289 too_many:
290   {
291     g_mutex_unlock (&set->lock);
292     return FALSE;
293   }
294 }
295
296 /* check if the timeout will convert to a timeout value used for poll()
297  * without a loss of precision
298  */
299 static gboolean
300 pollable_timeout (GstClockTime timeout)
301 {
302   if (timeout == GST_CLOCK_TIME_NONE)
303     return TRUE;
304
305   /* not a nice multiple of milliseconds */
306   if (timeout % 1000000)
307     return FALSE;
308
309   return TRUE;
310 }
311 #endif
312
313 static GstPollMode
314 choose_mode (const GstPoll * set, GstClockTime timeout)
315 {
316   GstPollMode mode;
317
318   if (set->mode == GST_POLL_MODE_AUTO) {
319 #ifdef HAVE_PPOLL
320     mode = GST_POLL_MODE_PPOLL;
321 #elif defined(HAVE_POLL)
322     if (!selectable_fds (set) || pollable_timeout (timeout)) {
323       mode = GST_POLL_MODE_POLL;
324     } else {
325 #ifdef HAVE_PSELECT
326       mode = GST_POLL_MODE_PSELECT;
327 #else
328       mode = GST_POLL_MODE_SELECT;
329 #endif
330     }
331 #elif defined(HAVE_PSELECT)
332     mode = GST_POLL_MODE_PSELECT;
333 #else
334     mode = GST_POLL_MODE_SELECT;
335 #endif
336   } else {
337     mode = set->mode;
338   }
339   return mode;
340 }
341
342 #ifndef G_OS_WIN32
343 static gint
344 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
345     fd_set * errorfds)
346 {
347   gint max_fd = -1;
348   guint i;
349
350   FD_ZERO (readfds);
351   FD_ZERO (writefds);
352   FD_ZERO (errorfds);
353
354   g_mutex_lock (&set->lock);
355
356   for (i = 0; i < set->active_fds->len; i++) {
357     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
358
359     if (pfd->fd < FD_SETSIZE) {
360       if (pfd->events & POLLIN)
361         FD_SET (pfd->fd, readfds);
362       if (pfd->events & POLLOUT)
363         FD_SET (pfd->fd, writefds);
364       if (pfd->events)
365         FD_SET (pfd->fd, errorfds);
366       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
367         max_fd = pfd->fd;
368     }
369   }
370
371   g_mutex_unlock (&set->lock);
372
373   return max_fd;
374 }
375
376 static void
377 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
378     fd_set * errorfds)
379 {
380   guint i;
381
382   g_mutex_lock (&set->lock);
383
384   for (i = 0; i < set->active_fds->len; i++) {
385     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
386
387     if (pfd->fd < FD_SETSIZE) {
388       pfd->revents = 0;
389       if (FD_ISSET (pfd->fd, readfds))
390         pfd->revents |= POLLIN;
391       if (FD_ISSET (pfd->fd, writefds))
392         pfd->revents |= POLLOUT;
393       if (FD_ISSET (pfd->fd, errorfds))
394         pfd->revents |= POLLERR;
395     }
396   }
397
398   g_mutex_unlock (&set->lock);
399 }
400 #else /* G_OS_WIN32 */
401 /*
402  * Translate errors thrown by the Winsock API used by GstPoll:
403  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
404  */
405 static gint
406 gst_poll_winsock_error_to_errno (DWORD last_error)
407 {
408   switch (last_error) {
409     case WSA_INVALID_HANDLE:
410     case WSAEINVAL:
411     case WSAENOTSOCK:
412       return EBADF;
413
414     case WSA_NOT_ENOUGH_MEMORY:
415       return ENOMEM;
416
417       /*
418        * Anything else, including:
419        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
420        *   WSANOTINITIALISED
421        */
422     default:
423       return EINVAL;
424   }
425 }
426
427 static void
428 gst_poll_free_winsock_event (GstPoll * set, gint idx)
429 {
430   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
431   HANDLE event = g_array_index (set->events, HANDLE, idx);
432
433   WSAEventSelect (wfd->fd, event, 0);
434   CloseHandle (event);
435 }
436
437 static void
438 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
439     gboolean active)
440 {
441   WinsockFd *wfd;
442
443   wfd = &g_array_index (set->fds, WinsockFd, idx);
444
445   if (active)
446     wfd->event_mask |= flags;
447   else
448     wfd->event_mask &= ~flags;
449
450   /* reset ignored state if the new mask doesn't overlap at all */
451   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
452     wfd->ignored_event_mask = 0;
453 }
454
455 static gboolean
456 gst_poll_prepare_winsock_active_sets (GstPoll * set)
457 {
458   guint i;
459
460   g_array_set_size (set->active_fds, 0);
461   g_array_set_size (set->active_fds_ignored, 0);
462   g_array_set_size (set->active_events, 0);
463   g_array_append_val (set->active_events, set->wakeup_event);
464
465   for (i = 0; i < set->fds->len; i++) {
466     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
467     HANDLE event = g_array_index (set->events, HANDLE, i);
468
469     if (wfd->ignored_event_mask == 0) {
470       gint ret;
471
472       g_array_append_val (set->active_fds, *wfd);
473       g_array_append_val (set->active_events, event);
474
475       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
476       if (G_UNLIKELY (ret != 0)) {
477         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
478         return FALSE;
479       }
480     } else {
481       g_array_append_val (set->active_fds_ignored, wfd);
482     }
483   }
484
485   return TRUE;
486 }
487
488 static gint
489 gst_poll_collect_winsock_events (GstPoll * set)
490 {
491   gint res, i;
492
493   /*
494    * We need to check which events are signaled, and call
495    * WSAEnumNetworkEvents for those that are, which resets
496    * the event and clears the internal network event records.
497    */
498   res = 0;
499   for (i = 0; i < set->active_fds->len; i++) {
500     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
501     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
502     DWORD wait_ret;
503
504     wait_ret = WaitForSingleObject (event, 0);
505     if (wait_ret == WAIT_OBJECT_0) {
506       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
507
508       if (G_UNLIKELY (enum_ret != 0)) {
509         res = -1;
510         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
511         break;
512       }
513
514       res++;
515     } else {
516       /* clear any previously stored result */
517       memset (&wfd->events, 0, sizeof (wfd->events));
518     }
519   }
520
521   /* If all went well we also need to reset the ignored fds. */
522   if (res >= 0) {
523     res += set->active_fds_ignored->len;
524
525     for (i = 0; i < set->active_fds_ignored->len; i++) {
526       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
527
528       wfd->ignored_event_mask = 0;
529     }
530
531     g_array_set_size (set->active_fds_ignored, 0);
532   }
533
534   return res;
535 }
536 #endif
537
538 /**
539  * gst_poll_new: (skip)
540  * @controllable: whether it should be possible to control a wait.
541  *
542  * Create a new file descriptor set. If @controllable, it
543  * is possible to restart or flush a call to gst_poll_wait() with
544  * gst_poll_restart() and gst_poll_set_flushing() respectively.
545  *
546  * Free-function: gst_poll_free
547  *
548  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
549  *     Free with gst_poll_free().
550  *
551  * Since: 0.10.18
552  */
553 GstPoll *
554 gst_poll_new (gboolean controllable)
555 {
556   GstPoll *nset;
557
558   GST_DEBUG ("controllable : %d", controllable);
559
560   nset = g_slice_new0 (GstPoll);
561   g_mutex_init (&nset->lock);
562 #ifndef G_OS_WIN32
563   nset->mode = GST_POLL_MODE_AUTO;
564   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
565   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
566   nset->control_read_fd.fd = -1;
567   nset->control_write_fd.fd = -1;
568   {
569     gint control_sock[2];
570
571     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
572       goto no_socket_pair;
573
574     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
575     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
576
577     nset->control_read_fd.fd = control_sock[0];
578     nset->control_write_fd.fd = control_sock[1];
579
580     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
581     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
582   }
583 #else
584   nset->mode = GST_POLL_MODE_WINDOWS;
585   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
586   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
587   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
588   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
589   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
590
591   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
592 #endif
593
594   /* ensure (re)build, though already sneakily set in non-windows case */
595   MARK_REBUILD (nset);
596
597   nset->controllable = controllable;
598
599   return nset;
600
601   /* ERRORS */
602 #ifndef G_OS_WIN32
603 no_socket_pair:
604   {
605     GST_WARNING ("%p: can't create socket pair !", nset);
606     gst_poll_free (nset);
607     return NULL;
608   }
609 #endif
610 }
611
612 /**
613  * gst_poll_new_timer: (skip)
614  *
615  * Create a new poll object that can be used for scheduling cancellable
616  * timeouts.
617  *
618  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
619  * performed from different threads. 
620  *
621  * Free-function: gst_poll_free
622  *
623  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
624  *     Free with gst_poll_free().
625  *
626  * Since: 0.10.23
627  */
628 GstPoll *
629 gst_poll_new_timer (void)
630 {
631   GstPoll *poll;
632
633   /* make a new controllable poll set */
634   if (!(poll = gst_poll_new (TRUE)))
635     goto done;
636
637   /* we are a timer */
638   poll->timer = TRUE;
639
640 done:
641   return poll;
642 }
643
644 /**
645  * gst_poll_free:
646  * @set: (transfer full): a file descriptor set.
647  *
648  * Free a file descriptor set.
649  *
650  * Since: 0.10.18
651  */
652 void
653 gst_poll_free (GstPoll * set)
654 {
655   g_return_if_fail (set != NULL);
656
657   GST_DEBUG ("%p: freeing", set);
658
659 #ifndef G_OS_WIN32
660   if (set->control_write_fd.fd >= 0)
661     close (set->control_write_fd.fd);
662   if (set->control_read_fd.fd >= 0)
663     close (set->control_read_fd.fd);
664 #else
665   CloseHandle (set->wakeup_event);
666
667   {
668     guint i;
669
670     for (i = 0; i < set->events->len; i++)
671       gst_poll_free_winsock_event (set, i);
672   }
673
674   g_array_free (set->active_events, TRUE);
675   g_array_free (set->events, TRUE);
676   g_array_free (set->active_fds_ignored, TRUE);
677 #endif
678
679   g_array_free (set->active_fds, TRUE);
680   g_array_free (set->fds, TRUE);
681   g_mutex_clear (&set->lock);
682   g_slice_free (GstPoll, set);
683 }
684
685 /**
686  * gst_poll_get_read_gpollfd:
687  * @set: a #GstPoll
688  * @fd: a #GPollFD
689  *
690  * Get a GPollFD for the reading part of the control socket. This is useful when
691  * integrating with a GSource and GMainLoop.
692  *
693  * Since: 0.10.32
694  */
695 void
696 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
697 {
698   g_return_if_fail (set != NULL);
699   g_return_if_fail (fd != NULL);
700
701 #ifndef G_OS_WIN32
702   fd->fd = set->control_read_fd.fd;
703 #else
704 #if GLIB_SIZEOF_VOID_P == 8
705   fd->fd = (gint64) set->wakeup_event;
706 #else
707   fd->fd = (gint) set->wakeup_event;
708 #endif
709 #endif
710   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
711   fd->revents = 0;
712 }
713
714 /**
715  * gst_poll_fd_init:
716  * @fd: a #GstPollFD
717  *
718  * Initializes @fd. Alternatively you can initialize it with
719  * #GST_POLL_FD_INIT.
720  *
721  * Since: 0.10.18
722  */
723 void
724 gst_poll_fd_init (GstPollFD * fd)
725 {
726   g_return_if_fail (fd != NULL);
727
728   fd->fd = -1;
729   fd->idx = -1;
730 }
731
732 static gboolean
733 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
734 {
735   gint idx;
736
737   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
738
739   idx = find_index (set->fds, fd);
740   if (idx < 0) {
741 #ifndef G_OS_WIN32
742     struct pollfd nfd;
743
744     nfd.fd = fd->fd;
745     nfd.events = POLLERR | POLLNVAL | POLLHUP;
746     nfd.revents = 0;
747
748     g_array_append_val (set->fds, nfd);
749
750     fd->idx = set->fds->len - 1;
751 #else
752     WinsockFd wfd;
753     HANDLE event;
754
755     wfd.fd = fd->fd;
756     wfd.event_mask = FD_CLOSE;
757     memset (&wfd.events, 0, sizeof (wfd.events));
758     wfd.ignored_event_mask = 0;
759     event = WSACreateEvent ();
760
761     g_array_append_val (set->fds, wfd);
762     g_array_append_val (set->events, event);
763
764     fd->idx = set->fds->len - 1;
765 #endif
766     MARK_REBUILD (set);
767   } else {
768     GST_WARNING ("%p: couldn't find fd !", set);
769   }
770
771   return TRUE;
772 }
773
774 /**
775  * gst_poll_add_fd:
776  * @set: a file descriptor set.
777  * @fd: a file descriptor.
778  *
779  * Add a file descriptor to the file descriptor set.
780  *
781  * Returns: %TRUE if the file descriptor was successfully added to the set.
782  *
783  * Since: 0.10.18
784  */
785 gboolean
786 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
787 {
788   gboolean ret;
789
790   g_return_val_if_fail (set != NULL, FALSE);
791   g_return_val_if_fail (fd != NULL, FALSE);
792   g_return_val_if_fail (fd->fd >= 0, FALSE);
793
794   g_mutex_lock (&set->lock);
795
796   ret = gst_poll_add_fd_unlocked (set, fd);
797
798   g_mutex_unlock (&set->lock);
799
800   return ret;
801 }
802
803 /**
804  * gst_poll_remove_fd:
805  * @set: a file descriptor set.
806  * @fd: a file descriptor.
807  *
808  * Remove a file descriptor from the file descriptor set.
809  *
810  * Returns: %TRUE if the file descriptor was successfully removed from the set.
811  *
812  * Since: 0.10.18
813  */
814 gboolean
815 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
816 {
817   gint idx;
818
819   g_return_val_if_fail (set != NULL, FALSE);
820   g_return_val_if_fail (fd != NULL, FALSE);
821   g_return_val_if_fail (fd->fd >= 0, FALSE);
822
823
824   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
825
826   g_mutex_lock (&set->lock);
827
828   /* get the index, -1 is an fd that is not added */
829   idx = find_index (set->fds, fd);
830   if (idx >= 0) {
831 #ifdef G_OS_WIN32
832     gst_poll_free_winsock_event (set, idx);
833     g_array_remove_index_fast (set->events, idx);
834 #endif
835
836     /* remove the fd at index, we use _remove_index_fast, which copies the last
837      * element of the array to the freed index */
838     g_array_remove_index_fast (set->fds, idx);
839
840     /* mark fd as removed by setting the index to -1 */
841     fd->idx = -1;
842     MARK_REBUILD (set);
843   } else {
844     GST_WARNING ("%p: couldn't find fd !", set);
845   }
846
847   g_mutex_unlock (&set->lock);
848
849   return idx >= 0;
850 }
851
852 /**
853  * gst_poll_fd_ctl_write:
854  * @set: a file descriptor set.
855  * @fd: a file descriptor.
856  * @active: a new status.
857  *
858  * Control whether the descriptor @fd in @set will be monitored for
859  * writability.
860  *
861  * Returns: %TRUE if the descriptor was successfully updated.
862  *
863  * Since: 0.10.18
864  */
865 gboolean
866 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
867 {
868   gint idx;
869
870   g_return_val_if_fail (set != NULL, FALSE);
871   g_return_val_if_fail (fd != NULL, FALSE);
872   g_return_val_if_fail (fd->fd >= 0, FALSE);
873
874   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
875       fd->fd, fd->idx, active);
876
877   g_mutex_lock (&set->lock);
878
879   idx = find_index (set->fds, fd);
880   if (idx >= 0) {
881 #ifndef G_OS_WIN32
882     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
883
884     if (active)
885       pfd->events |= POLLOUT;
886     else
887       pfd->events &= ~POLLOUT;
888
889     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
890 #else
891     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
892         active);
893 #endif
894     MARK_REBUILD (set);
895   } else {
896     GST_WARNING ("%p: couldn't find fd !", set);
897   }
898
899   g_mutex_unlock (&set->lock);
900
901   return idx >= 0;
902 }
903
904 static gboolean
905 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
906 {
907   gint idx;
908
909   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
910       fd->fd, fd->idx, active);
911
912   idx = find_index (set->fds, fd);
913
914   if (idx >= 0) {
915 #ifndef G_OS_WIN32
916     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
917
918     if (active)
919       pfd->events |= (POLLIN | POLLPRI);
920     else
921       pfd->events &= ~(POLLIN | POLLPRI);
922 #else
923     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
924 #endif
925     MARK_REBUILD (set);
926   } else {
927     GST_WARNING ("%p: couldn't find fd !", set);
928   }
929
930   return idx >= 0;
931 }
932
933 /**
934  * gst_poll_fd_ctl_read:
935  * @set: a file descriptor set.
936  * @fd: a file descriptor.
937  * @active: a new status.
938  *
939  * Control whether the descriptor @fd in @set will be monitored for
940  * readability.
941  *
942  * Returns: %TRUE if the descriptor was successfully updated.
943  *
944  * Since: 0.10.18
945  */
946 gboolean
947 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
948 {
949   gboolean ret;
950
951   g_return_val_if_fail (set != NULL, FALSE);
952   g_return_val_if_fail (fd != NULL, FALSE);
953   g_return_val_if_fail (fd->fd >= 0, FALSE);
954
955   g_mutex_lock (&set->lock);
956
957   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
958
959   g_mutex_unlock (&set->lock);
960
961   return ret;
962 }
963
964 /**
965  * gst_poll_fd_ignored:
966  * @set: a file descriptor set.
967  * @fd: a file descriptor.
968  *
969  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
970  * the same result for @fd as last time. This function must be called if no
971  * operation (read/write/recv/send/etc.) will be performed on @fd before
972  * the next call to gst_poll_wait().
973  *
974  * The reason why this is needed is because the underlying implementation
975  * might not allow querying the fd more than once between calls to one of
976  * the re-enabling operations.
977  *
978  * Since: 0.10.18
979  */
980 void
981 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
982 {
983 #ifdef G_OS_WIN32
984   gint idx;
985
986   g_return_if_fail (set != NULL);
987   g_return_if_fail (fd != NULL);
988   g_return_if_fail (fd->fd >= 0);
989
990   g_mutex_lock (&set->lock);
991
992   idx = find_index (set->fds, fd);
993   if (idx >= 0) {
994     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
995
996     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
997     MARK_REBUILD (set);
998   }
999
1000   g_mutex_unlock (&set->lock);
1001 #endif
1002 }
1003
1004 /**
1005  * gst_poll_fd_has_closed:
1006  * @set: a file descriptor set.
1007  * @fd: a file descriptor.
1008  *
1009  * Check if @fd in @set has closed the connection.
1010  *
1011  * Returns: %TRUE if the connection was closed.
1012  *
1013  * Since: 0.10.18
1014  */
1015 gboolean
1016 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1017 {
1018   gboolean res = FALSE;
1019   gint idx;
1020
1021   g_return_val_if_fail (set != NULL, FALSE);
1022   g_return_val_if_fail (fd != NULL, FALSE);
1023   g_return_val_if_fail (fd->fd >= 0, FALSE);
1024
1025   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1026
1027   g_mutex_lock (&((GstPoll *) set)->lock);
1028
1029   idx = find_index (set->active_fds, fd);
1030   if (idx >= 0) {
1031 #ifndef G_OS_WIN32
1032     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1033
1034     res = (pfd->revents & POLLHUP) != 0;
1035 #else
1036     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1037
1038     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1039 #endif
1040   } else {
1041     GST_WARNING ("%p: couldn't find fd !", set);
1042   }
1043
1044   g_mutex_unlock (&((GstPoll *) set)->lock);
1045
1046   return res;
1047 }
1048
1049 /**
1050  * gst_poll_fd_has_error:
1051  * @set: a file descriptor set.
1052  * @fd: a file descriptor.
1053  *
1054  * Check if @fd in @set has an error.
1055  *
1056  * Returns: %TRUE if the descriptor has an error.
1057  *
1058  * Since: 0.10.18
1059  */
1060 gboolean
1061 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1062 {
1063   gboolean res = FALSE;
1064   gint idx;
1065
1066   g_return_val_if_fail (set != NULL, FALSE);
1067   g_return_val_if_fail (fd != NULL, FALSE);
1068   g_return_val_if_fail (fd->fd >= 0, FALSE);
1069
1070   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1071
1072   g_mutex_lock (&((GstPoll *) set)->lock);
1073
1074   idx = find_index (set->active_fds, fd);
1075   if (idx >= 0) {
1076 #ifndef G_OS_WIN32
1077     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1078
1079     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1080 #else
1081     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1082
1083     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1084         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1085         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1086         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1087         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1088 #endif
1089   } else {
1090     GST_WARNING ("%p: couldn't find fd !", set);
1091   }
1092
1093   g_mutex_unlock (&((GstPoll *) set)->lock);
1094
1095   return res;
1096 }
1097
1098 static gboolean
1099 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1100 {
1101   gboolean res = FALSE;
1102   gint idx;
1103
1104   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1105
1106   idx = find_index (set->active_fds, fd);
1107   if (idx >= 0) {
1108 #ifndef G_OS_WIN32
1109     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1110
1111     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1112 #else
1113     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1114
1115     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1116 #endif
1117   } else {
1118     GST_WARNING ("%p: couldn't find fd !", set);
1119   }
1120
1121   return res;
1122 }
1123
1124 /**
1125  * gst_poll_fd_can_read:
1126  * @set: a file descriptor set.
1127  * @fd: a file descriptor.
1128  *
1129  * Check if @fd in @set has data to be read.
1130  *
1131  * Returns: %TRUE if the descriptor has data to be read.
1132  *
1133  * Since: 0.10.18
1134  */
1135 gboolean
1136 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1137 {
1138   gboolean res = FALSE;
1139
1140   g_return_val_if_fail (set != NULL, FALSE);
1141   g_return_val_if_fail (fd != NULL, FALSE);
1142   g_return_val_if_fail (fd->fd >= 0, FALSE);
1143
1144   g_mutex_lock (&((GstPoll *) set)->lock);
1145
1146   res = gst_poll_fd_can_read_unlocked (set, fd);
1147
1148   g_mutex_unlock (&((GstPoll *) set)->lock);
1149
1150   return res;
1151 }
1152
1153 /**
1154  * gst_poll_fd_can_write:
1155  * @set: a file descriptor set.
1156  * @fd: a file descriptor.
1157  *
1158  * Check if @fd in @set can be used for writing.
1159  *
1160  * Returns: %TRUE if the descriptor can be used for writing.
1161  *
1162  * Since: 0.10.18
1163  */
1164 gboolean
1165 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1166 {
1167   gboolean res = FALSE;
1168   gint idx;
1169
1170   g_return_val_if_fail (set != NULL, FALSE);
1171   g_return_val_if_fail (fd != NULL, FALSE);
1172   g_return_val_if_fail (fd->fd >= 0, FALSE);
1173
1174   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1175
1176   g_mutex_lock (&((GstPoll *) set)->lock);
1177
1178   idx = find_index (set->active_fds, fd);
1179   if (idx >= 0) {
1180 #ifndef G_OS_WIN32
1181     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1182
1183     res = (pfd->revents & POLLOUT) != 0;
1184 #else
1185     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1186
1187     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1188 #endif
1189   } else {
1190     GST_WARNING ("%p: couldn't find fd !", set);
1191   }
1192
1193   g_mutex_unlock (&((GstPoll *) set)->lock);
1194
1195   return res;
1196 }
1197
1198 /**
1199  * gst_poll_wait:
1200  * @set: a #GstPoll.
1201  * @timeout: a timeout in nanoseconds.
1202  *
1203  * Wait for activity on the file descriptors in @set. This function waits up to
1204  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1205  *
1206  * For #GstPoll objects created with gst_poll_new(), this function can only be
1207  * called from a single thread at a time.  If called from multiple threads,
1208  * -1 will be returned with errno set to EPERM.
1209  *
1210  * This is not true for timer #GstPoll objects created with
1211  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1212  * simultaneously.
1213  *
1214  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1215  * activity was detected after @timeout. If an error occurs, -1 is returned
1216  * and errno is set.
1217  *
1218  * Since: 0.10.18
1219  */
1220 gint
1221 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1222 {
1223   gboolean restarting;
1224   gboolean is_timer;
1225   int res;
1226   gint old_waiting;
1227
1228   g_return_val_if_fail (set != NULL, -1);
1229
1230   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1231
1232   is_timer = set->timer;
1233
1234   /* add one more waiter */
1235   old_waiting = INC_WAITING (set);
1236
1237   /* we cannot wait from multiple threads unless we are a timer */
1238   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1239     goto already_waiting;
1240
1241   /* flushing, exit immediately */
1242   if (G_UNLIKELY (IS_FLUSHING (set)))
1243     goto flushing;
1244
1245   do {
1246     GstPollMode mode;
1247
1248     res = -1;
1249     restarting = FALSE;
1250
1251     mode = choose_mode (set, timeout);
1252
1253     if (TEST_REBUILD (set)) {
1254       g_mutex_lock (&set->lock);
1255 #ifndef G_OS_WIN32
1256       g_array_set_size (set->active_fds, set->fds->len);
1257       memcpy (set->active_fds->data, set->fds->data,
1258           set->fds->len * sizeof (struct pollfd));
1259 #else
1260       if (!gst_poll_prepare_winsock_active_sets (set))
1261         goto winsock_error;
1262 #endif
1263       g_mutex_unlock (&set->lock);
1264     }
1265
1266     switch (mode) {
1267       case GST_POLL_MODE_AUTO:
1268         g_assert_not_reached ();
1269         break;
1270       case GST_POLL_MODE_PPOLL:
1271       {
1272 #ifdef HAVE_PPOLL
1273         struct timespec ts;
1274         struct timespec *tsptr;
1275
1276         if (timeout != GST_CLOCK_TIME_NONE) {
1277           GST_TIME_TO_TIMESPEC (timeout, ts);
1278           tsptr = &ts;
1279         } else {
1280           tsptr = NULL;
1281         }
1282
1283         res =
1284             ppoll ((struct pollfd *) set->active_fds->data,
1285             set->active_fds->len, tsptr, NULL);
1286 #else
1287         g_assert_not_reached ();
1288         errno = ENOSYS;
1289 #endif
1290         break;
1291       }
1292       case GST_POLL_MODE_POLL:
1293       {
1294 #ifdef HAVE_POLL
1295         gint t;
1296
1297         if (timeout != GST_CLOCK_TIME_NONE) {
1298           t = GST_TIME_AS_MSECONDS (timeout);
1299         } else {
1300           t = -1;
1301         }
1302
1303         res =
1304             poll ((struct pollfd *) set->active_fds->data,
1305             set->active_fds->len, t);
1306 #else
1307         g_assert_not_reached ();
1308         errno = ENOSYS;
1309 #endif
1310         break;
1311       }
1312       case GST_POLL_MODE_PSELECT:
1313 #ifndef HAVE_PSELECT
1314       {
1315         g_assert_not_reached ();
1316         errno = ENOSYS;
1317         break;
1318       }
1319 #endif
1320       case GST_POLL_MODE_SELECT:
1321       {
1322 #ifndef G_OS_WIN32
1323         fd_set readfds;
1324         fd_set writefds;
1325         fd_set errorfds;
1326         gint max_fd;
1327
1328         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1329
1330         if (mode == GST_POLL_MODE_SELECT) {
1331           struct timeval tv;
1332           struct timeval *tvptr;
1333
1334           if (timeout != GST_CLOCK_TIME_NONE) {
1335             GST_TIME_TO_TIMEVAL (timeout, tv);
1336             tvptr = &tv;
1337           } else {
1338             tvptr = NULL;
1339           }
1340
1341           GST_DEBUG ("Calling select");
1342           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1343           GST_DEBUG ("After select, res:%d", res);
1344         } else {
1345 #ifdef HAVE_PSELECT
1346           struct timespec ts;
1347           struct timespec *tsptr;
1348
1349           if (timeout != GST_CLOCK_TIME_NONE) {
1350             GST_TIME_TO_TIMESPEC (timeout, ts);
1351             tsptr = &ts;
1352           } else {
1353             tsptr = NULL;
1354           }
1355
1356           GST_DEBUG ("Calling pselect");
1357           res =
1358               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1359           GST_DEBUG ("After pselect, res:%d", res);
1360 #endif
1361         }
1362
1363         if (res >= 0) {
1364           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1365         }
1366 #else /* G_OS_WIN32 */
1367         g_assert_not_reached ();
1368         errno = ENOSYS;
1369 #endif
1370         break;
1371       }
1372       case GST_POLL_MODE_WINDOWS:
1373       {
1374 #ifdef G_OS_WIN32
1375         gint ignore_count = set->active_fds_ignored->len;
1376         DWORD t, wait_ret;
1377
1378         if (G_LIKELY (ignore_count == 0)) {
1379           if (timeout != GST_CLOCK_TIME_NONE)
1380             t = GST_TIME_AS_MSECONDS (timeout);
1381           else
1382             t = INFINITE;
1383         } else {
1384           /* already one or more ignored fds, so we quickly sweep the others */
1385           t = 0;
1386         }
1387
1388         if (set->active_events->len != 0) {
1389           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1390               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1391         } else {
1392           wait_ret = WSA_WAIT_FAILED;
1393           WSASetLastError (WSA_INVALID_PARAMETER);
1394         }
1395
1396         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1397           res = 0;
1398         } else if (wait_ret == WSA_WAIT_FAILED) {
1399           res = -1;
1400           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1401         } else {
1402           /* the first entry is the wakeup event */
1403           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1404             res = gst_poll_collect_winsock_events (set);
1405           } else {
1406             res = 1;            /* wakeup event */
1407           }
1408         }
1409 #else
1410         g_assert_not_reached ();
1411         errno = ENOSYS;
1412 #endif
1413         break;
1414       }
1415     }
1416
1417     if (!is_timer) {
1418       /* Applications needs to clear the control socket themselves for timer
1419        * polls.
1420        * For other polls, we need to clear the control socket. If there was only
1421        * one socket with activity and it was the control socket, we need to
1422        * restart */
1423       if (release_all_wakeup (set) > 0 && res == 1)
1424         restarting = TRUE;
1425     }
1426
1427     /* we got woken up and we are flushing, we need to stop */
1428     if (G_UNLIKELY (IS_FLUSHING (set)))
1429       goto flushing;
1430
1431   } while (G_UNLIKELY (restarting));
1432
1433   DEC_WAITING (set);
1434
1435   return res;
1436
1437   /* ERRORS */
1438 already_waiting:
1439   {
1440     GST_LOG ("%p: we are already waiting", set);
1441     DEC_WAITING (set);
1442     errno = EPERM;
1443     return -1;
1444   }
1445 flushing:
1446   {
1447     GST_LOG ("%p: we are flushing", set);
1448     DEC_WAITING (set);
1449     errno = EBUSY;
1450     return -1;
1451   }
1452 #ifdef G_OS_WIN32
1453 winsock_error:
1454   {
1455     GST_LOG ("%p: winsock error", set);
1456     g_mutex_unlock (&set->lock);
1457     DEC_WAITING (set);
1458     return -1;
1459   }
1460 #endif
1461 }
1462
1463 /**
1464  * gst_poll_set_controllable:
1465  * @set: a #GstPoll.
1466  * @controllable: new controllable state.
1467  *
1468  * When @controllable is %TRUE, this function ensures that future calls to
1469  * gst_poll_wait() will be affected by gst_poll_restart() and
1470  * gst_poll_set_flushing().
1471  *
1472  * Returns: %TRUE if the controllability of @set could be updated.
1473  *
1474  * Since: 0.10.18
1475  */
1476 gboolean
1477 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1478 {
1479   g_return_val_if_fail (set != NULL, FALSE);
1480
1481   GST_LOG ("%p: controllable : %d", set, controllable);
1482
1483   set->controllable = controllable;
1484
1485   return TRUE;
1486 }
1487
1488 /**
1489  * gst_poll_restart:
1490  * @set: a #GstPoll.
1491  *
1492  * Restart any gst_poll_wait() that is in progress. This function is typically
1493  * used after adding or removing descriptors to @set.
1494  *
1495  * If @set is not controllable, then this call will have no effect.
1496  *
1497  * Since: 0.10.18
1498  */
1499 void
1500 gst_poll_restart (GstPoll * set)
1501 {
1502   g_return_if_fail (set != NULL);
1503
1504   if (set->controllable && GET_WAITING (set) > 0) {
1505     /* we are controllable and waiting, wake up the waiter. The socket will be
1506      * cleared by the _wait() thread and the poll will be restarted */
1507     raise_wakeup (set);
1508   }
1509 }
1510
1511 /**
1512  * gst_poll_set_flushing:
1513  * @set: a #GstPoll.
1514  * @flushing: new flushing state.
1515  *
1516  * When @flushing is %TRUE, this function ensures that current and future calls
1517  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1518  *
1519  * Unsetting the flushing state will restore normal operation of @set.
1520  *
1521  * Since: 0.10.18
1522  */
1523 void
1524 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1525 {
1526   g_return_if_fail (set != NULL);
1527
1528   GST_LOG ("%p: flushing: %d", set, flushing);
1529
1530   /* update the new state first */
1531   SET_FLUSHING (set, flushing);
1532
1533   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1534     /* we are flushing, controllable and waiting, wake up the waiter. When we
1535      * stop the flushing operation we don't clear the wakeup fd here, this will
1536      * happen in the _wait() thread. */
1537     raise_wakeup (set);
1538   }
1539 }
1540
1541 /**
1542  * gst_poll_write_control:
1543  * @set: a #GstPoll.
1544  *
1545  * Write a byte to the control socket of the controllable @set.
1546  * This function is mostly useful for timer #GstPoll objects created with
1547  * gst_poll_new_timer(). 
1548  *
1549  * It will make any current and future gst_poll_wait() function return with
1550  * 1, meaning the control socket is set. After an equal amount of calls to
1551  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1552  * block again until their timeout expired.
1553  *
1554  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1555  * byte could not be written.
1556  *
1557  * Since: 0.10.23
1558  */
1559 gboolean
1560 gst_poll_write_control (GstPoll * set)
1561 {
1562   gboolean res;
1563
1564   g_return_val_if_fail (set != NULL, FALSE);
1565   g_return_val_if_fail (set->timer, FALSE);
1566
1567   res = raise_wakeup (set);
1568
1569   return res;
1570 }
1571
1572 /**
1573  * gst_poll_read_control:
1574  * @set: a #GstPoll.
1575  *
1576  * Read a byte from the control socket of the controllable @set.
1577  * This function is mostly useful for timer #GstPoll objects created with
1578  * gst_poll_new_timer(). 
1579  *
1580  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1581  * was no byte to read.
1582  *
1583  * Since: 0.10.23
1584  */
1585 gboolean
1586 gst_poll_read_control (GstPoll * set)
1587 {
1588   gboolean res;
1589
1590   g_return_val_if_fail (set != NULL, FALSE);
1591   g_return_val_if_fail (set->timer, FALSE);
1592
1593   res = release_wakeup (set);
1594
1595   return res;
1596 }