Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #include <sys/poll.h>
79 #include <sys/time.h>
80 #include <sys/socket.h>
81 #endif
82
83 /* OS/X needs this because of bad headers */
84 #include <string.h>
85
86 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
87  * so we prefer our own poll emulation.
88  */
89 #if defined(BROKEN_POLL)
90 #undef HAVE_POLL
91 #endif
92
93 #include "gstpoll.h"
94
95 #define GST_CAT_DEFAULT GST_CAT_POLL
96
97 #ifdef G_OS_WIN32
98 typedef struct _WinsockFd WinsockFd;
99
100 struct _WinsockFd
101 {
102   gint fd;
103   glong event_mask;
104   WSANETWORKEVENTS events;
105   glong ignored_event_mask;
106 };
107 #endif
108
109 typedef enum
110 {
111   GST_POLL_MODE_AUTO,
112   GST_POLL_MODE_SELECT,
113   GST_POLL_MODE_PSELECT,
114   GST_POLL_MODE_POLL,
115   GST_POLL_MODE_PPOLL,
116   GST_POLL_MODE_WINDOWS
117 } GstPollMode;
118
119 struct _GstPoll
120 {
121   GstPollMode mode;
122
123   GMutex *lock;
124   /* array of fds, always written to and read from with lock */
125   GArray *fds;
126   /* array of active fds, only written to from the waiting thread with the
127    * lock and read from with the lock or without the lock from the waiting
128    * thread */
129   GArray *active_fds;
130
131 #ifndef G_OS_WIN32
132   gchar buf[1];
133   GstPollFD control_read_fd;
134   GstPollFD control_write_fd;
135 #else
136   GArray *active_fds_ignored;
137   GArray *events;
138   GArray *active_events;
139
140   HANDLE wakeup_event;
141 #endif
142
143   gboolean controllable;
144   volatile gint waiting;
145   volatile gint control_pending;
146   volatile gint flushing;
147   gboolean timer;
148   volatile gint rebuild;
149 };
150
151 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
152     gboolean active);
153 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
154
155 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
156 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
157
158 #define INC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, 1))
159 #define DEC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, -1))
160 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
161
162 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
163 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
164
165 #ifndef G_OS_WIN32
166 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
167 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
168 #else
169 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
170 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
171 #endif
172
173 /* the poll/select call is also performed on a control socket, that way
174  * we can send special commands to control it */
175 static inline gboolean
176 raise_wakeup (GstPoll * set)
177 {
178   gboolean result = TRUE;
179
180   if (G_ATOMIC_INT_ADD (&set->control_pending, 1) == 0) {
181     /* raise when nothing pending */
182     GST_LOG ("%p: raise", set);
183     result = WAKE_EVENT (set);
184   }
185   return result;
186 }
187
188 /* note how bad things can happen when the 2 threads both raise and release the
189  * wakeup. This is however not a problem because you must always pair a raise
190  * with a release */
191 static inline gboolean
192 release_wakeup (GstPoll * set)
193 {
194   gboolean result = TRUE;
195
196   if (g_atomic_int_dec_and_test (&set->control_pending)) {
197     GST_LOG ("%p: release", set);
198     result = RELEASE_EVENT (set);
199   }
200   return result;
201 }
202
203 static inline gint
204 release_all_wakeup (GstPoll * set)
205 {
206   gint old;
207
208   while (TRUE) {
209     if (!(old = g_atomic_int_get (&set->control_pending)))
210       /* nothing pending, just exit */
211       break;
212
213     /* try to remove all pending control messages */
214     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
215       /* we managed to remove all messages, read the control socket */
216       if (RELEASE_EVENT (set))
217         break;
218       else
219         /* retry again until we read it successfully */
220         G_ATOMIC_INT_ADD (&set->control_pending, 1);
221     }
222   }
223   return old;
224 }
225
226 static gint
227 find_index (GArray * array, GstPollFD * fd)
228 {
229 #ifndef G_OS_WIN32
230   struct pollfd *ifd;
231 #else
232   WinsockFd *ifd;
233 #endif
234   guint i;
235
236   /* start by assuming the index found in the fd is still valid */
237   if (fd->idx >= 0 && fd->idx < array->len) {
238 #ifndef G_OS_WIN32
239     ifd = &g_array_index (array, struct pollfd, fd->idx);
240 #else
241     ifd = &g_array_index (array, WinsockFd, fd->idx);
242 #endif
243
244     if (ifd->fd == fd->fd) {
245       return fd->idx;
246     }
247   }
248
249   /* the pollfd array has changed and we need to lookup the fd again */
250   for (i = 0; i < array->len; i++) {
251 #ifndef G_OS_WIN32
252     ifd = &g_array_index (array, struct pollfd, i);
253 #else
254     ifd = &g_array_index (array, WinsockFd, i);
255 #endif
256
257     if (ifd->fd == fd->fd) {
258       fd->idx = (gint) i;
259       return fd->idx;
260     }
261   }
262
263   fd->idx = -1;
264   return fd->idx;
265 }
266
267 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
268 /* check if all file descriptors will fit in an fd_set */
269 static gboolean
270 selectable_fds (const GstPoll * set)
271 {
272   guint i;
273
274   g_mutex_lock (set->lock);
275   for (i = 0; i < set->fds->len; i++) {
276     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
277
278     if (pfd->fd >= FD_SETSIZE)
279       goto too_many;
280   }
281   g_mutex_unlock (set->lock);
282
283   return TRUE;
284
285 too_many:
286   {
287     g_mutex_unlock (set->lock);
288     return FALSE;
289   }
290 }
291
292 /* check if the timeout will convert to a timeout value used for poll()
293  * without a loss of precision
294  */
295 static gboolean
296 pollable_timeout (GstClockTime timeout)
297 {
298   if (timeout == GST_CLOCK_TIME_NONE)
299     return TRUE;
300
301   /* not a nice multiple of milliseconds */
302   if (timeout % 1000000)
303     return FALSE;
304
305   return TRUE;
306 }
307 #endif
308
309 static GstPollMode
310 choose_mode (const GstPoll * set, GstClockTime timeout)
311 {
312   GstPollMode mode;
313
314   if (set->mode == GST_POLL_MODE_AUTO) {
315 #ifdef HAVE_PPOLL
316     mode = GST_POLL_MODE_PPOLL;
317 #elif defined(HAVE_POLL)
318     if (!selectable_fds (set) || pollable_timeout (timeout)) {
319       mode = GST_POLL_MODE_POLL;
320     } else {
321 #ifdef HAVE_PSELECT
322       mode = GST_POLL_MODE_PSELECT;
323 #else
324       mode = GST_POLL_MODE_SELECT;
325 #endif
326     }
327 #elif defined(HAVE_PSELECT)
328     mode = GST_POLL_MODE_PSELECT;
329 #else
330     mode = GST_POLL_MODE_SELECT;
331 #endif
332   } else {
333     mode = set->mode;
334   }
335   return mode;
336 }
337
338 #ifndef G_OS_WIN32
339 static gint
340 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
341     fd_set * errorfds)
342 {
343   gint max_fd = -1;
344   guint i;
345
346   FD_ZERO (readfds);
347   FD_ZERO (writefds);
348   FD_ZERO (errorfds);
349
350   g_mutex_lock (set->lock);
351
352   for (i = 0; i < set->active_fds->len; i++) {
353     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
354
355     if (pfd->fd < FD_SETSIZE) {
356       if (pfd->events & POLLIN)
357         FD_SET (pfd->fd, readfds);
358       if (pfd->events & POLLOUT)
359         FD_SET (pfd->fd, writefds);
360       if (pfd->events)
361         FD_SET (pfd->fd, errorfds);
362       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
363         max_fd = pfd->fd;
364     }
365   }
366
367   g_mutex_unlock (set->lock);
368
369   return max_fd;
370 }
371
372 static void
373 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
374     fd_set * errorfds)
375 {
376   guint i;
377
378   g_mutex_lock (set->lock);
379
380   for (i = 0; i < set->active_fds->len; i++) {
381     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
382
383     if (pfd->fd < FD_SETSIZE) {
384       pfd->revents = 0;
385       if (FD_ISSET (pfd->fd, readfds))
386         pfd->revents |= POLLIN;
387       if (FD_ISSET (pfd->fd, writefds))
388         pfd->revents |= POLLOUT;
389       if (FD_ISSET (pfd->fd, errorfds))
390         pfd->revents |= POLLERR;
391     }
392   }
393
394   g_mutex_unlock (set->lock);
395 }
396 #else /* G_OS_WIN32 */
397 /*
398  * Translate errors thrown by the Winsock API used by GstPoll:
399  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
400  */
401 static gint
402 gst_poll_winsock_error_to_errno (DWORD last_error)
403 {
404   switch (last_error) {
405     case WSA_INVALID_HANDLE:
406     case WSAEINVAL:
407     case WSAENOTSOCK:
408       return EBADF;
409
410     case WSA_NOT_ENOUGH_MEMORY:
411       return ENOMEM;
412
413       /*
414        * Anything else, including:
415        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
416        *   WSANOTINITIALISED
417        */
418     default:
419       return EINVAL;
420   }
421 }
422
423 static void
424 gst_poll_free_winsock_event (GstPoll * set, gint idx)
425 {
426   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
427   HANDLE event = g_array_index (set->events, HANDLE, idx);
428
429   WSAEventSelect (wfd->fd, event, 0);
430   CloseHandle (event);
431 }
432
433 static void
434 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
435     gboolean active)
436 {
437   WinsockFd *wfd;
438
439   wfd = &g_array_index (set->fds, WinsockFd, idx);
440
441   if (active)
442     wfd->event_mask |= flags;
443   else
444     wfd->event_mask &= ~flags;
445
446   /* reset ignored state if the new mask doesn't overlap at all */
447   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
448     wfd->ignored_event_mask = 0;
449 }
450
451 static gboolean
452 gst_poll_prepare_winsock_active_sets (GstPoll * set)
453 {
454   guint i;
455
456   g_array_set_size (set->active_fds, 0);
457   g_array_set_size (set->active_fds_ignored, 0);
458   g_array_set_size (set->active_events, 0);
459   g_array_append_val (set->active_events, set->wakeup_event);
460
461   for (i = 0; i < set->fds->len; i++) {
462     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
463     HANDLE event = g_array_index (set->events, HANDLE, i);
464
465     if (wfd->ignored_event_mask == 0) {
466       gint ret;
467
468       g_array_append_val (set->active_fds, *wfd);
469       g_array_append_val (set->active_events, event);
470
471       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
472       if (G_UNLIKELY (ret != 0)) {
473         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
474         return FALSE;
475       }
476     } else {
477       g_array_append_val (set->active_fds_ignored, wfd);
478     }
479   }
480
481   return TRUE;
482 }
483
484 static gint
485 gst_poll_collect_winsock_events (GstPoll * set)
486 {
487   gint res, i;
488
489   /*
490    * We need to check which events are signaled, and call
491    * WSAEnumNetworkEvents for those that are, which resets
492    * the event and clears the internal network event records.
493    */
494   res = 0;
495   for (i = 0; i < set->active_fds->len; i++) {
496     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
497     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
498     DWORD wait_ret;
499
500     wait_ret = WaitForSingleObject (event, 0);
501     if (wait_ret == WAIT_OBJECT_0) {
502       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
503
504       if (G_UNLIKELY (enum_ret != 0)) {
505         res = -1;
506         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
507         break;
508       }
509
510       res++;
511     } else {
512       /* clear any previously stored result */
513       memset (&wfd->events, 0, sizeof (wfd->events));
514     }
515   }
516
517   /* If all went well we also need to reset the ignored fds. */
518   if (res >= 0) {
519     res += set->active_fds_ignored->len;
520
521     for (i = 0; i < set->active_fds_ignored->len; i++) {
522       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
523
524       wfd->ignored_event_mask = 0;
525     }
526
527     g_array_set_size (set->active_fds_ignored, 0);
528   }
529
530   return res;
531 }
532 #endif
533
534 /**
535  * gst_poll_new:
536  * @controllable: whether it should be possible to control a wait.
537  *
538  * Create a new file descriptor set. If @controllable, it
539  * is possible to restart or flush a call to gst_poll_wait() with
540  * gst_poll_restart() and gst_poll_set_flushing() respectively.
541  *
542  * Free-function: gst_poll_free
543  *
544  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
545  *     Free with gst_poll_free().
546  *
547  * Since: 0.10.18
548  */
549 GstPoll *
550 gst_poll_new (gboolean controllable)
551 {
552   GstPoll *nset;
553
554   GST_DEBUG ("controllable : %d", controllable);
555
556   nset = g_slice_new0 (GstPoll);
557   nset->lock = g_mutex_new ();
558 #ifndef G_OS_WIN32
559   nset->mode = GST_POLL_MODE_AUTO;
560   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
561   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
562   nset->control_read_fd.fd = -1;
563   nset->control_write_fd.fd = -1;
564   {
565     gint control_sock[2];
566
567     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
568       goto no_socket_pair;
569
570     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
571     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
572
573     nset->control_read_fd.fd = control_sock[0];
574     nset->control_write_fd.fd = control_sock[1];
575
576     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
577     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
578   }
579 #else
580   nset->mode = GST_POLL_MODE_WINDOWS;
581   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
582   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
583   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
584   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
585   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
586
587   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
588 #endif
589
590   /* ensure (re)build, though already sneakily set in non-windows case */
591   MARK_REBUILD (nset);
592
593   nset->controllable = controllable;
594
595   return nset;
596
597   /* ERRORS */
598 #ifndef G_OS_WIN32
599 no_socket_pair:
600   {
601     GST_WARNING ("%p: can't create socket pair !", nset);
602     gst_poll_free (nset);
603     return NULL;
604   }
605 #endif
606 }
607
608 /**
609  * gst_poll_new_timer:
610  *
611  * Create a new poll object that can be used for scheduling cancellable
612  * timeouts.
613  *
614  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
615  * performed from different threads. 
616  *
617  * Free-function: gst_poll_free
618  *
619  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
620  *     Free with gst_poll_free().
621  *
622  * Since: 0.10.23
623  */
624 GstPoll *
625 gst_poll_new_timer (void)
626 {
627   GstPoll *poll;
628
629   /* make a new controllable poll set */
630   if (!(poll = gst_poll_new (TRUE)))
631     goto done;
632
633   /* we are a timer */
634   poll->timer = TRUE;
635
636 done:
637   return poll;
638 }
639
640 /**
641  * gst_poll_free:
642  * @set: (transfer full): a file descriptor set.
643  *
644  * Free a file descriptor set.
645  *
646  * Since: 0.10.18
647  */
648 void
649 gst_poll_free (GstPoll * set)
650 {
651   g_return_if_fail (set != NULL);
652
653   GST_DEBUG ("%p: freeing", set);
654
655 #ifndef G_OS_WIN32
656   if (set->control_write_fd.fd >= 0)
657     close (set->control_write_fd.fd);
658   if (set->control_read_fd.fd >= 0)
659     close (set->control_read_fd.fd);
660 #else
661   CloseHandle (set->wakeup_event);
662
663   {
664     guint i;
665
666     for (i = 0; i < set->events->len; i++)
667       gst_poll_free_winsock_event (set, i);
668   }
669
670   g_array_free (set->active_events, TRUE);
671   g_array_free (set->events, TRUE);
672   g_array_free (set->active_fds_ignored, TRUE);
673 #endif
674
675   g_array_free (set->active_fds, TRUE);
676   g_array_free (set->fds, TRUE);
677   g_mutex_free (set->lock);
678   g_slice_free (GstPoll, set);
679 }
680
681 /**
682  * gst_poll_get_read_gpollfd:
683  * @set: a #GstPoll
684  * @fd: a #GPollFD
685  *
686  * Get a GPollFD for the reading part of the control socket. This is useful when
687  * integrating with a GSource and GMainLoop.
688  *
689  * Since: 0.10.32
690  */
691 void
692 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
693 {
694   g_return_if_fail (set != NULL);
695   g_return_if_fail (fd != NULL);
696
697 #ifndef G_OS_WIN32
698   fd->fd = set->control_read_fd.fd;
699 #else
700 #if GLIB_SIZEOF_VOID_P == 8
701   fd->fd = (gint64) set->wakeup_event;
702 #else
703   fd->fd = (gint) set->wakeup_event;
704 #endif
705 #endif
706   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
707   fd->revents = 0;
708 }
709
710 /**
711  * gst_poll_fd_init:
712  * @fd: a #GstPollFD
713  *
714  * Initializes @fd. Alternatively you can initialize it with
715  * #GST_POLL_FD_INIT.
716  *
717  * Since: 0.10.18
718  */
719 void
720 gst_poll_fd_init (GstPollFD * fd)
721 {
722   g_return_if_fail (fd != NULL);
723
724   fd->fd = -1;
725   fd->idx = -1;
726 }
727
728 static gboolean
729 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
730 {
731   gint idx;
732
733   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
734
735   idx = find_index (set->fds, fd);
736   if (idx < 0) {
737 #ifndef G_OS_WIN32
738     struct pollfd nfd;
739
740     nfd.fd = fd->fd;
741     nfd.events = POLLERR | POLLNVAL | POLLHUP;
742     nfd.revents = 0;
743
744     g_array_append_val (set->fds, nfd);
745
746     fd->idx = set->fds->len - 1;
747 #else
748     WinsockFd wfd;
749     HANDLE event;
750
751     wfd.fd = fd->fd;
752     wfd.event_mask = FD_CLOSE;
753     memset (&wfd.events, 0, sizeof (wfd.events));
754     wfd.ignored_event_mask = 0;
755     event = WSACreateEvent ();
756
757     g_array_append_val (set->fds, wfd);
758     g_array_append_val (set->events, event);
759
760     fd->idx = set->fds->len - 1;
761 #endif
762     MARK_REBUILD (set);
763   } else {
764     GST_WARNING ("%p: couldn't find fd !", set);
765   }
766
767   return TRUE;
768 }
769
770 /**
771  * gst_poll_add_fd:
772  * @set: a file descriptor set.
773  * @fd: a file descriptor.
774  *
775  * Add a file descriptor to the file descriptor set.
776  *
777  * Returns: %TRUE if the file descriptor was successfully added to the set.
778  *
779  * Since: 0.10.18
780  */
781 gboolean
782 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
783 {
784   gboolean ret;
785
786   g_return_val_if_fail (set != NULL, FALSE);
787   g_return_val_if_fail (fd != NULL, FALSE);
788   g_return_val_if_fail (fd->fd >= 0, FALSE);
789
790   g_mutex_lock (set->lock);
791
792   ret = gst_poll_add_fd_unlocked (set, fd);
793
794   g_mutex_unlock (set->lock);
795
796   return ret;
797 }
798
799 /**
800  * gst_poll_remove_fd:
801  * @set: a file descriptor set.
802  * @fd: a file descriptor.
803  *
804  * Remove a file descriptor from the file descriptor set.
805  *
806  * Returns: %TRUE if the file descriptor was successfully removed from the set.
807  *
808  * Since: 0.10.18
809  */
810 gboolean
811 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
812 {
813   gint idx;
814
815   g_return_val_if_fail (set != NULL, FALSE);
816   g_return_val_if_fail (fd != NULL, FALSE);
817   g_return_val_if_fail (fd->fd >= 0, FALSE);
818
819
820   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
821
822   g_mutex_lock (set->lock);
823
824   /* get the index, -1 is an fd that is not added */
825   idx = find_index (set->fds, fd);
826   if (idx >= 0) {
827 #ifdef G_OS_WIN32
828     gst_poll_free_winsock_event (set, idx);
829     g_array_remove_index_fast (set->events, idx);
830 #endif
831
832     /* remove the fd at index, we use _remove_index_fast, which copies the last
833      * element of the array to the freed index */
834     g_array_remove_index_fast (set->fds, idx);
835
836     /* mark fd as removed by setting the index to -1 */
837     fd->idx = -1;
838     MARK_REBUILD (set);
839   } else {
840     GST_WARNING ("%p: couldn't find fd !", set);
841   }
842
843   g_mutex_unlock (set->lock);
844
845   return idx >= 0;
846 }
847
848 /**
849  * gst_poll_fd_ctl_write:
850  * @set: a file descriptor set.
851  * @fd: a file descriptor.
852  * @active: a new status.
853  *
854  * Control whether the descriptor @fd in @set will be monitored for
855  * writability.
856  *
857  * Returns: %TRUE if the descriptor was successfully updated.
858  *
859  * Since: 0.10.18
860  */
861 gboolean
862 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
863 {
864   gint idx;
865
866   g_return_val_if_fail (set != NULL, FALSE);
867   g_return_val_if_fail (fd != NULL, FALSE);
868   g_return_val_if_fail (fd->fd >= 0, FALSE);
869
870   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
871       fd->fd, fd->idx, active);
872
873   g_mutex_lock (set->lock);
874
875   idx = find_index (set->fds, fd);
876   if (idx >= 0) {
877 #ifndef G_OS_WIN32
878     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
879
880     if (active)
881       pfd->events |= POLLOUT;
882     else
883       pfd->events &= ~POLLOUT;
884
885     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
886 #else
887     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
888         active);
889 #endif
890     MARK_REBUILD (set);
891   } else {
892     GST_WARNING ("%p: couldn't find fd !", set);
893   }
894
895   g_mutex_unlock (set->lock);
896
897   return idx >= 0;
898 }
899
900 static gboolean
901 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
902 {
903   gint idx;
904
905   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
906       fd->fd, fd->idx, active);
907
908   idx = find_index (set->fds, fd);
909
910   if (idx >= 0) {
911 #ifndef G_OS_WIN32
912     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
913
914     if (active)
915       pfd->events |= (POLLIN | POLLPRI);
916     else
917       pfd->events &= ~(POLLIN | POLLPRI);
918 #else
919     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
920 #endif
921     MARK_REBUILD (set);
922   } else {
923     GST_WARNING ("%p: couldn't find fd !", set);
924   }
925
926   return idx >= 0;
927 }
928
929 /**
930  * gst_poll_fd_ctl_read:
931  * @set: a file descriptor set.
932  * @fd: a file descriptor.
933  * @active: a new status.
934  *
935  * Control whether the descriptor @fd in @set will be monitored for
936  * readability.
937  *
938  * Returns: %TRUE if the descriptor was successfully updated.
939  *
940  * Since: 0.10.18
941  */
942 gboolean
943 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
944 {
945   gboolean ret;
946
947   g_return_val_if_fail (set != NULL, FALSE);
948   g_return_val_if_fail (fd != NULL, FALSE);
949   g_return_val_if_fail (fd->fd >= 0, FALSE);
950
951   g_mutex_lock (set->lock);
952
953   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
954
955   g_mutex_unlock (set->lock);
956
957   return ret;
958 }
959
960 /**
961  * gst_poll_fd_ignored:
962  * @set: a file descriptor set.
963  * @fd: a file descriptor.
964  *
965  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
966  * the same result for @fd as last time. This function must be called if no
967  * operation (read/write/recv/send/etc.) will be performed on @fd before
968  * the next call to gst_poll_wait().
969  *
970  * The reason why this is needed is because the underlying implementation
971  * might not allow querying the fd more than once between calls to one of
972  * the re-enabling operations.
973  *
974  * Since: 0.10.18
975  */
976 void
977 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
978 {
979 #ifdef G_OS_WIN32
980   gint idx;
981
982   g_return_if_fail (set != NULL);
983   g_return_if_fail (fd != NULL);
984   g_return_if_fail (fd->fd >= 0);
985
986   g_mutex_lock (set->lock);
987
988   idx = find_index (set->fds, fd);
989   if (idx >= 0) {
990     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
991
992     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
993     MARK_REBUILD (set);
994   }
995
996   g_mutex_unlock (set->lock);
997 #endif
998 }
999
1000 /**
1001  * gst_poll_fd_has_closed:
1002  * @set: a file descriptor set.
1003  * @fd: a file descriptor.
1004  *
1005  * Check if @fd in @set has closed the connection.
1006  *
1007  * Returns: %TRUE if the connection was closed.
1008  *
1009  * Since: 0.10.18
1010  */
1011 gboolean
1012 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1013 {
1014   gboolean res = FALSE;
1015   gint idx;
1016
1017   g_return_val_if_fail (set != NULL, FALSE);
1018   g_return_val_if_fail (fd != NULL, FALSE);
1019   g_return_val_if_fail (fd->fd >= 0, FALSE);
1020
1021   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1022
1023   g_mutex_lock (set->lock);
1024
1025   idx = find_index (set->active_fds, fd);
1026   if (idx >= 0) {
1027 #ifndef G_OS_WIN32
1028     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1029
1030     res = (pfd->revents & POLLHUP) != 0;
1031 #else
1032     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1033
1034     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1035 #endif
1036   } else {
1037     GST_WARNING ("%p: couldn't find fd !", set);
1038   }
1039
1040   g_mutex_unlock (set->lock);
1041
1042   return res;
1043 }
1044
1045 /**
1046  * gst_poll_fd_has_error:
1047  * @set: a file descriptor set.
1048  * @fd: a file descriptor.
1049  *
1050  * Check if @fd in @set has an error.
1051  *
1052  * Returns: %TRUE if the descriptor has an error.
1053  *
1054  * Since: 0.10.18
1055  */
1056 gboolean
1057 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1058 {
1059   gboolean res = FALSE;
1060   gint idx;
1061
1062   g_return_val_if_fail (set != NULL, FALSE);
1063   g_return_val_if_fail (fd != NULL, FALSE);
1064   g_return_val_if_fail (fd->fd >= 0, FALSE);
1065
1066   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1067
1068   g_mutex_lock (set->lock);
1069
1070   idx = find_index (set->active_fds, fd);
1071   if (idx >= 0) {
1072 #ifndef G_OS_WIN32
1073     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1074
1075     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1076 #else
1077     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1078
1079     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1080         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1081         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1082         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1083         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1084 #endif
1085   } else {
1086     GST_WARNING ("%p: couldn't find fd !", set);
1087   }
1088
1089   g_mutex_unlock (set->lock);
1090
1091   return res;
1092 }
1093
1094 static gboolean
1095 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1096 {
1097   gboolean res = FALSE;
1098   gint idx;
1099
1100   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1101
1102   idx = find_index (set->active_fds, fd);
1103   if (idx >= 0) {
1104 #ifndef G_OS_WIN32
1105     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1106
1107     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1108 #else
1109     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1110
1111     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1112 #endif
1113   } else {
1114     GST_WARNING ("%p: couldn't find fd !", set);
1115   }
1116
1117   return res;
1118 }
1119
1120 /**
1121  * gst_poll_fd_can_read:
1122  * @set: a file descriptor set.
1123  * @fd: a file descriptor.
1124  *
1125  * Check if @fd in @set has data to be read.
1126  *
1127  * Returns: %TRUE if the descriptor has data to be read.
1128  *
1129  * Since: 0.10.18
1130  */
1131 gboolean
1132 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1133 {
1134   gboolean res = FALSE;
1135
1136   g_return_val_if_fail (set != NULL, FALSE);
1137   g_return_val_if_fail (fd != NULL, FALSE);
1138   g_return_val_if_fail (fd->fd >= 0, FALSE);
1139
1140   g_mutex_lock (set->lock);
1141
1142   res = gst_poll_fd_can_read_unlocked (set, fd);
1143
1144   g_mutex_unlock (set->lock);
1145
1146   return res;
1147 }
1148
1149 /**
1150  * gst_poll_fd_can_write:
1151  * @set: a file descriptor set.
1152  * @fd: a file descriptor.
1153  *
1154  * Check if @fd in @set can be used for writing.
1155  *
1156  * Returns: %TRUE if the descriptor can be used for writing.
1157  *
1158  * Since: 0.10.18
1159  */
1160 gboolean
1161 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1162 {
1163   gboolean res = FALSE;
1164   gint idx;
1165
1166   g_return_val_if_fail (set != NULL, FALSE);
1167   g_return_val_if_fail (fd != NULL, FALSE);
1168   g_return_val_if_fail (fd->fd >= 0, FALSE);
1169
1170   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1171
1172   g_mutex_lock (set->lock);
1173
1174   idx = find_index (set->active_fds, fd);
1175   if (idx >= 0) {
1176 #ifndef G_OS_WIN32
1177     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1178
1179     res = (pfd->revents & POLLOUT) != 0;
1180 #else
1181     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1182
1183     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1184 #endif
1185   } else {
1186     GST_WARNING ("%p: couldn't find fd !", set);
1187   }
1188
1189   g_mutex_unlock (set->lock);
1190
1191   return res;
1192 }
1193
1194 /**
1195  * gst_poll_wait:
1196  * @set: a #GstPoll.
1197  * @timeout: a timeout in nanoseconds.
1198  *
1199  * Wait for activity on the file descriptors in @set. This function waits up to
1200  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1201  *
1202  * For #GstPoll objects created with gst_poll_new(), this function can only be
1203  * called from a single thread at a time.  If called from multiple threads,
1204  * -1 will be returned with errno set to EPERM.
1205  *
1206  * This is not true for timer #GstPoll objects created with
1207  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1208  * simultaneously.
1209  *
1210  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1211  * activity was detected after @timeout. If an error occurs, -1 is returned
1212  * and errno is set.
1213  *
1214  * Since: 0.10.18
1215  */
1216 gint
1217 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1218 {
1219   gboolean restarting;
1220   gboolean is_timer;
1221   int res;
1222   gint old_waiting;
1223
1224   g_return_val_if_fail (set != NULL, -1);
1225
1226   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1227
1228   is_timer = set->timer;
1229
1230   /* add one more waiter */
1231   old_waiting = INC_WAITING (set);
1232
1233   /* we cannot wait from multiple threads unless we are a timer */
1234   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1235     goto already_waiting;
1236
1237   /* flushing, exit immediately */
1238   if (G_UNLIKELY (IS_FLUSHING (set)))
1239     goto flushing;
1240
1241   do {
1242     GstPollMode mode;
1243
1244     res = -1;
1245     restarting = FALSE;
1246
1247     mode = choose_mode (set, timeout);
1248
1249     if (TEST_REBUILD (set)) {
1250       g_mutex_lock (set->lock);
1251 #ifndef G_OS_WIN32
1252       g_array_set_size (set->active_fds, set->fds->len);
1253       memcpy (set->active_fds->data, set->fds->data,
1254           set->fds->len * sizeof (struct pollfd));
1255 #else
1256       if (!gst_poll_prepare_winsock_active_sets (set))
1257         goto winsock_error;
1258 #endif
1259       g_mutex_unlock (set->lock);
1260     }
1261
1262     switch (mode) {
1263       case GST_POLL_MODE_AUTO:
1264         g_assert_not_reached ();
1265         break;
1266       case GST_POLL_MODE_PPOLL:
1267       {
1268 #ifdef HAVE_PPOLL
1269         struct timespec ts;
1270         struct timespec *tsptr;
1271
1272         if (timeout != GST_CLOCK_TIME_NONE) {
1273           GST_TIME_TO_TIMESPEC (timeout, ts);
1274           tsptr = &ts;
1275         } else {
1276           tsptr = NULL;
1277         }
1278
1279         res =
1280             ppoll ((struct pollfd *) set->active_fds->data,
1281             set->active_fds->len, tsptr, NULL);
1282 #else
1283         g_assert_not_reached ();
1284         errno = ENOSYS;
1285 #endif
1286         break;
1287       }
1288       case GST_POLL_MODE_POLL:
1289       {
1290 #ifdef HAVE_POLL
1291         gint t;
1292
1293         if (timeout != GST_CLOCK_TIME_NONE) {
1294           t = GST_TIME_AS_MSECONDS (timeout);
1295         } else {
1296           t = -1;
1297         }
1298
1299         res =
1300             poll ((struct pollfd *) set->active_fds->data,
1301             set->active_fds->len, t);
1302 #else
1303         g_assert_not_reached ();
1304         errno = ENOSYS;
1305 #endif
1306         break;
1307       }
1308       case GST_POLL_MODE_PSELECT:
1309 #ifndef HAVE_PSELECT
1310       {
1311         g_assert_not_reached ();
1312         errno = ENOSYS;
1313         break;
1314       }
1315 #endif
1316       case GST_POLL_MODE_SELECT:
1317       {
1318 #ifndef G_OS_WIN32
1319         fd_set readfds;
1320         fd_set writefds;
1321         fd_set errorfds;
1322         gint max_fd;
1323
1324         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1325
1326         if (mode == GST_POLL_MODE_SELECT) {
1327           struct timeval tv;
1328           struct timeval *tvptr;
1329
1330           if (timeout != GST_CLOCK_TIME_NONE) {
1331             GST_TIME_TO_TIMEVAL (timeout, tv);
1332             tvptr = &tv;
1333           } else {
1334             tvptr = NULL;
1335           }
1336
1337           GST_DEBUG ("Calling select");
1338           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1339           GST_DEBUG ("After select, res:%d", res);
1340         } else {
1341 #ifdef HAVE_PSELECT
1342           struct timespec ts;
1343           struct timespec *tsptr;
1344
1345           if (timeout != GST_CLOCK_TIME_NONE) {
1346             GST_TIME_TO_TIMESPEC (timeout, ts);
1347             tsptr = &ts;
1348           } else {
1349             tsptr = NULL;
1350           }
1351
1352           GST_DEBUG ("Calling pselect");
1353           res =
1354               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1355           GST_DEBUG ("After pselect, res:%d", res);
1356 #endif
1357         }
1358
1359         if (res >= 0) {
1360           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1361         }
1362 #else /* G_OS_WIN32 */
1363         g_assert_not_reached ();
1364         errno = ENOSYS;
1365 #endif
1366         break;
1367       }
1368       case GST_POLL_MODE_WINDOWS:
1369       {
1370 #ifdef G_OS_WIN32
1371         gint ignore_count = set->active_fds_ignored->len;
1372         DWORD t, wait_ret;
1373
1374         if (G_LIKELY (ignore_count == 0)) {
1375           if (timeout != GST_CLOCK_TIME_NONE)
1376             t = GST_TIME_AS_MSECONDS (timeout);
1377           else
1378             t = INFINITE;
1379         } else {
1380           /* already one or more ignored fds, so we quickly sweep the others */
1381           t = 0;
1382         }
1383
1384         if (set->active_events->len != 0) {
1385           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1386               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1387         } else {
1388           wait_ret = WSA_WAIT_FAILED;
1389           WSASetLastError (WSA_INVALID_PARAMETER);
1390         }
1391
1392         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1393           res = 0;
1394         } else if (wait_ret == WSA_WAIT_FAILED) {
1395           res = -1;
1396           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1397         } else {
1398           /* the first entry is the wakeup event */
1399           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1400             res = gst_poll_collect_winsock_events (set);
1401           } else {
1402             res = 1;            /* wakeup event */
1403           }
1404         }
1405 #else
1406         g_assert_not_reached ();
1407         errno = ENOSYS;
1408 #endif
1409         break;
1410       }
1411     }
1412
1413     if (!is_timer) {
1414       /* Applications needs to clear the control socket themselves for timer
1415        * polls.
1416        * For other polls, we need to clear the control socket. If there was only
1417        * one socket with activity and it was the control socket, we need to
1418        * restart */
1419       if (release_all_wakeup (set) > 0 && res == 1)
1420         restarting = TRUE;
1421     }
1422
1423     /* we got woken up and we are flushing, we need to stop */
1424     if (G_UNLIKELY (IS_FLUSHING (set)))
1425       goto flushing;
1426
1427   } while (G_UNLIKELY (restarting));
1428
1429   DEC_WAITING (set);
1430
1431   return res;
1432
1433   /* ERRORS */
1434 already_waiting:
1435   {
1436     GST_LOG ("%p: we are already waiting", set);
1437     DEC_WAITING (set);
1438     errno = EPERM;
1439     return -1;
1440   }
1441 flushing:
1442   {
1443     GST_LOG ("%p: we are flushing", set);
1444     DEC_WAITING (set);
1445     errno = EBUSY;
1446     return -1;
1447   }
1448 #ifdef G_OS_WIN32
1449 winsock_error:
1450   {
1451     GST_LOG ("%p: winsock error", set);
1452     g_mutex_unlock (set->lock);
1453     DEC_WAITING (set);
1454     return -1;
1455   }
1456 #endif
1457 }
1458
1459 /**
1460  * gst_poll_set_controllable:
1461  * @set: a #GstPoll.
1462  * @controllable: new controllable state.
1463  *
1464  * When @controllable is %TRUE, this function ensures that future calls to
1465  * gst_poll_wait() will be affected by gst_poll_restart() and
1466  * gst_poll_set_flushing().
1467  *
1468  * Returns: %TRUE if the controllability of @set could be updated.
1469  *
1470  * Since: 0.10.18
1471  */
1472 gboolean
1473 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1474 {
1475   g_return_val_if_fail (set != NULL, FALSE);
1476
1477   GST_LOG ("%p: controllable : %d", set, controllable);
1478
1479   set->controllable = controllable;
1480
1481   return TRUE;
1482 }
1483
1484 /**
1485  * gst_poll_restart:
1486  * @set: a #GstPoll.
1487  *
1488  * Restart any gst_poll_wait() that is in progress. This function is typically
1489  * used after adding or removing descriptors to @set.
1490  *
1491  * If @set is not controllable, then this call will have no effect.
1492  *
1493  * Since: 0.10.18
1494  */
1495 void
1496 gst_poll_restart (GstPoll * set)
1497 {
1498   g_return_if_fail (set != NULL);
1499
1500   if (set->controllable && GET_WAITING (set) > 0) {
1501     /* we are controllable and waiting, wake up the waiter. The socket will be
1502      * cleared by the _wait() thread and the poll will be restarted */
1503     raise_wakeup (set);
1504   }
1505 }
1506
1507 /**
1508  * gst_poll_set_flushing:
1509  * @set: a #GstPoll.
1510  * @flushing: new flushing state.
1511  *
1512  * When @flushing is %TRUE, this function ensures that current and future calls
1513  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1514  *
1515  * Unsetting the flushing state will restore normal operation of @set.
1516  *
1517  * Since: 0.10.18
1518  */
1519 void
1520 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1521 {
1522   g_return_if_fail (set != NULL);
1523
1524   GST_LOG ("%p: flushing: %d", set, flushing);
1525
1526   /* update the new state first */
1527   SET_FLUSHING (set, flushing);
1528
1529   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1530     /* we are flushing, controllable and waiting, wake up the waiter. When we
1531      * stop the flushing operation we don't clear the wakeup fd here, this will
1532      * happen in the _wait() thread. */
1533     raise_wakeup (set);
1534   }
1535 }
1536
1537 /**
1538  * gst_poll_write_control:
1539  * @set: a #GstPoll.
1540  *
1541  * Write a byte to the control socket of the controllable @set.
1542  * This function is mostly useful for timer #GstPoll objects created with
1543  * gst_poll_new_timer(). 
1544  *
1545  * It will make any current and future gst_poll_wait() function return with
1546  * 1, meaning the control socket is set. After an equal amount of calls to
1547  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1548  * block again until their timeout expired.
1549  *
1550  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1551  * byte could not be written.
1552  *
1553  * Since: 0.10.23
1554  */
1555 gboolean
1556 gst_poll_write_control (GstPoll * set)
1557 {
1558   gboolean res;
1559
1560   g_return_val_if_fail (set != NULL, FALSE);
1561   g_return_val_if_fail (set->timer, FALSE);
1562
1563   res = raise_wakeup (set);
1564
1565   return res;
1566 }
1567
1568 /**
1569  * gst_poll_read_control:
1570  * @set: a #GstPoll.
1571  *
1572  * Read a byte from the control socket of the controllable @set.
1573  * This function is mostly useful for timer #GstPoll objects created with
1574  * gst_poll_new_timer(). 
1575  *
1576  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1577  * was no byte to read.
1578  *
1579  * Since: 0.10.23
1580  */
1581 gboolean
1582 gst_poll_read_control (GstPoll * set)
1583 {
1584   gboolean res;
1585
1586   g_return_val_if_fail (set != NULL, FALSE);
1587   g_return_val_if_fail (set->timer, FALSE);
1588
1589   res = release_wakeup (set);
1590
1591   return res;
1592 }