uri: fix wrong G_GNUC_MALLOC
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #include <sys/poll.h>
79 #include <sys/time.h>
80 #include <sys/socket.h>
81 #endif
82
83 /* OS/X needs this because of bad headers */
84 #include <string.h>
85
86 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
87  * so we prefer our own poll emulation.
88  */
89 #if defined(BROKEN_POLL)
90 #undef HAVE_POLL
91 #endif
92
93 #include "gstpoll.h"
94
95 #define GST_CAT_DEFAULT GST_CAT_POLL
96
97 #ifdef G_OS_WIN32
98 typedef struct _WinsockFd WinsockFd;
99
100 struct _WinsockFd
101 {
102   gint fd;
103   glong event_mask;
104   WSANETWORKEVENTS events;
105   glong ignored_event_mask;
106 };
107 #endif
108
109 typedef enum
110 {
111   GST_POLL_MODE_AUTO,
112   GST_POLL_MODE_SELECT,
113   GST_POLL_MODE_PSELECT,
114   GST_POLL_MODE_POLL,
115   GST_POLL_MODE_PPOLL,
116   GST_POLL_MODE_WINDOWS
117 } GstPollMode;
118
119 struct _GstPoll
120 {
121   GstPollMode mode;
122
123   GMutex *lock;
124   /* array of fds, always written to and read from with lock */
125   GArray *fds;
126   /* array of active fds, only written to from the waiting thread with the
127    * lock and read from with the lock or without the lock from the waiting
128    * thread */
129   GArray *active_fds;
130
131 #ifndef G_OS_WIN32
132   gchar buf[1];
133   GstPollFD control_read_fd;
134   GstPollFD control_write_fd;
135 #else
136   GArray *active_fds_ignored;
137   GArray *events;
138   GArray *active_events;
139
140   HANDLE wakeup_event;
141 #endif
142
143   gboolean controllable;
144   volatile gint waiting;
145   volatile gint control_pending;
146   volatile gint flushing;
147   gboolean timer;
148   volatile gint rebuild;
149 };
150
151 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
152     gboolean active);
153 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
154
155 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
156 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
157
158 #define INC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, 1))
159 #define DEC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, -1))
160 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
161
162 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
163 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
164
165 #ifndef G_OS_WIN32
166 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
167 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
168 #else
169 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
170 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
171 #endif
172
173 /* the poll/select call is also performed on a control socket, that way
174  * we can send special commands to control it */
175 static inline gboolean
176 raise_wakeup (GstPoll * set)
177 {
178   gboolean result = TRUE;
179
180   if (G_ATOMIC_INT_ADD (&set->control_pending, 1) == 0) {
181     /* raise when nothing pending */
182     result = WAKE_EVENT (set);
183   }
184   return result;
185 }
186
187 /* note how bad things can happen when the 2 threads both raise and release the
188  * wakeup. This is however not a problem because you must always pair a raise
189  * with a release */
190 static inline gboolean
191 release_wakeup (GstPoll * set)
192 {
193   gboolean result = TRUE;
194
195   if (g_atomic_int_dec_and_test (&set->control_pending)) {
196     result = RELEASE_EVENT (set);
197   }
198   return result;
199 }
200
201 static inline gint
202 release_all_wakeup (GstPoll * set)
203 {
204   gint old;
205
206   while (TRUE) {
207     if (!(old = g_atomic_int_get (&set->control_pending)))
208       /* nothing pending, just exit */
209       break;
210
211     /* try to remove all pending control messages */
212     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
213       /* we managed to remove all messages, read the control socket */
214       if (RELEASE_EVENT (set))
215         break;
216       else
217         /* retry again until we read it successfully */
218         G_ATOMIC_INT_ADD (&set->control_pending, 1);
219     }
220   }
221   return old;
222 }
223
224 static gint
225 find_index (GArray * array, GstPollFD * fd)
226 {
227 #ifndef G_OS_WIN32
228   struct pollfd *ifd;
229 #else
230   WinsockFd *ifd;
231 #endif
232   guint i;
233
234   /* start by assuming the index found in the fd is still valid */
235   if (fd->idx >= 0 && fd->idx < array->len) {
236 #ifndef G_OS_WIN32
237     ifd = &g_array_index (array, struct pollfd, fd->idx);
238 #else
239     ifd = &g_array_index (array, WinsockFd, fd->idx);
240 #endif
241
242     if (ifd->fd == fd->fd) {
243       return fd->idx;
244     }
245   }
246
247   /* the pollfd array has changed and we need to lookup the fd again */
248   for (i = 0; i < array->len; i++) {
249 #ifndef G_OS_WIN32
250     ifd = &g_array_index (array, struct pollfd, i);
251 #else
252     ifd = &g_array_index (array, WinsockFd, i);
253 #endif
254
255     if (ifd->fd == fd->fd) {
256       fd->idx = (gint) i;
257       return fd->idx;
258     }
259   }
260
261   fd->idx = -1;
262   return fd->idx;
263 }
264
265 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
266 /* check if all file descriptors will fit in an fd_set */
267 static gboolean
268 selectable_fds (const GstPoll * set)
269 {
270   guint i;
271
272   g_mutex_lock (set->lock);
273   for (i = 0; i < set->fds->len; i++) {
274     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
275
276     if (pfd->fd >= FD_SETSIZE)
277       goto too_many;
278   }
279   g_mutex_unlock (set->lock);
280
281   return TRUE;
282
283 too_many:
284   {
285     g_mutex_unlock (set->lock);
286     return FALSE;
287   }
288 }
289
290 /* check if the timeout will convert to a timeout value used for poll()
291  * without a loss of precision
292  */
293 static gboolean
294 pollable_timeout (GstClockTime timeout)
295 {
296   if (timeout == GST_CLOCK_TIME_NONE)
297     return TRUE;
298
299   /* not a nice multiple of milliseconds */
300   if (timeout % 1000000)
301     return FALSE;
302
303   return TRUE;
304 }
305 #endif
306
307 static GstPollMode
308 choose_mode (const GstPoll * set, GstClockTime timeout)
309 {
310   GstPollMode mode;
311
312   if (set->mode == GST_POLL_MODE_AUTO) {
313 #ifdef HAVE_PPOLL
314     mode = GST_POLL_MODE_PPOLL;
315 #elif defined(HAVE_POLL)
316     if (!selectable_fds (set) || pollable_timeout (timeout)) {
317       mode = GST_POLL_MODE_POLL;
318     } else {
319 #ifdef HAVE_PSELECT
320       mode = GST_POLL_MODE_PSELECT;
321 #else
322       mode = GST_POLL_MODE_SELECT;
323 #endif
324     }
325 #elif defined(HAVE_PSELECT)
326     mode = GST_POLL_MODE_PSELECT;
327 #else
328     mode = GST_POLL_MODE_SELECT;
329 #endif
330   } else {
331     mode = set->mode;
332   }
333   return mode;
334 }
335
336 #ifndef G_OS_WIN32
337 static gint
338 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
339     fd_set * errorfds)
340 {
341   gint max_fd = -1;
342   guint i;
343
344   FD_ZERO (readfds);
345   FD_ZERO (writefds);
346   FD_ZERO (errorfds);
347
348   g_mutex_lock (set->lock);
349
350   for (i = 0; i < set->active_fds->len; i++) {
351     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
352
353     if (pfd->fd < FD_SETSIZE) {
354       if (pfd->events & POLLIN)
355         FD_SET (pfd->fd, readfds);
356       if (pfd->events & POLLOUT)
357         FD_SET (pfd->fd, writefds);
358       if (pfd->events)
359         FD_SET (pfd->fd, errorfds);
360       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
361         max_fd = pfd->fd;
362     }
363   }
364
365   g_mutex_unlock (set->lock);
366
367   return max_fd;
368 }
369
370 static void
371 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
372     fd_set * errorfds)
373 {
374   guint i;
375
376   g_mutex_lock (set->lock);
377
378   for (i = 0; i < set->active_fds->len; i++) {
379     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
380
381     if (pfd->fd < FD_SETSIZE) {
382       pfd->revents = 0;
383       if (FD_ISSET (pfd->fd, readfds))
384         pfd->revents |= POLLIN;
385       if (FD_ISSET (pfd->fd, writefds))
386         pfd->revents |= POLLOUT;
387       if (FD_ISSET (pfd->fd, errorfds))
388         pfd->revents |= POLLERR;
389     }
390   }
391
392   g_mutex_unlock (set->lock);
393 }
394 #else /* G_OS_WIN32 */
395 /*
396  * Translate errors thrown by the Winsock API used by GstPoll:
397  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
398  */
399 static gint
400 gst_poll_winsock_error_to_errno (DWORD last_error)
401 {
402   switch (last_error) {
403     case WSA_INVALID_HANDLE:
404     case WSAEINVAL:
405     case WSAENOTSOCK:
406       return EBADF;
407
408     case WSA_NOT_ENOUGH_MEMORY:
409       return ENOMEM;
410
411       /*
412        * Anything else, including:
413        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
414        *   WSANOTINITIALISED
415        */
416     default:
417       return EINVAL;
418   }
419 }
420
421 static void
422 gst_poll_free_winsock_event (GstPoll * set, gint idx)
423 {
424   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
425   HANDLE event = g_array_index (set->events, HANDLE, idx);
426
427   WSAEventSelect (wfd->fd, event, 0);
428   CloseHandle (event);
429 }
430
431 static void
432 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
433     gboolean active)
434 {
435   WinsockFd *wfd;
436
437   wfd = &g_array_index (set->fds, WinsockFd, idx);
438
439   if (active)
440     wfd->event_mask |= flags;
441   else
442     wfd->event_mask &= ~flags;
443
444   /* reset ignored state if the new mask doesn't overlap at all */
445   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
446     wfd->ignored_event_mask = 0;
447 }
448
449 static gboolean
450 gst_poll_prepare_winsock_active_sets (GstPoll * set)
451 {
452   guint i;
453
454   g_array_set_size (set->active_fds, 0);
455   g_array_set_size (set->active_fds_ignored, 0);
456   g_array_set_size (set->active_events, 0);
457   g_array_append_val (set->active_events, set->wakeup_event);
458
459   for (i = 0; i < set->fds->len; i++) {
460     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
461     HANDLE event = g_array_index (set->events, HANDLE, i);
462
463     if (wfd->ignored_event_mask == 0) {
464       gint ret;
465
466       g_array_append_val (set->active_fds, *wfd);
467       g_array_append_val (set->active_events, event);
468
469       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
470       if (G_UNLIKELY (ret != 0)) {
471         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
472         return FALSE;
473       }
474     } else {
475       g_array_append_val (set->active_fds_ignored, wfd);
476     }
477   }
478
479   return TRUE;
480 }
481
482 static gint
483 gst_poll_collect_winsock_events (GstPoll * set)
484 {
485   gint res, i;
486
487   /*
488    * We need to check which events are signaled, and call
489    * WSAEnumNetworkEvents for those that are, which resets
490    * the event and clears the internal network event records.
491    */
492   res = 0;
493   for (i = 0; i < set->active_fds->len; i++) {
494     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
495     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
496     DWORD wait_ret;
497
498     wait_ret = WaitForSingleObject (event, 0);
499     if (wait_ret == WAIT_OBJECT_0) {
500       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
501
502       if (G_UNLIKELY (enum_ret != 0)) {
503         res = -1;
504         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
505         break;
506       }
507
508       res++;
509     } else {
510       /* clear any previously stored result */
511       memset (&wfd->events, 0, sizeof (wfd->events));
512     }
513   }
514
515   /* If all went well we also need to reset the ignored fds. */
516   if (res >= 0) {
517     res += set->active_fds_ignored->len;
518
519     for (i = 0; i < set->active_fds_ignored->len; i++) {
520       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
521
522       wfd->ignored_event_mask = 0;
523     }
524
525     g_array_set_size (set->active_fds_ignored, 0);
526   }
527
528   return res;
529 }
530 #endif
531
532 /**
533  * gst_poll_new:
534  * @controllable: whether it should be possible to control a wait.
535  *
536  * Create a new file descriptor set. If @controllable, it
537  * is possible to restart or flush a call to gst_poll_wait() with
538  * gst_poll_restart() and gst_poll_set_flushing() respectively.
539  *
540  * Free-function: gst_poll_free
541  *
542  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
543  *     Free with gst_poll_free().
544  *
545  * Since: 0.10.18
546  */
547 GstPoll *
548 gst_poll_new (gboolean controllable)
549 {
550   GstPoll *nset;
551
552   GST_DEBUG ("controllable : %d", controllable);
553
554   nset = g_slice_new0 (GstPoll);
555   nset->lock = g_mutex_new ();
556 #ifndef G_OS_WIN32
557   nset->mode = GST_POLL_MODE_AUTO;
558   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
559   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
560   nset->control_read_fd.fd = -1;
561   nset->control_write_fd.fd = -1;
562   {
563     gint control_sock[2];
564
565     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
566       goto no_socket_pair;
567
568     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
569     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
570
571     nset->control_read_fd.fd = control_sock[0];
572     nset->control_write_fd.fd = control_sock[1];
573
574     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
575     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
576   }
577 #else
578   nset->mode = GST_POLL_MODE_WINDOWS;
579   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
580   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
581   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
582   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
583   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
584
585   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
586 #endif
587
588   /* ensure (re)build, though already sneakily set in non-windows case */
589   MARK_REBUILD (nset);
590
591   nset->controllable = controllable;
592
593   return nset;
594
595   /* ERRORS */
596 #ifndef G_OS_WIN32
597 no_socket_pair:
598   {
599     GST_WARNING ("%p: can't create socket pair !", nset);
600     gst_poll_free (nset);
601     return NULL;
602   }
603 #endif
604 }
605
606 /**
607  * gst_poll_new_timer:
608  *
609  * Create a new poll object that can be used for scheduling cancellable
610  * timeouts.
611  *
612  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
613  * performed from different threads. 
614  *
615  * Free-function: gst_poll_free
616  *
617  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
618  *     Free with gst_poll_free().
619  *
620  * Since: 0.10.23
621  */
622 GstPoll *
623 gst_poll_new_timer (void)
624 {
625   GstPoll *poll;
626
627   /* make a new controllable poll set */
628   if (!(poll = gst_poll_new (TRUE)))
629     goto done;
630
631   /* we are a timer */
632   poll->timer = TRUE;
633
634 done:
635   return poll;
636 }
637
638 /**
639  * gst_poll_free:
640  * @set: (transfer full): a file descriptor set.
641  *
642  * Free a file descriptor set.
643  *
644  * Since: 0.10.18
645  */
646 void
647 gst_poll_free (GstPoll * set)
648 {
649   g_return_if_fail (set != NULL);
650
651   GST_DEBUG ("%p: freeing", set);
652
653 #ifndef G_OS_WIN32
654   if (set->control_write_fd.fd >= 0)
655     close (set->control_write_fd.fd);
656   if (set->control_read_fd.fd >= 0)
657     close (set->control_read_fd.fd);
658 #else
659   CloseHandle (set->wakeup_event);
660
661   {
662     guint i;
663
664     for (i = 0; i < set->events->len; i++)
665       gst_poll_free_winsock_event (set, i);
666   }
667
668   g_array_free (set->active_events, TRUE);
669   g_array_free (set->events, TRUE);
670   g_array_free (set->active_fds_ignored, TRUE);
671 #endif
672
673   g_array_free (set->active_fds, TRUE);
674   g_array_free (set->fds, TRUE);
675   g_mutex_free (set->lock);
676   g_slice_free (GstPoll, set);
677 }
678
679 /**
680  * gst_poll_get_read_gpollfd:
681  * @set: a #GstPoll
682  * @fd: a #GPollFD
683  *
684  * Get a GPollFD for the reading part of the control socket. This is useful when
685  * integrating with a GSource and GMainLoop.
686  *
687  * Since: 0.10.32
688  */
689 void
690 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
691 {
692   g_return_if_fail (set != NULL);
693   g_return_if_fail (fd != NULL);
694
695 #ifndef G_OS_WIN32
696   fd->fd = set->control_read_fd.fd;
697 #else
698 #if GLIB_SIZEOF_VOID_P == 8
699   fd->fd = (gint64) set->wakeup_event;
700 #else
701   fd->fd = (gint) set->wakeup_event;
702 #endif
703 #endif
704   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
705   fd->revents = 0;
706 }
707
708 /**
709  * gst_poll_fd_init:
710  * @fd: a #GstPollFD
711  *
712  * Initializes @fd. Alternatively you can initialize it with
713  * #GST_POLL_FD_INIT.
714  *
715  * Since: 0.10.18
716  */
717 void
718 gst_poll_fd_init (GstPollFD * fd)
719 {
720   g_return_if_fail (fd != NULL);
721
722   fd->fd = -1;
723   fd->idx = -1;
724 }
725
726 static gboolean
727 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
728 {
729   gint idx;
730
731   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
732
733   idx = find_index (set->fds, fd);
734   if (idx < 0) {
735 #ifndef G_OS_WIN32
736     struct pollfd nfd;
737
738     nfd.fd = fd->fd;
739     nfd.events = POLLERR | POLLNVAL | POLLHUP;
740     nfd.revents = 0;
741
742     g_array_append_val (set->fds, nfd);
743
744     fd->idx = set->fds->len - 1;
745 #else
746     WinsockFd wfd;
747     HANDLE event;
748
749     wfd.fd = fd->fd;
750     wfd.event_mask = FD_CLOSE;
751     memset (&wfd.events, 0, sizeof (wfd.events));
752     wfd.ignored_event_mask = 0;
753     event = WSACreateEvent ();
754
755     g_array_append_val (set->fds, wfd);
756     g_array_append_val (set->events, event);
757
758     fd->idx = set->fds->len - 1;
759 #endif
760     MARK_REBUILD (set);
761   } else {
762     GST_WARNING ("%p: couldn't find fd !", set);
763   }
764
765   return TRUE;
766 }
767
768 /**
769  * gst_poll_add_fd:
770  * @set: a file descriptor set.
771  * @fd: a file descriptor.
772  *
773  * Add a file descriptor to the file descriptor set.
774  *
775  * Returns: %TRUE if the file descriptor was successfully added to the set.
776  *
777  * Since: 0.10.18
778  */
779 gboolean
780 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
781 {
782   gboolean ret;
783
784   g_return_val_if_fail (set != NULL, FALSE);
785   g_return_val_if_fail (fd != NULL, FALSE);
786   g_return_val_if_fail (fd->fd >= 0, FALSE);
787
788   g_mutex_lock (set->lock);
789
790   ret = gst_poll_add_fd_unlocked (set, fd);
791
792   g_mutex_unlock (set->lock);
793
794   return ret;
795 }
796
797 /**
798  * gst_poll_remove_fd:
799  * @set: a file descriptor set.
800  * @fd: a file descriptor.
801  *
802  * Remove a file descriptor from the file descriptor set.
803  *
804  * Returns: %TRUE if the file descriptor was successfully removed from the set.
805  *
806  * Since: 0.10.18
807  */
808 gboolean
809 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
810 {
811   gint idx;
812
813   g_return_val_if_fail (set != NULL, FALSE);
814   g_return_val_if_fail (fd != NULL, FALSE);
815   g_return_val_if_fail (fd->fd >= 0, FALSE);
816
817
818   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
819
820   g_mutex_lock (set->lock);
821
822   /* get the index, -1 is an fd that is not added */
823   idx = find_index (set->fds, fd);
824   if (idx >= 0) {
825 #ifdef G_OS_WIN32
826     gst_poll_free_winsock_event (set, idx);
827     g_array_remove_index_fast (set->events, idx);
828 #endif
829
830     /* remove the fd at index, we use _remove_index_fast, which copies the last
831      * element of the array to the freed index */
832     g_array_remove_index_fast (set->fds, idx);
833
834     /* mark fd as removed by setting the index to -1 */
835     fd->idx = -1;
836     MARK_REBUILD (set);
837   } else {
838     GST_WARNING ("%p: couldn't find fd !", set);
839   }
840
841   g_mutex_unlock (set->lock);
842
843   return idx >= 0;
844 }
845
846 /**
847  * gst_poll_fd_ctl_write:
848  * @set: a file descriptor set.
849  * @fd: a file descriptor.
850  * @active: a new status.
851  *
852  * Control whether the descriptor @fd in @set will be monitored for
853  * writability.
854  *
855  * Returns: %TRUE if the descriptor was successfully updated.
856  *
857  * Since: 0.10.18
858  */
859 gboolean
860 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
861 {
862   gint idx;
863
864   g_return_val_if_fail (set != NULL, FALSE);
865   g_return_val_if_fail (fd != NULL, FALSE);
866   g_return_val_if_fail (fd->fd >= 0, FALSE);
867
868   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
869       fd->fd, fd->idx, active);
870
871   g_mutex_lock (set->lock);
872
873   idx = find_index (set->fds, fd);
874   if (idx >= 0) {
875 #ifndef G_OS_WIN32
876     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
877
878     if (active)
879       pfd->events |= POLLOUT;
880     else
881       pfd->events &= ~POLLOUT;
882
883     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
884 #else
885     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
886         active);
887 #endif
888     MARK_REBUILD (set);
889   } else {
890     GST_WARNING ("%p: couldn't find fd !", set);
891   }
892
893   g_mutex_unlock (set->lock);
894
895   return idx >= 0;
896 }
897
898 static gboolean
899 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
900 {
901   gint idx;
902
903   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
904       fd->fd, fd->idx, active);
905
906   idx = find_index (set->fds, fd);
907
908   if (idx >= 0) {
909 #ifndef G_OS_WIN32
910     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
911
912     if (active)
913       pfd->events |= (POLLIN | POLLPRI);
914     else
915       pfd->events &= ~(POLLIN | POLLPRI);
916 #else
917     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
918 #endif
919     MARK_REBUILD (set);
920   } else {
921     GST_WARNING ("%p: couldn't find fd !", set);
922   }
923
924   return idx >= 0;
925 }
926
927 /**
928  * gst_poll_fd_ctl_read:
929  * @set: a file descriptor set.
930  * @fd: a file descriptor.
931  * @active: a new status.
932  *
933  * Control whether the descriptor @fd in @set will be monitored for
934  * readability.
935  *
936  * Returns: %TRUE if the descriptor was successfully updated.
937  *
938  * Since: 0.10.18
939  */
940 gboolean
941 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
942 {
943   gboolean ret;
944
945   g_return_val_if_fail (set != NULL, FALSE);
946   g_return_val_if_fail (fd != NULL, FALSE);
947   g_return_val_if_fail (fd->fd >= 0, FALSE);
948
949   g_mutex_lock (set->lock);
950
951   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
952
953   g_mutex_unlock (set->lock);
954
955   return ret;
956 }
957
958 /**
959  * gst_poll_fd_ignored:
960  * @set: a file descriptor set.
961  * @fd: a file descriptor.
962  *
963  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
964  * the same result for @fd as last time. This function must be called if no
965  * operation (read/write/recv/send/etc.) will be performed on @fd before
966  * the next call to gst_poll_wait().
967  *
968  * The reason why this is needed is because the underlying implementation
969  * might not allow querying the fd more than once between calls to one of
970  * the re-enabling operations.
971  *
972  * Since: 0.10.18
973  */
974 void
975 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
976 {
977 #ifdef G_OS_WIN32
978   gint idx;
979
980   g_return_if_fail (set != NULL);
981   g_return_if_fail (fd != NULL);
982   g_return_if_fail (fd->fd >= 0);
983
984   g_mutex_lock (set->lock);
985
986   idx = find_index (set->fds, fd);
987   if (idx >= 0) {
988     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
989
990     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
991     MARK_REBUILD (set);
992   }
993
994   g_mutex_unlock (set->lock);
995 #endif
996 }
997
998 /**
999  * gst_poll_fd_has_closed:
1000  * @set: a file descriptor set.
1001  * @fd: a file descriptor.
1002  *
1003  * Check if @fd in @set has closed the connection.
1004  *
1005  * Returns: %TRUE if the connection was closed.
1006  *
1007  * Since: 0.10.18
1008  */
1009 gboolean
1010 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1011 {
1012   gboolean res = FALSE;
1013   gint idx;
1014
1015   g_return_val_if_fail (set != NULL, FALSE);
1016   g_return_val_if_fail (fd != NULL, FALSE);
1017   g_return_val_if_fail (fd->fd >= 0, FALSE);
1018
1019   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1020
1021   g_mutex_lock (set->lock);
1022
1023   idx = find_index (set->active_fds, fd);
1024   if (idx >= 0) {
1025 #ifndef G_OS_WIN32
1026     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1027
1028     res = (pfd->revents & POLLHUP) != 0;
1029 #else
1030     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1031
1032     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1033 #endif
1034   } else {
1035     GST_WARNING ("%p: couldn't find fd !", set);
1036   }
1037
1038   g_mutex_unlock (set->lock);
1039
1040   return res;
1041 }
1042
1043 /**
1044  * gst_poll_fd_has_error:
1045  * @set: a file descriptor set.
1046  * @fd: a file descriptor.
1047  *
1048  * Check if @fd in @set has an error.
1049  *
1050  * Returns: %TRUE if the descriptor has an error.
1051  *
1052  * Since: 0.10.18
1053  */
1054 gboolean
1055 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1056 {
1057   gboolean res = FALSE;
1058   gint idx;
1059
1060   g_return_val_if_fail (set != NULL, FALSE);
1061   g_return_val_if_fail (fd != NULL, FALSE);
1062   g_return_val_if_fail (fd->fd >= 0, FALSE);
1063
1064   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1065
1066   g_mutex_lock (set->lock);
1067
1068   idx = find_index (set->active_fds, fd);
1069   if (idx >= 0) {
1070 #ifndef G_OS_WIN32
1071     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1072
1073     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1074 #else
1075     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1076
1077     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1078         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1079         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1080         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1081         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1082 #endif
1083   } else {
1084     GST_WARNING ("%p: couldn't find fd !", set);
1085   }
1086
1087   g_mutex_unlock (set->lock);
1088
1089   return res;
1090 }
1091
1092 static gboolean
1093 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1094 {
1095   gboolean res = FALSE;
1096   gint idx;
1097
1098   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1099
1100   idx = find_index (set->active_fds, fd);
1101   if (idx >= 0) {
1102 #ifndef G_OS_WIN32
1103     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1104
1105     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1106 #else
1107     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1108
1109     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1110 #endif
1111   } else {
1112     GST_WARNING ("%p: couldn't find fd !", set);
1113   }
1114
1115   return res;
1116 }
1117
1118 /**
1119  * gst_poll_fd_can_read:
1120  * @set: a file descriptor set.
1121  * @fd: a file descriptor.
1122  *
1123  * Check if @fd in @set has data to be read.
1124  *
1125  * Returns: %TRUE if the descriptor has data to be read.
1126  *
1127  * Since: 0.10.18
1128  */
1129 gboolean
1130 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1131 {
1132   gboolean res = FALSE;
1133
1134   g_return_val_if_fail (set != NULL, FALSE);
1135   g_return_val_if_fail (fd != NULL, FALSE);
1136   g_return_val_if_fail (fd->fd >= 0, FALSE);
1137
1138   g_mutex_lock (set->lock);
1139
1140   res = gst_poll_fd_can_read_unlocked (set, fd);
1141
1142   g_mutex_unlock (set->lock);
1143
1144   return res;
1145 }
1146
1147 /**
1148  * gst_poll_fd_can_write:
1149  * @set: a file descriptor set.
1150  * @fd: a file descriptor.
1151  *
1152  * Check if @fd in @set can be used for writing.
1153  *
1154  * Returns: %TRUE if the descriptor can be used for writing.
1155  *
1156  * Since: 0.10.18
1157  */
1158 gboolean
1159 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1160 {
1161   gboolean res = FALSE;
1162   gint idx;
1163
1164   g_return_val_if_fail (set != NULL, FALSE);
1165   g_return_val_if_fail (fd != NULL, FALSE);
1166   g_return_val_if_fail (fd->fd >= 0, FALSE);
1167
1168   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1169
1170   g_mutex_lock (set->lock);
1171
1172   idx = find_index (set->active_fds, fd);
1173   if (idx >= 0) {
1174 #ifndef G_OS_WIN32
1175     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1176
1177     res = (pfd->revents & POLLOUT) != 0;
1178 #else
1179     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1180
1181     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1182 #endif
1183   } else {
1184     GST_WARNING ("%p: couldn't find fd !", set);
1185   }
1186
1187   g_mutex_unlock (set->lock);
1188
1189   return res;
1190 }
1191
1192 /**
1193  * gst_poll_wait:
1194  * @set: a #GstPoll.
1195  * @timeout: a timeout in nanoseconds.
1196  *
1197  * Wait for activity on the file descriptors in @set. This function waits up to
1198  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1199  *
1200  * For #GstPoll objects created with gst_poll_new(), this function can only be
1201  * called from a single thread at a time.  If called from multiple threads,
1202  * -1 will be returned with errno set to EPERM.
1203  *
1204  * This is not true for timer #GstPoll objects created with
1205  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1206  * simultaneously.
1207  *
1208  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1209  * activity was detected after @timeout. If an error occurs, -1 is returned
1210  * and errno is set.
1211  *
1212  * Since: 0.10.18
1213  */
1214 gint
1215 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1216 {
1217   gboolean restarting;
1218   gboolean is_timer;
1219   int res;
1220   gint old_waiting;
1221
1222   g_return_val_if_fail (set != NULL, -1);
1223
1224   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1225
1226   is_timer = set->timer;
1227
1228   /* add one more waiter */
1229   old_waiting = INC_WAITING (set);
1230
1231   /* we cannot wait from multiple threads unless we are a timer */
1232   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1233     goto already_waiting;
1234
1235   /* flushing, exit immediately */
1236   if (G_UNLIKELY (IS_FLUSHING (set)))
1237     goto flushing;
1238
1239   do {
1240     GstPollMode mode;
1241
1242     res = -1;
1243     restarting = FALSE;
1244
1245     mode = choose_mode (set, timeout);
1246
1247     if (TEST_REBUILD (set)) {
1248       g_mutex_lock (set->lock);
1249 #ifndef G_OS_WIN32
1250       g_array_set_size (set->active_fds, set->fds->len);
1251       memcpy (set->active_fds->data, set->fds->data,
1252           set->fds->len * sizeof (struct pollfd));
1253 #else
1254       if (!gst_poll_prepare_winsock_active_sets (set))
1255         goto winsock_error;
1256 #endif
1257       g_mutex_unlock (set->lock);
1258     }
1259
1260     switch (mode) {
1261       case GST_POLL_MODE_AUTO:
1262         g_assert_not_reached ();
1263         break;
1264       case GST_POLL_MODE_PPOLL:
1265       {
1266 #ifdef HAVE_PPOLL
1267         struct timespec ts;
1268         struct timespec *tsptr;
1269
1270         if (timeout != GST_CLOCK_TIME_NONE) {
1271           GST_TIME_TO_TIMESPEC (timeout, ts);
1272           tsptr = &ts;
1273         } else {
1274           tsptr = NULL;
1275         }
1276
1277         res =
1278             ppoll ((struct pollfd *) set->active_fds->data,
1279             set->active_fds->len, tsptr, NULL);
1280 #else
1281         g_assert_not_reached ();
1282         errno = ENOSYS;
1283 #endif
1284         break;
1285       }
1286       case GST_POLL_MODE_POLL:
1287       {
1288 #ifdef HAVE_POLL
1289         gint t;
1290
1291         if (timeout != GST_CLOCK_TIME_NONE) {
1292           t = GST_TIME_AS_MSECONDS (timeout);
1293         } else {
1294           t = -1;
1295         }
1296
1297         res =
1298             poll ((struct pollfd *) set->active_fds->data,
1299             set->active_fds->len, t);
1300 #else
1301         g_assert_not_reached ();
1302         errno = ENOSYS;
1303 #endif
1304         break;
1305       }
1306       case GST_POLL_MODE_PSELECT:
1307 #ifndef HAVE_PSELECT
1308       {
1309         g_assert_not_reached ();
1310         errno = ENOSYS;
1311         break;
1312       }
1313 #endif
1314       case GST_POLL_MODE_SELECT:
1315       {
1316 #ifndef G_OS_WIN32
1317         fd_set readfds;
1318         fd_set writefds;
1319         fd_set errorfds;
1320         gint max_fd;
1321
1322         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1323
1324         if (mode == GST_POLL_MODE_SELECT) {
1325           struct timeval tv;
1326           struct timeval *tvptr;
1327
1328           if (timeout != GST_CLOCK_TIME_NONE) {
1329             GST_TIME_TO_TIMEVAL (timeout, tv);
1330             tvptr = &tv;
1331           } else {
1332             tvptr = NULL;
1333           }
1334
1335           GST_DEBUG ("Calling select");
1336           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1337           GST_DEBUG ("After select, res:%d", res);
1338         } else {
1339 #ifdef HAVE_PSELECT
1340           struct timespec ts;
1341           struct timespec *tsptr;
1342
1343           if (timeout != GST_CLOCK_TIME_NONE) {
1344             GST_TIME_TO_TIMESPEC (timeout, ts);
1345             tsptr = &ts;
1346           } else {
1347             tsptr = NULL;
1348           }
1349
1350           GST_DEBUG ("Calling pselect");
1351           res =
1352               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1353           GST_DEBUG ("After pselect, res:%d", res);
1354 #endif
1355         }
1356
1357         if (res >= 0) {
1358           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1359         }
1360 #else /* G_OS_WIN32 */
1361         g_assert_not_reached ();
1362         errno = ENOSYS;
1363 #endif
1364         break;
1365       }
1366       case GST_POLL_MODE_WINDOWS:
1367       {
1368 #ifdef G_OS_WIN32
1369         gint ignore_count = set->active_fds_ignored->len;
1370         DWORD t, wait_ret;
1371
1372         if (G_LIKELY (ignore_count == 0)) {
1373           if (timeout != GST_CLOCK_TIME_NONE)
1374             t = GST_TIME_AS_MSECONDS (timeout);
1375           else
1376             t = INFINITE;
1377         } else {
1378           /* already one or more ignored fds, so we quickly sweep the others */
1379           t = 0;
1380         }
1381
1382         if (set->active_events->len != 0) {
1383           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1384               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1385         } else {
1386           wait_ret = WSA_WAIT_FAILED;
1387           WSASetLastError (WSA_INVALID_PARAMETER);
1388         }
1389
1390         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1391           res = 0;
1392         } else if (wait_ret == WSA_WAIT_FAILED) {
1393           res = -1;
1394           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1395         } else {
1396           /* the first entry is the wakeup event */
1397           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1398             res = gst_poll_collect_winsock_events (set);
1399           } else {
1400             res = 1;            /* wakeup event */
1401           }
1402         }
1403 #else
1404         g_assert_not_reached ();
1405         errno = ENOSYS;
1406 #endif
1407         break;
1408       }
1409     }
1410
1411     if (!is_timer) {
1412       /* Applications needs to clear the control socket themselves for timer
1413        * polls.
1414        * For other polls, we need to clear the control socket. If there was only
1415        * one socket with activity and it was the control socket, we need to
1416        * restart */
1417       if (release_all_wakeup (set) > 0 && res == 1)
1418         restarting = TRUE;
1419     }
1420
1421     if (G_UNLIKELY (IS_FLUSHING (set))) {
1422       /* we got woken up and we are flushing, we need to stop */
1423       errno = EBUSY;
1424       res = -1;
1425       break;
1426     }
1427   } while (G_UNLIKELY (restarting));
1428
1429   DEC_WAITING (set);
1430
1431   return res;
1432
1433   /* ERRORS */
1434 already_waiting:
1435   {
1436     DEC_WAITING (set);
1437     errno = EPERM;
1438     return -1;
1439   }
1440 flushing:
1441   {
1442     DEC_WAITING (set);
1443     errno = EBUSY;
1444     return -1;
1445   }
1446 #ifdef G_OS_WIN32
1447 winsock_error:
1448   {
1449     g_mutex_unlock (set->lock);
1450     DEC_WAITING (set);
1451     return -1;
1452   }
1453 #endif
1454 }
1455
1456 /**
1457  * gst_poll_set_controllable:
1458  * @set: a #GstPoll.
1459  * @controllable: new controllable state.
1460  *
1461  * When @controllable is %TRUE, this function ensures that future calls to
1462  * gst_poll_wait() will be affected by gst_poll_restart() and
1463  * gst_poll_set_flushing().
1464  *
1465  * Returns: %TRUE if the controllability of @set could be updated.
1466  *
1467  * Since: 0.10.18
1468  */
1469 gboolean
1470 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1471 {
1472   g_return_val_if_fail (set != NULL, FALSE);
1473
1474   GST_LOG ("%p: controllable : %d", set, controllable);
1475
1476   set->controllable = controllable;
1477
1478   return TRUE;
1479 }
1480
1481 /**
1482  * gst_poll_restart:
1483  * @set: a #GstPoll.
1484  *
1485  * Restart any gst_poll_wait() that is in progress. This function is typically
1486  * used after adding or removing descriptors to @set.
1487  *
1488  * If @set is not controllable, then this call will have no effect.
1489  *
1490  * Since: 0.10.18
1491  */
1492 void
1493 gst_poll_restart (GstPoll * set)
1494 {
1495   g_return_if_fail (set != NULL);
1496
1497   if (set->controllable && GET_WAITING (set) > 0) {
1498     /* we are controllable and waiting, wake up the waiter. The socket will be
1499      * cleared by the _wait() thread and the poll will be restarted */
1500     raise_wakeup (set);
1501   }
1502 }
1503
1504 /**
1505  * gst_poll_set_flushing:
1506  * @set: a #GstPoll.
1507  * @flushing: new flushing state.
1508  *
1509  * When @flushing is %TRUE, this function ensures that current and future calls
1510  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1511  *
1512  * Unsetting the flushing state will restore normal operation of @set.
1513  *
1514  * Since: 0.10.18
1515  */
1516 void
1517 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1518 {
1519   g_return_if_fail (set != NULL);
1520
1521   /* update the new state first */
1522   SET_FLUSHING (set, flushing);
1523
1524   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1525     /* we are flushing, controllable and waiting, wake up the waiter. When we
1526      * stop the flushing operation we don't clear the wakeup fd here, this will
1527      * happen in the _wait() thread. */
1528     raise_wakeup (set);
1529   }
1530 }
1531
1532 /**
1533  * gst_poll_write_control:
1534  * @set: a #GstPoll.
1535  *
1536  * Write a byte to the control socket of the controllable @set.
1537  * This function is mostly useful for timer #GstPoll objects created with
1538  * gst_poll_new_timer(). 
1539  *
1540  * It will make any current and future gst_poll_wait() function return with
1541  * 1, meaning the control socket is set. After an equal amount of calls to
1542  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1543  * block again until their timeout expired.
1544  *
1545  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1546  * byte could not be written.
1547  *
1548  * Since: 0.10.23
1549  */
1550 gboolean
1551 gst_poll_write_control (GstPoll * set)
1552 {
1553   gboolean res;
1554
1555   g_return_val_if_fail (set != NULL, FALSE);
1556   g_return_val_if_fail (set->timer, FALSE);
1557
1558   res = raise_wakeup (set);
1559
1560   return res;
1561 }
1562
1563 /**
1564  * gst_poll_read_control:
1565  * @set: a #GstPoll.
1566  *
1567  * Read a byte from the control socket of the controllable @set.
1568  * This function is mostly useful for timer #GstPoll objects created with
1569  * gst_poll_new_timer(). 
1570  *
1571  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1572  * was no byte to read.
1573  *
1574  * Since: 0.10.23
1575  */
1576 gboolean
1577 gst_poll_read_control (GstPoll * set)
1578 {
1579   gboolean res;
1580
1581   g_return_val_if_fail (set != NULL, FALSE);
1582   g_return_val_if_fail (set->timer, FALSE);
1583
1584   res = release_wakeup (set);
1585
1586   return res;
1587 }