Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #ifdef HAVE_SYS_POLL_H
79 #include <sys/poll.h>
80 #endif
81 #ifdef HAVE_POLL_H
82 #include <poll.h>
83 #endif
84 #include <sys/time.h>
85 #include <sys/socket.h>
86 #endif
87
88 /* OS/X needs this because of bad headers */
89 #include <string.h>
90
91 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
92  * so we prefer our own poll emulation.
93  */
94 #if defined(BROKEN_POLL)
95 #undef HAVE_POLL
96 #endif
97
98 #include "gstpoll.h"
99
100 #define GST_CAT_DEFAULT GST_CAT_POLL
101
102 #ifdef G_OS_WIN32
103 typedef struct _WinsockFd WinsockFd;
104
105 struct _WinsockFd
106 {
107   gint fd;
108   glong event_mask;
109   WSANETWORKEVENTS events;
110   glong ignored_event_mask;
111 };
112 #endif
113
114 typedef enum
115 {
116   GST_POLL_MODE_AUTO,
117   GST_POLL_MODE_SELECT,
118   GST_POLL_MODE_PSELECT,
119   GST_POLL_MODE_POLL,
120   GST_POLL_MODE_PPOLL,
121   GST_POLL_MODE_WINDOWS
122 } GstPollMode;
123
124 struct _GstPoll
125 {
126   GstPollMode mode;
127
128   GMutex lock;
129   /* array of fds, always written to and read from with lock */
130   GArray *fds;
131   /* array of active fds, only written to from the waiting thread with the
132    * lock and read from with the lock or without the lock from the waiting
133    * thread */
134   GArray *active_fds;
135
136 #ifndef G_OS_WIN32
137   gchar buf[1];
138   GstPollFD control_read_fd;
139   GstPollFD control_write_fd;
140 #else
141   GArray *active_fds_ignored;
142   GArray *events;
143   GArray *active_events;
144
145   HANDLE wakeup_event;
146 #endif
147
148   gboolean controllable;
149   volatile gint waiting;
150   volatile gint control_pending;
151   volatile gint flushing;
152   gboolean timer;
153   volatile gint rebuild;
154 };
155
156 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
157     gboolean active);
158 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
159
160 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
161 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
162
163 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
164 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
165 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
166
167 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
168 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
169
170 #ifndef G_OS_WIN32
171 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
172 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
173 #else
174 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
175 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
176 #endif
177
178 /* the poll/select call is also performed on a control socket, that way
179  * we can send special commands to control it */
180 static inline gboolean
181 raise_wakeup (GstPoll * set)
182 {
183   gboolean result = TRUE;
184
185   if (g_atomic_int_add (&set->control_pending, 1) == 0) {
186     /* raise when nothing pending */
187     GST_LOG ("%p: raise", set);
188     result = WAKE_EVENT (set);
189   }
190   return result;
191 }
192
193 /* note how bad things can happen when the 2 threads both raise and release the
194  * wakeup. This is however not a problem because you must always pair a raise
195  * with a release */
196 static inline gboolean
197 release_wakeup (GstPoll * set)
198 {
199   gboolean result = TRUE;
200
201   if (g_atomic_int_dec_and_test (&set->control_pending)) {
202     GST_LOG ("%p: release", set);
203     result = RELEASE_EVENT (set);
204   }
205   return result;
206 }
207
208 static inline gint
209 release_all_wakeup (GstPoll * set)
210 {
211   gint old;
212
213   while (TRUE) {
214     if (!(old = g_atomic_int_get (&set->control_pending)))
215       /* nothing pending, just exit */
216       break;
217
218     /* try to remove all pending control messages */
219     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
220       /* we managed to remove all messages, read the control socket */
221       if (RELEASE_EVENT (set))
222         break;
223       else
224         /* retry again until we read it successfully */
225         g_atomic_int_add (&set->control_pending, 1);
226     }
227   }
228   return old;
229 }
230
231 static gint
232 find_index (GArray * array, GstPollFD * fd)
233 {
234 #ifndef G_OS_WIN32
235   struct pollfd *ifd;
236 #else
237   WinsockFd *ifd;
238 #endif
239   guint i;
240
241   /* start by assuming the index found in the fd is still valid */
242   if (fd->idx >= 0 && fd->idx < array->len) {
243 #ifndef G_OS_WIN32
244     ifd = &g_array_index (array, struct pollfd, fd->idx);
245 #else
246     ifd = &g_array_index (array, WinsockFd, fd->idx);
247 #endif
248
249     if (ifd->fd == fd->fd) {
250       return fd->idx;
251     }
252   }
253
254   /* the pollfd array has changed and we need to lookup the fd again */
255   for (i = 0; i < array->len; i++) {
256 #ifndef G_OS_WIN32
257     ifd = &g_array_index (array, struct pollfd, i);
258 #else
259     ifd = &g_array_index (array, WinsockFd, i);
260 #endif
261
262     if (ifd->fd == fd->fd) {
263       fd->idx = (gint) i;
264       return fd->idx;
265     }
266   }
267
268   fd->idx = -1;
269   return fd->idx;
270 }
271
272 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
273 /* check if all file descriptors will fit in an fd_set */
274 static gboolean
275 selectable_fds (const GstPoll * set)
276 {
277   guint i;
278
279   g_mutex_lock (&set->lock);
280   for (i = 0; i < set->fds->len; i++) {
281     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
282
283     if (pfd->fd >= FD_SETSIZE)
284       goto too_many;
285   }
286   g_mutex_unlock (&set->lock);
287
288   return TRUE;
289
290 too_many:
291   {
292     g_mutex_unlock (&set->lock);
293     return FALSE;
294   }
295 }
296
297 /* check if the timeout will convert to a timeout value used for poll()
298  * without a loss of precision
299  */
300 static gboolean
301 pollable_timeout (GstClockTime timeout)
302 {
303   if (timeout == GST_CLOCK_TIME_NONE)
304     return TRUE;
305
306   /* not a nice multiple of milliseconds */
307   if (timeout % 1000000)
308     return FALSE;
309
310   return TRUE;
311 }
312 #endif
313
314 static GstPollMode
315 choose_mode (const GstPoll * set, GstClockTime timeout)
316 {
317   GstPollMode mode;
318
319   if (set->mode == GST_POLL_MODE_AUTO) {
320 #ifdef HAVE_PPOLL
321     mode = GST_POLL_MODE_PPOLL;
322 #elif defined(HAVE_POLL)
323     if (!selectable_fds (set) || pollable_timeout (timeout)) {
324       mode = GST_POLL_MODE_POLL;
325     } else {
326 #ifdef HAVE_PSELECT
327       mode = GST_POLL_MODE_PSELECT;
328 #else
329       mode = GST_POLL_MODE_SELECT;
330 #endif
331     }
332 #elif defined(HAVE_PSELECT)
333     mode = GST_POLL_MODE_PSELECT;
334 #else
335     mode = GST_POLL_MODE_SELECT;
336 #endif
337   } else {
338     mode = set->mode;
339   }
340   return mode;
341 }
342
343 #ifndef G_OS_WIN32
344 static gint
345 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
346     fd_set * errorfds)
347 {
348   gint max_fd = -1;
349   guint i;
350
351   FD_ZERO (readfds);
352   FD_ZERO (writefds);
353   FD_ZERO (errorfds);
354
355   g_mutex_lock (&set->lock);
356
357   for (i = 0; i < set->active_fds->len; i++) {
358     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
359
360     if (pfd->fd < FD_SETSIZE) {
361       if (pfd->events & POLLIN)
362         FD_SET (pfd->fd, readfds);
363       if (pfd->events & POLLOUT)
364         FD_SET (pfd->fd, writefds);
365       if (pfd->events)
366         FD_SET (pfd->fd, errorfds);
367       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
368         max_fd = pfd->fd;
369     }
370   }
371
372   g_mutex_unlock (&set->lock);
373
374   return max_fd;
375 }
376
377 static void
378 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
379     fd_set * errorfds)
380 {
381   guint i;
382
383   g_mutex_lock (&set->lock);
384
385   for (i = 0; i < set->active_fds->len; i++) {
386     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
387
388     if (pfd->fd < FD_SETSIZE) {
389       pfd->revents = 0;
390       if (FD_ISSET (pfd->fd, readfds))
391         pfd->revents |= POLLIN;
392       if (FD_ISSET (pfd->fd, writefds))
393         pfd->revents |= POLLOUT;
394       if (FD_ISSET (pfd->fd, errorfds))
395         pfd->revents |= POLLERR;
396     }
397   }
398
399   g_mutex_unlock (&set->lock);
400 }
401 #else /* G_OS_WIN32 */
402 /*
403  * Translate errors thrown by the Winsock API used by GstPoll:
404  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
405  */
406 static gint
407 gst_poll_winsock_error_to_errno (DWORD last_error)
408 {
409   switch (last_error) {
410     case WSA_INVALID_HANDLE:
411     case WSAEINVAL:
412     case WSAENOTSOCK:
413       return EBADF;
414
415     case WSA_NOT_ENOUGH_MEMORY:
416       return ENOMEM;
417
418       /*
419        * Anything else, including:
420        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
421        *   WSANOTINITIALISED
422        */
423     default:
424       return EINVAL;
425   }
426 }
427
428 static void
429 gst_poll_free_winsock_event (GstPoll * set, gint idx)
430 {
431   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
432   HANDLE event = g_array_index (set->events, HANDLE, idx);
433
434   WSAEventSelect (wfd->fd, event, 0);
435   CloseHandle (event);
436 }
437
438 static void
439 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
440     gboolean active)
441 {
442   WinsockFd *wfd;
443
444   wfd = &g_array_index (set->fds, WinsockFd, idx);
445
446   if (active)
447     wfd->event_mask |= flags;
448   else
449     wfd->event_mask &= ~flags;
450
451   /* reset ignored state if the new mask doesn't overlap at all */
452   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
453     wfd->ignored_event_mask = 0;
454 }
455
456 static gboolean
457 gst_poll_prepare_winsock_active_sets (GstPoll * set)
458 {
459   guint i;
460
461   g_array_set_size (set->active_fds, 0);
462   g_array_set_size (set->active_fds_ignored, 0);
463   g_array_set_size (set->active_events, 0);
464   g_array_append_val (set->active_events, set->wakeup_event);
465
466   for (i = 0; i < set->fds->len; i++) {
467     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
468     HANDLE event = g_array_index (set->events, HANDLE, i);
469
470     if (wfd->ignored_event_mask == 0) {
471       gint ret;
472
473       g_array_append_val (set->active_fds, *wfd);
474       g_array_append_val (set->active_events, event);
475
476       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
477       if (G_UNLIKELY (ret != 0)) {
478         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
479         return FALSE;
480       }
481     } else {
482       g_array_append_val (set->active_fds_ignored, wfd);
483     }
484   }
485
486   return TRUE;
487 }
488
489 static gint
490 gst_poll_collect_winsock_events (GstPoll * set)
491 {
492   gint res, i;
493
494   /*
495    * We need to check which events are signaled, and call
496    * WSAEnumNetworkEvents for those that are, which resets
497    * the event and clears the internal network event records.
498    */
499   res = 0;
500   for (i = 0; i < set->active_fds->len; i++) {
501     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
502     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
503     DWORD wait_ret;
504
505     wait_ret = WaitForSingleObject (event, 0);
506     if (wait_ret == WAIT_OBJECT_0) {
507       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
508
509       if (G_UNLIKELY (enum_ret != 0)) {
510         res = -1;
511         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
512         break;
513       }
514
515       res++;
516     } else {
517       /* clear any previously stored result */
518       memset (&wfd->events, 0, sizeof (wfd->events));
519     }
520   }
521
522   /* If all went well we also need to reset the ignored fds. */
523   if (res >= 0) {
524     res += set->active_fds_ignored->len;
525
526     for (i = 0; i < set->active_fds_ignored->len; i++) {
527       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
528
529       wfd->ignored_event_mask = 0;
530     }
531
532     g_array_set_size (set->active_fds_ignored, 0);
533   }
534
535   return res;
536 }
537 #endif
538
539 /**
540  * gst_poll_new: (skip)
541  * @controllable: whether it should be possible to control a wait.
542  *
543  * Create a new file descriptor set. If @controllable, it
544  * is possible to restart or flush a call to gst_poll_wait() with
545  * gst_poll_restart() and gst_poll_set_flushing() respectively.
546  *
547  * Free-function: gst_poll_free
548  *
549  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
550  *     Free with gst_poll_free().
551  *
552  * Since: 0.10.18
553  */
554 GstPoll *
555 gst_poll_new (gboolean controllable)
556 {
557   GstPoll *nset;
558
559   GST_DEBUG ("controllable : %d", controllable);
560
561   nset = g_slice_new0 (GstPoll);
562   g_mutex_init (&nset->lock);
563 #ifndef G_OS_WIN32
564   nset->mode = GST_POLL_MODE_AUTO;
565   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
566   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
567   nset->control_read_fd.fd = -1;
568   nset->control_write_fd.fd = -1;
569   {
570     gint control_sock[2];
571
572     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
573       goto no_socket_pair;
574
575     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
576     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
577
578     nset->control_read_fd.fd = control_sock[0];
579     nset->control_write_fd.fd = control_sock[1];
580
581     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
582     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
583   }
584 #else
585   nset->mode = GST_POLL_MODE_WINDOWS;
586   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
587   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
588   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
589   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
590   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
591
592   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
593 #endif
594
595   /* ensure (re)build, though already sneakily set in non-windows case */
596   MARK_REBUILD (nset);
597
598   nset->controllable = controllable;
599
600   return nset;
601
602   /* ERRORS */
603 #ifndef G_OS_WIN32
604 no_socket_pair:
605   {
606     GST_WARNING ("%p: can't create socket pair !", nset);
607     gst_poll_free (nset);
608     return NULL;
609   }
610 #endif
611 }
612
613 /**
614  * gst_poll_new_timer: (skip)
615  *
616  * Create a new poll object that can be used for scheduling cancellable
617  * timeouts.
618  *
619  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
620  * performed from different threads. 
621  *
622  * Free-function: gst_poll_free
623  *
624  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
625  *     Free with gst_poll_free().
626  *
627  * Since: 0.10.23
628  */
629 GstPoll *
630 gst_poll_new_timer (void)
631 {
632   GstPoll *poll;
633
634   /* make a new controllable poll set */
635   if (!(poll = gst_poll_new (TRUE)))
636     goto done;
637
638   /* we are a timer */
639   poll->timer = TRUE;
640
641 done:
642   return poll;
643 }
644
645 /**
646  * gst_poll_free:
647  * @set: (transfer full): a file descriptor set.
648  *
649  * Free a file descriptor set.
650  *
651  * Since: 0.10.18
652  */
653 void
654 gst_poll_free (GstPoll * set)
655 {
656   g_return_if_fail (set != NULL);
657
658   GST_DEBUG ("%p: freeing", set);
659
660 #ifndef G_OS_WIN32
661   if (set->control_write_fd.fd >= 0)
662     close (set->control_write_fd.fd);
663   if (set->control_read_fd.fd >= 0)
664     close (set->control_read_fd.fd);
665 #else
666   CloseHandle (set->wakeup_event);
667
668   {
669     guint i;
670
671     for (i = 0; i < set->events->len; i++)
672       gst_poll_free_winsock_event (set, i);
673   }
674
675   g_array_free (set->active_events, TRUE);
676   g_array_free (set->events, TRUE);
677   g_array_free (set->active_fds_ignored, TRUE);
678 #endif
679
680   g_array_free (set->active_fds, TRUE);
681   g_array_free (set->fds, TRUE);
682   g_mutex_clear (&set->lock);
683   g_slice_free (GstPoll, set);
684 }
685
686 /**
687  * gst_poll_get_read_gpollfd:
688  * @set: a #GstPoll
689  * @fd: a #GPollFD
690  *
691  * Get a GPollFD for the reading part of the control socket. This is useful when
692  * integrating with a GSource and GMainLoop.
693  *
694  * Since: 0.10.32
695  */
696 void
697 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
698 {
699   g_return_if_fail (set != NULL);
700   g_return_if_fail (fd != NULL);
701
702 #ifndef G_OS_WIN32
703   fd->fd = set->control_read_fd.fd;
704 #else
705 #if GLIB_SIZEOF_VOID_P == 8
706   fd->fd = (gint64) set->wakeup_event;
707 #else
708   fd->fd = (gint) set->wakeup_event;
709 #endif
710 #endif
711   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
712   fd->revents = 0;
713 }
714
715 /**
716  * gst_poll_fd_init:
717  * @fd: a #GstPollFD
718  *
719  * Initializes @fd. Alternatively you can initialize it with
720  * #GST_POLL_FD_INIT.
721  *
722  * Since: 0.10.18
723  */
724 void
725 gst_poll_fd_init (GstPollFD * fd)
726 {
727   g_return_if_fail (fd != NULL);
728
729   fd->fd = -1;
730   fd->idx = -1;
731 }
732
733 static gboolean
734 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
735 {
736   gint idx;
737
738   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
739
740   idx = find_index (set->fds, fd);
741   if (idx < 0) {
742 #ifndef G_OS_WIN32
743     struct pollfd nfd;
744
745     nfd.fd = fd->fd;
746     nfd.events = POLLERR | POLLNVAL | POLLHUP;
747     nfd.revents = 0;
748
749     g_array_append_val (set->fds, nfd);
750
751     fd->idx = set->fds->len - 1;
752 #else
753     WinsockFd wfd;
754     HANDLE event;
755
756     wfd.fd = fd->fd;
757     wfd.event_mask = FD_CLOSE;
758     memset (&wfd.events, 0, sizeof (wfd.events));
759     wfd.ignored_event_mask = 0;
760     event = WSACreateEvent ();
761
762     g_array_append_val (set->fds, wfd);
763     g_array_append_val (set->events, event);
764
765     fd->idx = set->fds->len - 1;
766 #endif
767     MARK_REBUILD (set);
768   } else {
769     GST_WARNING ("%p: couldn't find fd !", set);
770   }
771
772   return TRUE;
773 }
774
775 /**
776  * gst_poll_add_fd:
777  * @set: a file descriptor set.
778  * @fd: a file descriptor.
779  *
780  * Add a file descriptor to the file descriptor set.
781  *
782  * Returns: %TRUE if the file descriptor was successfully added to the set.
783  *
784  * Since: 0.10.18
785  */
786 gboolean
787 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
788 {
789   gboolean ret;
790
791   g_return_val_if_fail (set != NULL, FALSE);
792   g_return_val_if_fail (fd != NULL, FALSE);
793   g_return_val_if_fail (fd->fd >= 0, FALSE);
794
795   g_mutex_lock (&set->lock);
796
797   ret = gst_poll_add_fd_unlocked (set, fd);
798
799   g_mutex_unlock (&set->lock);
800
801   return ret;
802 }
803
804 /**
805  * gst_poll_remove_fd:
806  * @set: a file descriptor set.
807  * @fd: a file descriptor.
808  *
809  * Remove a file descriptor from the file descriptor set.
810  *
811  * Returns: %TRUE if the file descriptor was successfully removed from the set.
812  *
813  * Since: 0.10.18
814  */
815 gboolean
816 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
817 {
818   gint idx;
819
820   g_return_val_if_fail (set != NULL, FALSE);
821   g_return_val_if_fail (fd != NULL, FALSE);
822   g_return_val_if_fail (fd->fd >= 0, FALSE);
823
824
825   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
826
827   g_mutex_lock (&set->lock);
828
829   /* get the index, -1 is an fd that is not added */
830   idx = find_index (set->fds, fd);
831   if (idx >= 0) {
832 #ifdef G_OS_WIN32
833     gst_poll_free_winsock_event (set, idx);
834     g_array_remove_index_fast (set->events, idx);
835 #endif
836
837     /* remove the fd at index, we use _remove_index_fast, which copies the last
838      * element of the array to the freed index */
839     g_array_remove_index_fast (set->fds, idx);
840
841     /* mark fd as removed by setting the index to -1 */
842     fd->idx = -1;
843     MARK_REBUILD (set);
844   } else {
845     GST_WARNING ("%p: couldn't find fd !", set);
846   }
847
848   g_mutex_unlock (&set->lock);
849
850   return idx >= 0;
851 }
852
853 /**
854  * gst_poll_fd_ctl_write:
855  * @set: a file descriptor set.
856  * @fd: a file descriptor.
857  * @active: a new status.
858  *
859  * Control whether the descriptor @fd in @set will be monitored for
860  * writability.
861  *
862  * Returns: %TRUE if the descriptor was successfully updated.
863  *
864  * Since: 0.10.18
865  */
866 gboolean
867 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
868 {
869   gint idx;
870
871   g_return_val_if_fail (set != NULL, FALSE);
872   g_return_val_if_fail (fd != NULL, FALSE);
873   g_return_val_if_fail (fd->fd >= 0, FALSE);
874
875   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
876       fd->fd, fd->idx, active);
877
878   g_mutex_lock (&set->lock);
879
880   idx = find_index (set->fds, fd);
881   if (idx >= 0) {
882 #ifndef G_OS_WIN32
883     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
884
885     if (active)
886       pfd->events |= POLLOUT;
887     else
888       pfd->events &= ~POLLOUT;
889
890     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
891 #else
892     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
893         active);
894 #endif
895     MARK_REBUILD (set);
896   } else {
897     GST_WARNING ("%p: couldn't find fd !", set);
898   }
899
900   g_mutex_unlock (&set->lock);
901
902   return idx >= 0;
903 }
904
905 static gboolean
906 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
907 {
908   gint idx;
909
910   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
911       fd->fd, fd->idx, active);
912
913   idx = find_index (set->fds, fd);
914
915   if (idx >= 0) {
916 #ifndef G_OS_WIN32
917     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
918
919     if (active)
920       pfd->events |= (POLLIN | POLLPRI);
921     else
922       pfd->events &= ~(POLLIN | POLLPRI);
923 #else
924     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
925 #endif
926     MARK_REBUILD (set);
927   } else {
928     GST_WARNING ("%p: couldn't find fd !", set);
929   }
930
931   return idx >= 0;
932 }
933
934 /**
935  * gst_poll_fd_ctl_read:
936  * @set: a file descriptor set.
937  * @fd: a file descriptor.
938  * @active: a new status.
939  *
940  * Control whether the descriptor @fd in @set will be monitored for
941  * readability.
942  *
943  * Returns: %TRUE if the descriptor was successfully updated.
944  *
945  * Since: 0.10.18
946  */
947 gboolean
948 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
949 {
950   gboolean ret;
951
952   g_return_val_if_fail (set != NULL, FALSE);
953   g_return_val_if_fail (fd != NULL, FALSE);
954   g_return_val_if_fail (fd->fd >= 0, FALSE);
955
956   g_mutex_lock (&set->lock);
957
958   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
959
960   g_mutex_unlock (&set->lock);
961
962   return ret;
963 }
964
965 /**
966  * gst_poll_fd_ignored:
967  * @set: a file descriptor set.
968  * @fd: a file descriptor.
969  *
970  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
971  * the same result for @fd as last time. This function must be called if no
972  * operation (read/write/recv/send/etc.) will be performed on @fd before
973  * the next call to gst_poll_wait().
974  *
975  * The reason why this is needed is because the underlying implementation
976  * might not allow querying the fd more than once between calls to one of
977  * the re-enabling operations.
978  *
979  * Since: 0.10.18
980  */
981 void
982 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
983 {
984 #ifdef G_OS_WIN32
985   gint idx;
986
987   g_return_if_fail (set != NULL);
988   g_return_if_fail (fd != NULL);
989   g_return_if_fail (fd->fd >= 0);
990
991   g_mutex_lock (&set->lock);
992
993   idx = find_index (set->fds, fd);
994   if (idx >= 0) {
995     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
996
997     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
998     MARK_REBUILD (set);
999   }
1000
1001   g_mutex_unlock (&set->lock);
1002 #endif
1003 }
1004
1005 /**
1006  * gst_poll_fd_has_closed:
1007  * @set: a file descriptor set.
1008  * @fd: a file descriptor.
1009  *
1010  * Check if @fd in @set has closed the connection.
1011  *
1012  * Returns: %TRUE if the connection was closed.
1013  *
1014  * Since: 0.10.18
1015  */
1016 gboolean
1017 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1018 {
1019   gboolean res = FALSE;
1020   gint idx;
1021
1022   g_return_val_if_fail (set != NULL, FALSE);
1023   g_return_val_if_fail (fd != NULL, FALSE);
1024   g_return_val_if_fail (fd->fd >= 0, FALSE);
1025
1026   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1027
1028   g_mutex_lock (&((GstPoll *) set)->lock);
1029
1030   idx = find_index (set->active_fds, fd);
1031   if (idx >= 0) {
1032 #ifndef G_OS_WIN32
1033     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1034
1035     res = (pfd->revents & POLLHUP) != 0;
1036 #else
1037     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1038
1039     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1040 #endif
1041   } else {
1042     GST_WARNING ("%p: couldn't find fd !", set);
1043   }
1044
1045   g_mutex_unlock (&((GstPoll *) set)->lock);
1046
1047   return res;
1048 }
1049
1050 /**
1051  * gst_poll_fd_has_error:
1052  * @set: a file descriptor set.
1053  * @fd: a file descriptor.
1054  *
1055  * Check if @fd in @set has an error.
1056  *
1057  * Returns: %TRUE if the descriptor has an error.
1058  *
1059  * Since: 0.10.18
1060  */
1061 gboolean
1062 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1063 {
1064   gboolean res = FALSE;
1065   gint idx;
1066
1067   g_return_val_if_fail (set != NULL, FALSE);
1068   g_return_val_if_fail (fd != NULL, FALSE);
1069   g_return_val_if_fail (fd->fd >= 0, FALSE);
1070
1071   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1072
1073   g_mutex_lock (&((GstPoll *) set)->lock);
1074
1075   idx = find_index (set->active_fds, fd);
1076   if (idx >= 0) {
1077 #ifndef G_OS_WIN32
1078     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1079
1080     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1081 #else
1082     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1083
1084     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1085         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1086         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1087         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1088         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1089 #endif
1090   } else {
1091     GST_WARNING ("%p: couldn't find fd !", set);
1092   }
1093
1094   g_mutex_unlock (&((GstPoll *) set)->lock);
1095
1096   return res;
1097 }
1098
1099 static gboolean
1100 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1101 {
1102   gboolean res = FALSE;
1103   gint idx;
1104
1105   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1106
1107   idx = find_index (set->active_fds, fd);
1108   if (idx >= 0) {
1109 #ifndef G_OS_WIN32
1110     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1111
1112     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1113 #else
1114     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1115
1116     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1117 #endif
1118   } else {
1119     GST_WARNING ("%p: couldn't find fd !", set);
1120   }
1121
1122   return res;
1123 }
1124
1125 /**
1126  * gst_poll_fd_can_read:
1127  * @set: a file descriptor set.
1128  * @fd: a file descriptor.
1129  *
1130  * Check if @fd in @set has data to be read.
1131  *
1132  * Returns: %TRUE if the descriptor has data to be read.
1133  *
1134  * Since: 0.10.18
1135  */
1136 gboolean
1137 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1138 {
1139   gboolean res = FALSE;
1140
1141   g_return_val_if_fail (set != NULL, FALSE);
1142   g_return_val_if_fail (fd != NULL, FALSE);
1143   g_return_val_if_fail (fd->fd >= 0, FALSE);
1144
1145   g_mutex_lock (&((GstPoll *) set)->lock);
1146
1147   res = gst_poll_fd_can_read_unlocked (set, fd);
1148
1149   g_mutex_unlock (&((GstPoll *) set)->lock);
1150
1151   return res;
1152 }
1153
1154 /**
1155  * gst_poll_fd_can_write:
1156  * @set: a file descriptor set.
1157  * @fd: a file descriptor.
1158  *
1159  * Check if @fd in @set can be used for writing.
1160  *
1161  * Returns: %TRUE if the descriptor can be used for writing.
1162  *
1163  * Since: 0.10.18
1164  */
1165 gboolean
1166 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1167 {
1168   gboolean res = FALSE;
1169   gint idx;
1170
1171   g_return_val_if_fail (set != NULL, FALSE);
1172   g_return_val_if_fail (fd != NULL, FALSE);
1173   g_return_val_if_fail (fd->fd >= 0, FALSE);
1174
1175   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1176
1177   g_mutex_lock (&((GstPoll *) set)->lock);
1178
1179   idx = find_index (set->active_fds, fd);
1180   if (idx >= 0) {
1181 #ifndef G_OS_WIN32
1182     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1183
1184     res = (pfd->revents & POLLOUT) != 0;
1185 #else
1186     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1187
1188     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1189 #endif
1190   } else {
1191     GST_WARNING ("%p: couldn't find fd !", set);
1192   }
1193
1194   g_mutex_unlock (&((GstPoll *) set)->lock);
1195
1196   return res;
1197 }
1198
1199 /**
1200  * gst_poll_wait:
1201  * @set: a #GstPoll.
1202  * @timeout: a timeout in nanoseconds.
1203  *
1204  * Wait for activity on the file descriptors in @set. This function waits up to
1205  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1206  *
1207  * For #GstPoll objects created with gst_poll_new(), this function can only be
1208  * called from a single thread at a time.  If called from multiple threads,
1209  * -1 will be returned with errno set to EPERM.
1210  *
1211  * This is not true for timer #GstPoll objects created with
1212  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1213  * simultaneously.
1214  *
1215  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1216  * activity was detected after @timeout. If an error occurs, -1 is returned
1217  * and errno is set.
1218  *
1219  * Since: 0.10.18
1220  */
1221 gint
1222 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1223 {
1224   gboolean restarting;
1225   gboolean is_timer;
1226   int res;
1227   gint old_waiting;
1228
1229   g_return_val_if_fail (set != NULL, -1);
1230
1231   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1232
1233   is_timer = set->timer;
1234
1235   /* add one more waiter */
1236   old_waiting = INC_WAITING (set);
1237
1238   /* we cannot wait from multiple threads unless we are a timer */
1239   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1240     goto already_waiting;
1241
1242   /* flushing, exit immediately */
1243   if (G_UNLIKELY (IS_FLUSHING (set)))
1244     goto flushing;
1245
1246   do {
1247     GstPollMode mode;
1248
1249     res = -1;
1250     restarting = FALSE;
1251
1252     mode = choose_mode (set, timeout);
1253
1254     if (TEST_REBUILD (set)) {
1255       g_mutex_lock (&set->lock);
1256 #ifndef G_OS_WIN32
1257       g_array_set_size (set->active_fds, set->fds->len);
1258       memcpy (set->active_fds->data, set->fds->data,
1259           set->fds->len * sizeof (struct pollfd));
1260 #else
1261       if (!gst_poll_prepare_winsock_active_sets (set))
1262         goto winsock_error;
1263 #endif
1264       g_mutex_unlock (&set->lock);
1265     }
1266
1267     switch (mode) {
1268       case GST_POLL_MODE_AUTO:
1269         g_assert_not_reached ();
1270         break;
1271       case GST_POLL_MODE_PPOLL:
1272       {
1273 #ifdef HAVE_PPOLL
1274         struct timespec ts;
1275         struct timespec *tsptr;
1276
1277         if (timeout != GST_CLOCK_TIME_NONE) {
1278           GST_TIME_TO_TIMESPEC (timeout, ts);
1279           tsptr = &ts;
1280         } else {
1281           tsptr = NULL;
1282         }
1283
1284         res =
1285             ppoll ((struct pollfd *) set->active_fds->data,
1286             set->active_fds->len, tsptr, NULL);
1287 #else
1288         g_assert_not_reached ();
1289         errno = ENOSYS;
1290 #endif
1291         break;
1292       }
1293       case GST_POLL_MODE_POLL:
1294       {
1295 #ifdef HAVE_POLL
1296         gint t;
1297
1298         if (timeout != GST_CLOCK_TIME_NONE) {
1299           t = GST_TIME_AS_MSECONDS (timeout);
1300         } else {
1301           t = -1;
1302         }
1303
1304         res =
1305             poll ((struct pollfd *) set->active_fds->data,
1306             set->active_fds->len, t);
1307 #else
1308         g_assert_not_reached ();
1309         errno = ENOSYS;
1310 #endif
1311         break;
1312       }
1313       case GST_POLL_MODE_PSELECT:
1314 #ifndef HAVE_PSELECT
1315       {
1316         g_assert_not_reached ();
1317         errno = ENOSYS;
1318         break;
1319       }
1320 #endif
1321       case GST_POLL_MODE_SELECT:
1322       {
1323 #ifndef G_OS_WIN32
1324         fd_set readfds;
1325         fd_set writefds;
1326         fd_set errorfds;
1327         gint max_fd;
1328
1329         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1330
1331         if (mode == GST_POLL_MODE_SELECT) {
1332           struct timeval tv;
1333           struct timeval *tvptr;
1334
1335           if (timeout != GST_CLOCK_TIME_NONE) {
1336             GST_TIME_TO_TIMEVAL (timeout, tv);
1337             tvptr = &tv;
1338           } else {
1339             tvptr = NULL;
1340           }
1341
1342           GST_DEBUG ("Calling select");
1343           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1344           GST_DEBUG ("After select, res:%d", res);
1345         } else {
1346 #ifdef HAVE_PSELECT
1347           struct timespec ts;
1348           struct timespec *tsptr;
1349
1350           if (timeout != GST_CLOCK_TIME_NONE) {
1351             GST_TIME_TO_TIMESPEC (timeout, ts);
1352             tsptr = &ts;
1353           } else {
1354             tsptr = NULL;
1355           }
1356
1357           GST_DEBUG ("Calling pselect");
1358           res =
1359               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1360           GST_DEBUG ("After pselect, res:%d", res);
1361 #endif
1362         }
1363
1364         if (res >= 0) {
1365           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1366         }
1367 #else /* G_OS_WIN32 */
1368         g_assert_not_reached ();
1369         errno = ENOSYS;
1370 #endif
1371         break;
1372       }
1373       case GST_POLL_MODE_WINDOWS:
1374       {
1375 #ifdef G_OS_WIN32
1376         gint ignore_count = set->active_fds_ignored->len;
1377         DWORD t, wait_ret;
1378
1379         if (G_LIKELY (ignore_count == 0)) {
1380           if (timeout != GST_CLOCK_TIME_NONE)
1381             t = GST_TIME_AS_MSECONDS (timeout);
1382           else
1383             t = INFINITE;
1384         } else {
1385           /* already one or more ignored fds, so we quickly sweep the others */
1386           t = 0;
1387         }
1388
1389         if (set->active_events->len != 0) {
1390           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1391               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1392         } else {
1393           wait_ret = WSA_WAIT_FAILED;
1394           WSASetLastError (WSA_INVALID_PARAMETER);
1395         }
1396
1397         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1398           res = 0;
1399         } else if (wait_ret == WSA_WAIT_FAILED) {
1400           res = -1;
1401           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1402         } else {
1403           /* the first entry is the wakeup event */
1404           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1405             res = gst_poll_collect_winsock_events (set);
1406           } else {
1407             res = 1;            /* wakeup event */
1408           }
1409         }
1410 #else
1411         g_assert_not_reached ();
1412         errno = ENOSYS;
1413 #endif
1414         break;
1415       }
1416     }
1417
1418     if (!is_timer) {
1419       /* Applications needs to clear the control socket themselves for timer
1420        * polls.
1421        * For other polls, we need to clear the control socket. If there was only
1422        * one socket with activity and it was the control socket, we need to
1423        * restart */
1424       if (release_all_wakeup (set) > 0 && res == 1)
1425         restarting = TRUE;
1426     }
1427
1428     /* we got woken up and we are flushing, we need to stop */
1429     if (G_UNLIKELY (IS_FLUSHING (set)))
1430       goto flushing;
1431
1432   } while (G_UNLIKELY (restarting));
1433
1434   DEC_WAITING (set);
1435
1436   return res;
1437
1438   /* ERRORS */
1439 already_waiting:
1440   {
1441     GST_LOG ("%p: we are already waiting", set);
1442     DEC_WAITING (set);
1443     errno = EPERM;
1444     return -1;
1445   }
1446 flushing:
1447   {
1448     GST_LOG ("%p: we are flushing", set);
1449     DEC_WAITING (set);
1450     errno = EBUSY;
1451     return -1;
1452   }
1453 #ifdef G_OS_WIN32
1454 winsock_error:
1455   {
1456     GST_LOG ("%p: winsock error", set);
1457     g_mutex_unlock (&set->lock);
1458     DEC_WAITING (set);
1459     return -1;
1460   }
1461 #endif
1462 }
1463
1464 /**
1465  * gst_poll_set_controllable:
1466  * @set: a #GstPoll.
1467  * @controllable: new controllable state.
1468  *
1469  * When @controllable is %TRUE, this function ensures that future calls to
1470  * gst_poll_wait() will be affected by gst_poll_restart() and
1471  * gst_poll_set_flushing().
1472  *
1473  * Returns: %TRUE if the controllability of @set could be updated.
1474  *
1475  * Since: 0.10.18
1476  */
1477 gboolean
1478 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1479 {
1480   g_return_val_if_fail (set != NULL, FALSE);
1481
1482   GST_LOG ("%p: controllable : %d", set, controllable);
1483
1484   set->controllable = controllable;
1485
1486   return TRUE;
1487 }
1488
1489 /**
1490  * gst_poll_restart:
1491  * @set: a #GstPoll.
1492  *
1493  * Restart any gst_poll_wait() that is in progress. This function is typically
1494  * used after adding or removing descriptors to @set.
1495  *
1496  * If @set is not controllable, then this call will have no effect.
1497  *
1498  * Since: 0.10.18
1499  */
1500 void
1501 gst_poll_restart (GstPoll * set)
1502 {
1503   g_return_if_fail (set != NULL);
1504
1505   if (set->controllable && GET_WAITING (set) > 0) {
1506     /* we are controllable and waiting, wake up the waiter. The socket will be
1507      * cleared by the _wait() thread and the poll will be restarted */
1508     raise_wakeup (set);
1509   }
1510 }
1511
1512 /**
1513  * gst_poll_set_flushing:
1514  * @set: a #GstPoll.
1515  * @flushing: new flushing state.
1516  *
1517  * When @flushing is %TRUE, this function ensures that current and future calls
1518  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1519  *
1520  * Unsetting the flushing state will restore normal operation of @set.
1521  *
1522  * Since: 0.10.18
1523  */
1524 void
1525 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1526 {
1527   g_return_if_fail (set != NULL);
1528
1529   GST_LOG ("%p: flushing: %d", set, flushing);
1530
1531   /* update the new state first */
1532   SET_FLUSHING (set, flushing);
1533
1534   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1535     /* we are flushing, controllable and waiting, wake up the waiter. When we
1536      * stop the flushing operation we don't clear the wakeup fd here, this will
1537      * happen in the _wait() thread. */
1538     raise_wakeup (set);
1539   }
1540 }
1541
1542 /**
1543  * gst_poll_write_control:
1544  * @set: a #GstPoll.
1545  *
1546  * Write a byte to the control socket of the controllable @set.
1547  * This function is mostly useful for timer #GstPoll objects created with
1548  * gst_poll_new_timer(). 
1549  *
1550  * It will make any current and future gst_poll_wait() function return with
1551  * 1, meaning the control socket is set. After an equal amount of calls to
1552  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1553  * block again until their timeout expired.
1554  *
1555  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1556  * byte could not be written.
1557  *
1558  * Since: 0.10.23
1559  */
1560 gboolean
1561 gst_poll_write_control (GstPoll * set)
1562 {
1563   gboolean res;
1564
1565   g_return_val_if_fail (set != NULL, FALSE);
1566   g_return_val_if_fail (set->timer, FALSE);
1567
1568   res = raise_wakeup (set);
1569
1570   return res;
1571 }
1572
1573 /**
1574  * gst_poll_read_control:
1575  * @set: a #GstPoll.
1576  *
1577  * Read a byte from the control socket of the controllable @set.
1578  * This function is mostly useful for timer #GstPoll objects created with
1579  * gst_poll_new_timer(). 
1580  *
1581  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1582  * was no byte to read.
1583  *
1584  * Since: 0.10.23
1585  */
1586 gboolean
1587 gst_poll_read_control (GstPoll * set)
1588 {
1589   gboolean res;
1590
1591   g_return_val_if_fail (set != NULL, FALSE);
1592   g_return_val_if_fail (set->timer, FALSE);
1593
1594   res = release_wakeup (set);
1595
1596   return res;
1597 }