docs: Fix typos in function/object descriptions
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancellable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 /* OS/X needs this because of bad headers */
88 #include <string.h>
89
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91  * so we prefer our own poll emulation.
92  */
93 #if defined(BROKEN_POLL)
94 #undef HAVE_POLL
95 #endif
96
97 #include "gstpoll.h"
98
99 #define GST_CAT_DEFAULT GST_CAT_POLL
100
101 #ifdef G_OS_WIN32
102 typedef struct _WinsockFd WinsockFd;
103
104 struct _WinsockFd
105 {
106   gint fd;
107   glong event_mask;
108   WSANETWORKEVENTS events;
109   glong ignored_event_mask;
110 };
111 #endif
112
113 typedef enum
114 {
115   GST_POLL_MODE_AUTO,
116   GST_POLL_MODE_SELECT,
117   GST_POLL_MODE_PSELECT,
118   GST_POLL_MODE_POLL,
119   GST_POLL_MODE_PPOLL,
120   GST_POLL_MODE_WINDOWS
121 } GstPollMode;
122
123 struct _GstPoll
124 {
125   GstPollMode mode;
126
127   GMutex lock;
128   /* array of fds, always written to and read from with lock */
129   GArray *fds;
130   /* array of active fds, only written to from the waiting thread with the
131    * lock and read from with the lock or without the lock from the waiting
132    * thread */
133   GArray *active_fds;
134
135 #ifndef G_OS_WIN32
136   gchar buf[1];
137   GstPollFD control_read_fd;
138   GstPollFD control_write_fd;
139 #else
140   GArray *active_fds_ignored;
141   GArray *events;
142   GArray *active_events;
143
144   HANDLE wakeup_event;
145 #endif
146
147   gboolean controllable;
148   volatile gint waiting;
149   volatile gint control_pending;
150   volatile gint flushing;
151   gboolean timer;
152   volatile gint rebuild;
153 };
154
155 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
156     gboolean active);
157 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
158
159 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
160 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
161
162 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
163 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
164 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
165
166 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
167 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
168
169 #ifndef G_OS_WIN32
170 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
171 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
172 #else
173 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
174 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
175 #endif
176
177 /* the poll/select call is also performed on a control socket, that way
178  * we can send special commands to control it */
179 static inline gboolean
180 raise_wakeup (GstPoll * set)
181 {
182   gboolean result = TRUE;
183
184   if (g_atomic_int_add (&set->control_pending, 1) == 0) {
185     /* raise when nothing pending */
186     GST_LOG ("%p: raise", set);
187     result = WAKE_EVENT (set);
188   }
189   return result;
190 }
191
192 /* note how bad things can happen when the 2 threads both raise and release the
193  * wakeup. This is however not a problem because you must always pair a raise
194  * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198   gboolean result = TRUE;
199
200   if (g_atomic_int_dec_and_test (&set->control_pending)) {
201     GST_LOG ("%p: release", set);
202     result = RELEASE_EVENT (set);
203   }
204   return result;
205 }
206
207 static inline gint
208 release_all_wakeup (GstPoll * set)
209 {
210   gint old;
211
212   while (TRUE) {
213     if (!(old = g_atomic_int_get (&set->control_pending)))
214       /* nothing pending, just exit */
215       break;
216
217     /* try to remove all pending control messages */
218     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
219       /* we managed to remove all messages, read the control socket */
220       if (RELEASE_EVENT (set))
221         break;
222       else
223         /* retry again until we read it successfully */
224         g_atomic_int_add (&set->control_pending, 1);
225     }
226   }
227   return old;
228 }
229
230 static gint
231 find_index (GArray * array, GstPollFD * fd)
232 {
233 #ifndef G_OS_WIN32
234   struct pollfd *ifd;
235 #else
236   WinsockFd *ifd;
237 #endif
238   guint i;
239
240   /* start by assuming the index found in the fd is still valid */
241   if (fd->idx >= 0 && fd->idx < array->len) {
242 #ifndef G_OS_WIN32
243     ifd = &g_array_index (array, struct pollfd, fd->idx);
244 #else
245     ifd = &g_array_index (array, WinsockFd, fd->idx);
246 #endif
247
248     if (ifd->fd == fd->fd) {
249       return fd->idx;
250     }
251   }
252
253   /* the pollfd array has changed and we need to lookup the fd again */
254   for (i = 0; i < array->len; i++) {
255 #ifndef G_OS_WIN32
256     ifd = &g_array_index (array, struct pollfd, i);
257 #else
258     ifd = &g_array_index (array, WinsockFd, i);
259 #endif
260
261     if (ifd->fd == fd->fd) {
262       fd->idx = (gint) i;
263       return fd->idx;
264     }
265   }
266
267   fd->idx = -1;
268   return fd->idx;
269 }
270
271 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
272 /* check if all file descriptors will fit in an fd_set */
273 static gboolean
274 selectable_fds (GstPoll * set)
275 {
276   guint i;
277
278   g_mutex_lock (&set->lock);
279   for (i = 0; i < set->fds->len; i++) {
280     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
281
282     if (pfd->fd >= FD_SETSIZE)
283       goto too_many;
284   }
285   g_mutex_unlock (&set->lock);
286
287   return TRUE;
288
289 too_many:
290   {
291     g_mutex_unlock (&set->lock);
292     return FALSE;
293   }
294 }
295
296 /* check if the timeout will convert to a timeout value used for poll()
297  * without a loss of precision
298  */
299 static gboolean
300 pollable_timeout (GstClockTime timeout)
301 {
302   if (timeout == GST_CLOCK_TIME_NONE)
303     return TRUE;
304
305   /* not a nice multiple of milliseconds */
306   if (timeout % 1000000)
307     return FALSE;
308
309   return TRUE;
310 }
311 #endif
312
313 static GstPollMode
314 choose_mode (GstPoll * set, GstClockTime timeout)
315 {
316   GstPollMode mode;
317
318   if (set->mode == GST_POLL_MODE_AUTO) {
319 #ifdef HAVE_PPOLL
320     mode = GST_POLL_MODE_PPOLL;
321 #elif defined(HAVE_POLL)
322     if (!selectable_fds (set) || pollable_timeout (timeout)) {
323       mode = GST_POLL_MODE_POLL;
324     } else {
325 #ifdef HAVE_PSELECT
326       mode = GST_POLL_MODE_PSELECT;
327 #else
328       mode = GST_POLL_MODE_SELECT;
329 #endif
330     }
331 #elif defined(HAVE_PSELECT)
332     mode = GST_POLL_MODE_PSELECT;
333 #else
334     mode = GST_POLL_MODE_SELECT;
335 #endif
336   } else {
337     mode = set->mode;
338   }
339   return mode;
340 }
341
342 #ifndef G_OS_WIN32
343 static gint
344 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
345     fd_set * errorfds)
346 {
347   gint max_fd = -1;
348   guint i;
349
350   FD_ZERO (readfds);
351   FD_ZERO (writefds);
352   FD_ZERO (errorfds);
353
354   g_mutex_lock (&set->lock);
355
356   for (i = 0; i < set->active_fds->len; i++) {
357     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
358
359     if (pfd->fd < FD_SETSIZE) {
360       if (pfd->events & POLLIN)
361         FD_SET (pfd->fd, readfds);
362       if (pfd->events & POLLOUT)
363         FD_SET (pfd->fd, writefds);
364       if (pfd->events)
365         FD_SET (pfd->fd, errorfds);
366       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
367         max_fd = pfd->fd;
368     }
369   }
370
371   g_mutex_unlock (&set->lock);
372
373   return max_fd;
374 }
375
376 static void
377 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
378     fd_set * errorfds)
379 {
380   guint i;
381
382   g_mutex_lock (&set->lock);
383
384   for (i = 0; i < set->active_fds->len; i++) {
385     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
386
387     if (pfd->fd < FD_SETSIZE) {
388       pfd->revents = 0;
389       if (FD_ISSET (pfd->fd, readfds))
390         pfd->revents |= POLLIN;
391       if (FD_ISSET (pfd->fd, writefds))
392         pfd->revents |= POLLOUT;
393       if (FD_ISSET (pfd->fd, errorfds))
394         pfd->revents |= POLLERR;
395     }
396   }
397
398   g_mutex_unlock (&set->lock);
399 }
400 #else /* G_OS_WIN32 */
401 /*
402  * Translate errors thrown by the Winsock API used by GstPoll:
403  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
404  */
405 static gint
406 gst_poll_winsock_error_to_errno (DWORD last_error)
407 {
408   switch (last_error) {
409     case WSA_INVALID_HANDLE:
410     case WSAEINVAL:
411     case WSAENOTSOCK:
412       return EBADF;
413
414     case WSA_NOT_ENOUGH_MEMORY:
415       return ENOMEM;
416
417       /*
418        * Anything else, including:
419        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
420        *   WSANOTINITIALISED
421        */
422     default:
423       return EINVAL;
424   }
425 }
426
427 static void
428 gst_poll_free_winsock_event (GstPoll * set, gint idx)
429 {
430   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
431   HANDLE event = g_array_index (set->events, HANDLE, idx);
432
433   WSAEventSelect (wfd->fd, event, 0);
434   CloseHandle (event);
435 }
436
437 static void
438 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
439     gboolean active)
440 {
441   WinsockFd *wfd;
442
443   wfd = &g_array_index (set->fds, WinsockFd, idx);
444
445   if (active)
446     wfd->event_mask |= flags;
447   else
448     wfd->event_mask &= ~flags;
449
450   /* reset ignored state if the new mask doesn't overlap at all */
451   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
452     wfd->ignored_event_mask = 0;
453 }
454
455 static gboolean
456 gst_poll_prepare_winsock_active_sets (GstPoll * set)
457 {
458   guint i;
459
460   g_array_set_size (set->active_fds, 0);
461   g_array_set_size (set->active_fds_ignored, 0);
462   g_array_set_size (set->active_events, 0);
463   g_array_append_val (set->active_events, set->wakeup_event);
464
465   for (i = 0; i < set->fds->len; i++) {
466     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
467     HANDLE event = g_array_index (set->events, HANDLE, i);
468
469     if (wfd->ignored_event_mask == 0) {
470       gint ret;
471
472       g_array_append_val (set->active_fds, *wfd);
473       g_array_append_val (set->active_events, event);
474
475       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
476       if (G_UNLIKELY (ret != 0)) {
477         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
478         return FALSE;
479       }
480     } else {
481       g_array_append_val (set->active_fds_ignored, wfd);
482     }
483   }
484
485   return TRUE;
486 }
487
488 static gint
489 gst_poll_collect_winsock_events (GstPoll * set)
490 {
491   gint res, i;
492
493   /*
494    * We need to check which events are signaled, and call
495    * WSAEnumNetworkEvents for those that are, which resets
496    * the event and clears the internal network event records.
497    */
498   res = 0;
499   for (i = 0; i < set->active_fds->len; i++) {
500     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
501     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
502     DWORD wait_ret;
503
504     wait_ret = WaitForSingleObject (event, 0);
505     if (wait_ret == WAIT_OBJECT_0) {
506       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
507
508       if (G_UNLIKELY (enum_ret != 0)) {
509         res = -1;
510         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
511         break;
512       }
513
514       res++;
515     } else {
516       /* clear any previously stored result */
517       memset (&wfd->events, 0, sizeof (wfd->events));
518     }
519   }
520
521   /* If all went well we also need to reset the ignored fds. */
522   if (res >= 0) {
523     res += set->active_fds_ignored->len;
524
525     for (i = 0; i < set->active_fds_ignored->len; i++) {
526       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
527
528       wfd->ignored_event_mask = 0;
529     }
530
531     g_array_set_size (set->active_fds_ignored, 0);
532   }
533
534   return res;
535 }
536 #endif
537
538 /**
539  * gst_poll_new: (skip)
540  * @controllable: whether it should be possible to control a wait.
541  *
542  * Create a new file descriptor set. If @controllable, it
543  * is possible to restart or flush a call to gst_poll_wait() with
544  * gst_poll_restart() and gst_poll_set_flushing() respectively.
545  *
546  * Free-function: gst_poll_free
547  *
548  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
549  *     Free with gst_poll_free().
550  */
551 GstPoll *
552 gst_poll_new (gboolean controllable)
553 {
554   GstPoll *nset;
555
556   GST_DEBUG ("controllable : %d", controllable);
557
558   nset = g_slice_new0 (GstPoll);
559   g_mutex_init (&nset->lock);
560 #ifndef G_OS_WIN32
561   nset->mode = GST_POLL_MODE_AUTO;
562   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
563   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
564   nset->control_read_fd.fd = -1;
565   nset->control_write_fd.fd = -1;
566   {
567     gint control_sock[2];
568
569     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
570       goto no_socket_pair;
571
572     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
573     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
574
575     nset->control_read_fd.fd = control_sock[0];
576     nset->control_write_fd.fd = control_sock[1];
577
578     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
579     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
580   }
581 #else
582   nset->mode = GST_POLL_MODE_WINDOWS;
583   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
584   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
585   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
586   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
587   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
588
589   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
590 #endif
591
592   /* ensure (re)build, though already sneakily set in non-windows case */
593   MARK_REBUILD (nset);
594
595   nset->controllable = controllable;
596
597   return nset;
598
599   /* ERRORS */
600 #ifndef G_OS_WIN32
601 no_socket_pair:
602   {
603     GST_WARNING ("%p: can't create socket pair !", nset);
604     gst_poll_free (nset);
605     return NULL;
606   }
607 #endif
608 }
609
610 /**
611  * gst_poll_new_timer: (skip)
612  *
613  * Create a new poll object that can be used for scheduling cancellable
614  * timeouts.
615  *
616  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
617  * performed from different threads. 
618  *
619  * Free-function: gst_poll_free
620  *
621  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
622  *     Free with gst_poll_free().
623  */
624 GstPoll *
625 gst_poll_new_timer (void)
626 {
627   GstPoll *poll;
628
629   /* make a new controllable poll set */
630   if (!(poll = gst_poll_new (TRUE)))
631     goto done;
632
633   /* we are a timer */
634   poll->timer = TRUE;
635
636 done:
637   return poll;
638 }
639
640 /**
641  * gst_poll_free:
642  * @set: (transfer full): a file descriptor set.
643  *
644  * Free a file descriptor set.
645  */
646 void
647 gst_poll_free (GstPoll * set)
648 {
649   g_return_if_fail (set != NULL);
650
651   GST_DEBUG ("%p: freeing", set);
652
653 #ifndef G_OS_WIN32
654   if (set->control_write_fd.fd >= 0)
655     close (set->control_write_fd.fd);
656   if (set->control_read_fd.fd >= 0)
657     close (set->control_read_fd.fd);
658 #else
659   CloseHandle (set->wakeup_event);
660
661   {
662     guint i;
663
664     for (i = 0; i < set->events->len; i++)
665       gst_poll_free_winsock_event (set, i);
666   }
667
668   g_array_free (set->active_events, TRUE);
669   g_array_free (set->events, TRUE);
670   g_array_free (set->active_fds_ignored, TRUE);
671 #endif
672
673   g_array_free (set->active_fds, TRUE);
674   g_array_free (set->fds, TRUE);
675   g_mutex_clear (&set->lock);
676   g_slice_free (GstPoll, set);
677 }
678
679 /**
680  * gst_poll_get_read_gpollfd:
681  * @set: a #GstPoll
682  * @fd: a #GPollFD
683  *
684  * Get a GPollFD for the reading part of the control socket. This is useful when
685  * integrating with a GSource and GMainLoop.
686  */
687 void
688 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
689 {
690   g_return_if_fail (set != NULL);
691   g_return_if_fail (fd != NULL);
692
693 #ifndef G_OS_WIN32
694   fd->fd = set->control_read_fd.fd;
695 #else
696 #if GLIB_SIZEOF_VOID_P == 8
697   fd->fd = (gint64) set->wakeup_event;
698 #else
699   fd->fd = (gint) set->wakeup_event;
700 #endif
701 #endif
702   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
703   fd->revents = 0;
704 }
705
706 /**
707  * gst_poll_fd_init:
708  * @fd: a #GstPollFD
709  *
710  * Initializes @fd. Alternatively you can initialize it with
711  * #GST_POLL_FD_INIT.
712  */
713 void
714 gst_poll_fd_init (GstPollFD * fd)
715 {
716   g_return_if_fail (fd != NULL);
717
718   fd->fd = -1;
719   fd->idx = -1;
720 }
721
722 static gboolean
723 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
724 {
725   gint idx;
726
727   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
728
729   idx = find_index (set->fds, fd);
730   if (idx < 0) {
731 #ifndef G_OS_WIN32
732     struct pollfd nfd;
733
734     nfd.fd = fd->fd;
735     nfd.events = POLLERR | POLLNVAL | POLLHUP;
736     nfd.revents = 0;
737
738     g_array_append_val (set->fds, nfd);
739
740     fd->idx = set->fds->len - 1;
741 #else
742     WinsockFd wfd;
743     HANDLE event;
744
745     wfd.fd = fd->fd;
746     wfd.event_mask = FD_CLOSE;
747     memset (&wfd.events, 0, sizeof (wfd.events));
748     wfd.ignored_event_mask = 0;
749     event = WSACreateEvent ();
750
751     g_array_append_val (set->fds, wfd);
752     g_array_append_val (set->events, event);
753
754     fd->idx = set->fds->len - 1;
755 #endif
756     MARK_REBUILD (set);
757   } else {
758     GST_WARNING ("%p: fd already added !", set);
759   }
760
761   return TRUE;
762 }
763
764 /**
765  * gst_poll_add_fd:
766  * @set: a file descriptor set.
767  * @fd: a file descriptor.
768  *
769  * Add a file descriptor to the file descriptor set.
770  *
771  * Returns: %TRUE if the file descriptor was successfully added to the set.
772  */
773 gboolean
774 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
775 {
776   gboolean ret;
777
778   g_return_val_if_fail (set != NULL, FALSE);
779   g_return_val_if_fail (fd != NULL, FALSE);
780   g_return_val_if_fail (fd->fd >= 0, FALSE);
781
782   g_mutex_lock (&set->lock);
783
784   ret = gst_poll_add_fd_unlocked (set, fd);
785
786   g_mutex_unlock (&set->lock);
787
788   return ret;
789 }
790
791 /**
792  * gst_poll_remove_fd:
793  * @set: a file descriptor set.
794  * @fd: a file descriptor.
795  *
796  * Remove a file descriptor from the file descriptor set.
797  *
798  * Returns: %TRUE if the file descriptor was successfully removed from the set.
799  */
800 gboolean
801 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
802 {
803   gint idx;
804
805   g_return_val_if_fail (set != NULL, FALSE);
806   g_return_val_if_fail (fd != NULL, FALSE);
807   g_return_val_if_fail (fd->fd >= 0, FALSE);
808
809
810   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
811
812   g_mutex_lock (&set->lock);
813
814   /* get the index, -1 is an fd that is not added */
815   idx = find_index (set->fds, fd);
816   if (idx >= 0) {
817 #ifdef G_OS_WIN32
818     gst_poll_free_winsock_event (set, idx);
819     g_array_remove_index_fast (set->events, idx);
820 #endif
821
822     /* remove the fd at index, we use _remove_index_fast, which copies the last
823      * element of the array to the freed index */
824     g_array_remove_index_fast (set->fds, idx);
825
826     /* mark fd as removed by setting the index to -1 */
827     fd->idx = -1;
828     MARK_REBUILD (set);
829   } else {
830     GST_WARNING ("%p: couldn't find fd !", set);
831   }
832
833   g_mutex_unlock (&set->lock);
834
835   return idx >= 0;
836 }
837
838 /**
839  * gst_poll_fd_ctl_write:
840  * @set: a file descriptor set.
841  * @fd: a file descriptor.
842  * @active: a new status.
843  *
844  * Control whether the descriptor @fd in @set will be monitored for
845  * writability.
846  *
847  * Returns: %TRUE if the descriptor was successfully updated.
848  */
849 gboolean
850 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
851 {
852   gint idx;
853
854   g_return_val_if_fail (set != NULL, FALSE);
855   g_return_val_if_fail (fd != NULL, FALSE);
856   g_return_val_if_fail (fd->fd >= 0, FALSE);
857
858   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
859       fd->fd, fd->idx, active);
860
861   g_mutex_lock (&set->lock);
862
863   idx = find_index (set->fds, fd);
864   if (idx >= 0) {
865 #ifndef G_OS_WIN32
866     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
867
868     if (active)
869       pfd->events |= POLLOUT;
870     else
871       pfd->events &= ~POLLOUT;
872
873     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
874 #else
875     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
876         active);
877 #endif
878     MARK_REBUILD (set);
879   } else {
880     GST_WARNING ("%p: couldn't find fd !", set);
881   }
882
883   g_mutex_unlock (&set->lock);
884
885   return idx >= 0;
886 }
887
888 static gboolean
889 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
890 {
891   gint idx;
892
893   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
894       fd->fd, fd->idx, active);
895
896   idx = find_index (set->fds, fd);
897
898   if (idx >= 0) {
899 #ifndef G_OS_WIN32
900     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
901
902     if (active)
903       pfd->events |= (POLLIN | POLLPRI);
904     else
905       pfd->events &= ~(POLLIN | POLLPRI);
906 #else
907     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
908 #endif
909     MARK_REBUILD (set);
910   } else {
911     GST_WARNING ("%p: couldn't find fd !", set);
912   }
913
914   return idx >= 0;
915 }
916
917 /**
918  * gst_poll_fd_ctl_read:
919  * @set: a file descriptor set.
920  * @fd: a file descriptor.
921  * @active: a new status.
922  *
923  * Control whether the descriptor @fd in @set will be monitored for
924  * readability.
925  *
926  * Returns: %TRUE if the descriptor was successfully updated.
927  */
928 gboolean
929 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
930 {
931   gboolean ret;
932
933   g_return_val_if_fail (set != NULL, FALSE);
934   g_return_val_if_fail (fd != NULL, FALSE);
935   g_return_val_if_fail (fd->fd >= 0, FALSE);
936
937   g_mutex_lock (&set->lock);
938
939   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
940
941   g_mutex_unlock (&set->lock);
942
943   return ret;
944 }
945
946 /**
947  * gst_poll_fd_ignored:
948  * @set: a file descriptor set.
949  * @fd: a file descriptor.
950  *
951  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
952  * the same result for @fd as last time. This function must be called if no
953  * operation (read/write/recv/send/etc.) will be performed on @fd before
954  * the next call to gst_poll_wait().
955  *
956  * The reason why this is needed is because the underlying implementation
957  * might not allow querying the fd more than once between calls to one of
958  * the re-enabling operations.
959  */
960 void
961 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
962 {
963 #ifdef G_OS_WIN32
964   gint idx;
965
966   g_return_if_fail (set != NULL);
967   g_return_if_fail (fd != NULL);
968   g_return_if_fail (fd->fd >= 0);
969
970   g_mutex_lock (&set->lock);
971
972   idx = find_index (set->fds, fd);
973   if (idx >= 0) {
974     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
975
976     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
977     MARK_REBUILD (set);
978   }
979
980   g_mutex_unlock (&set->lock);
981 #endif
982 }
983
984 /**
985  * gst_poll_fd_has_closed:
986  * @set: a file descriptor set.
987  * @fd: a file descriptor.
988  *
989  * Check if @fd in @set has closed the connection.
990  *
991  * Returns: %TRUE if the connection was closed.
992  */
993 gboolean
994 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
995 {
996   gboolean res = FALSE;
997   gint idx;
998
999   g_return_val_if_fail (set != NULL, FALSE);
1000   g_return_val_if_fail (fd != NULL, FALSE);
1001   g_return_val_if_fail (fd->fd >= 0, FALSE);
1002
1003   g_mutex_lock (&((GstPoll *) set)->lock);
1004
1005   idx = find_index (set->active_fds, fd);
1006   if (idx >= 0) {
1007 #ifndef G_OS_WIN32
1008     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1009
1010     res = (pfd->revents & POLLHUP) != 0;
1011 #else
1012     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1013
1014     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1015 #endif
1016   } else {
1017     GST_WARNING ("%p: couldn't find fd !", set);
1018   }
1019   g_mutex_unlock (&((GstPoll *) set)->lock);
1020
1021   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1022
1023   return res;
1024 }
1025
1026 /**
1027  * gst_poll_fd_has_error:
1028  * @set: a file descriptor set.
1029  * @fd: a file descriptor.
1030  *
1031  * Check if @fd in @set has an error.
1032  *
1033  * Returns: %TRUE if the descriptor has an error.
1034  */
1035 gboolean
1036 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1037 {
1038   gboolean res = FALSE;
1039   gint idx;
1040
1041   g_return_val_if_fail (set != NULL, FALSE);
1042   g_return_val_if_fail (fd != NULL, FALSE);
1043   g_return_val_if_fail (fd->fd >= 0, FALSE);
1044
1045   g_mutex_lock (&((GstPoll *) set)->lock);
1046
1047   idx = find_index (set->active_fds, fd);
1048   if (idx >= 0) {
1049 #ifndef G_OS_WIN32
1050     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1051
1052     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1053 #else
1054     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1055
1056     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1057         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1058         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1059         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1060         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1061 #endif
1062   } else {
1063     GST_WARNING ("%p: couldn't find fd !", set);
1064   }
1065   g_mutex_unlock (&((GstPoll *) set)->lock);
1066
1067   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1068
1069   return res;
1070 }
1071
1072 static gboolean
1073 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1074 {
1075   gboolean res = FALSE;
1076   gint idx;
1077
1078   idx = find_index (set->active_fds, fd);
1079   if (idx >= 0) {
1080 #ifndef G_OS_WIN32
1081     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1082
1083     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1084 #else
1085     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1086
1087     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1088 #endif
1089   } else {
1090     GST_WARNING ("%p: couldn't find fd !", set);
1091   }
1092   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1093
1094   return res;
1095 }
1096
1097 /**
1098  * gst_poll_fd_can_read:
1099  * @set: a file descriptor set.
1100  * @fd: a file descriptor.
1101  *
1102  * Check if @fd in @set has data to be read.
1103  *
1104  * Returns: %TRUE if the descriptor has data to be read.
1105  */
1106 gboolean
1107 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1108 {
1109   gboolean res = FALSE;
1110
1111   g_return_val_if_fail (set != NULL, FALSE);
1112   g_return_val_if_fail (fd != NULL, FALSE);
1113   g_return_val_if_fail (fd->fd >= 0, FALSE);
1114
1115   g_mutex_lock (&((GstPoll *) set)->lock);
1116
1117   res = gst_poll_fd_can_read_unlocked (set, fd);
1118
1119   g_mutex_unlock (&((GstPoll *) set)->lock);
1120
1121   return res;
1122 }
1123
1124 /**
1125  * gst_poll_fd_can_write:
1126  * @set: a file descriptor set.
1127  * @fd: a file descriptor.
1128  *
1129  * Check if @fd in @set can be used for writing.
1130  *
1131  * Returns: %TRUE if the descriptor can be used for writing.
1132  */
1133 gboolean
1134 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1135 {
1136   gboolean res = FALSE;
1137   gint idx;
1138
1139   g_return_val_if_fail (set != NULL, FALSE);
1140   g_return_val_if_fail (fd != NULL, FALSE);
1141   g_return_val_if_fail (fd->fd >= 0, FALSE);
1142
1143   g_mutex_lock (&((GstPoll *) set)->lock);
1144
1145   idx = find_index (set->active_fds, fd);
1146   if (idx >= 0) {
1147 #ifndef G_OS_WIN32
1148     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1149
1150     res = (pfd->revents & POLLOUT) != 0;
1151 #else
1152     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1153
1154     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1155 #endif
1156   } else {
1157     GST_WARNING ("%p: couldn't find fd !", set);
1158   }
1159   g_mutex_unlock (&((GstPoll *) set)->lock);
1160
1161   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1162
1163   return res;
1164 }
1165
1166 /**
1167  * gst_poll_wait:
1168  * @set: a #GstPoll.
1169  * @timeout: a timeout in nanoseconds.
1170  *
1171  * Wait for activity on the file descriptors in @set. This function waits up to
1172  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1173  *
1174  * For #GstPoll objects created with gst_poll_new(), this function can only be
1175  * called from a single thread at a time.  If called from multiple threads,
1176  * -1 will be returned with errno set to EPERM.
1177  *
1178  * This is not true for timer #GstPoll objects created with
1179  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1180  * simultaneously.
1181  *
1182  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1183  * activity was detected after @timeout. If an error occurs, -1 is returned
1184  * and errno is set.
1185  */
1186 gint
1187 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1188 {
1189   gboolean restarting;
1190   gboolean is_timer;
1191   int res;
1192   gint old_waiting;
1193
1194   g_return_val_if_fail (set != NULL, -1);
1195
1196   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1197
1198   is_timer = set->timer;
1199
1200   /* add one more waiter */
1201   old_waiting = INC_WAITING (set);
1202
1203   /* we cannot wait from multiple threads unless we are a timer */
1204   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1205     goto already_waiting;
1206
1207   /* flushing, exit immediately */
1208   if (G_UNLIKELY (IS_FLUSHING (set)))
1209     goto flushing;
1210
1211   do {
1212     GstPollMode mode;
1213
1214     res = -1;
1215     restarting = FALSE;
1216
1217     mode = choose_mode (set, timeout);
1218
1219     if (TEST_REBUILD (set)) {
1220       g_mutex_lock (&set->lock);
1221 #ifndef G_OS_WIN32
1222       g_array_set_size (set->active_fds, set->fds->len);
1223       memcpy (set->active_fds->data, set->fds->data,
1224           set->fds->len * sizeof (struct pollfd));
1225 #else
1226       if (!gst_poll_prepare_winsock_active_sets (set))
1227         goto winsock_error;
1228 #endif
1229       g_mutex_unlock (&set->lock);
1230     }
1231
1232     switch (mode) {
1233       case GST_POLL_MODE_AUTO:
1234         g_assert_not_reached ();
1235         break;
1236       case GST_POLL_MODE_PPOLL:
1237       {
1238 #ifdef HAVE_PPOLL
1239         struct timespec ts;
1240         struct timespec *tsptr;
1241
1242         if (timeout != GST_CLOCK_TIME_NONE) {
1243           GST_TIME_TO_TIMESPEC (timeout, ts);
1244           tsptr = &ts;
1245         } else {
1246           tsptr = NULL;
1247         }
1248
1249         res =
1250             ppoll ((struct pollfd *) set->active_fds->data,
1251             set->active_fds->len, tsptr, NULL);
1252 #else
1253         g_assert_not_reached ();
1254         errno = ENOSYS;
1255 #endif
1256         break;
1257       }
1258       case GST_POLL_MODE_POLL:
1259       {
1260 #ifdef HAVE_POLL
1261         gint t;
1262
1263         if (timeout != GST_CLOCK_TIME_NONE) {
1264           t = GST_TIME_AS_MSECONDS (timeout);
1265         } else {
1266           t = -1;
1267         }
1268
1269         res =
1270             poll ((struct pollfd *) set->active_fds->data,
1271             set->active_fds->len, t);
1272 #else
1273         g_assert_not_reached ();
1274         errno = ENOSYS;
1275 #endif
1276         break;
1277       }
1278       case GST_POLL_MODE_PSELECT:
1279 #ifndef HAVE_PSELECT
1280       {
1281         g_assert_not_reached ();
1282         errno = ENOSYS;
1283         break;
1284       }
1285 #endif
1286       case GST_POLL_MODE_SELECT:
1287       {
1288 #ifndef G_OS_WIN32
1289         fd_set readfds;
1290         fd_set writefds;
1291         fd_set errorfds;
1292         gint max_fd;
1293
1294         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1295
1296         if (mode == GST_POLL_MODE_SELECT) {
1297           struct timeval tv;
1298           struct timeval *tvptr;
1299
1300           if (timeout != GST_CLOCK_TIME_NONE) {
1301             GST_TIME_TO_TIMEVAL (timeout, tv);
1302             tvptr = &tv;
1303           } else {
1304             tvptr = NULL;
1305           }
1306
1307           GST_DEBUG ("Calling select");
1308           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1309           GST_DEBUG ("After select, res:%d", res);
1310         } else {
1311 #ifdef HAVE_PSELECT
1312           struct timespec ts;
1313           struct timespec *tsptr;
1314
1315           if (timeout != GST_CLOCK_TIME_NONE) {
1316             GST_TIME_TO_TIMESPEC (timeout, ts);
1317             tsptr = &ts;
1318           } else {
1319             tsptr = NULL;
1320           }
1321
1322           GST_DEBUG ("Calling pselect");
1323           res =
1324               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1325           GST_DEBUG ("After pselect, res:%d", res);
1326 #endif
1327         }
1328
1329         if (res >= 0) {
1330           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1331         }
1332 #else /* G_OS_WIN32 */
1333         g_assert_not_reached ();
1334         errno = ENOSYS;
1335 #endif
1336         break;
1337       }
1338       case GST_POLL_MODE_WINDOWS:
1339       {
1340 #ifdef G_OS_WIN32
1341         gint ignore_count = set->active_fds_ignored->len;
1342         DWORD t, wait_ret;
1343
1344         if (G_LIKELY (ignore_count == 0)) {
1345           if (timeout != GST_CLOCK_TIME_NONE)
1346             t = GST_TIME_AS_MSECONDS (timeout);
1347           else
1348             t = INFINITE;
1349         } else {
1350           /* already one or more ignored fds, so we quickly sweep the others */
1351           t = 0;
1352         }
1353
1354         if (set->active_events->len != 0) {
1355           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1356               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1357         } else {
1358           wait_ret = WSA_WAIT_FAILED;
1359           WSASetLastError (WSA_INVALID_PARAMETER);
1360         }
1361
1362         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1363           res = 0;
1364         } else if (wait_ret == WSA_WAIT_FAILED) {
1365           res = -1;
1366           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1367         } else {
1368           /* the first entry is the wakeup event */
1369           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1370             res = gst_poll_collect_winsock_events (set);
1371           } else {
1372             res = 1;            /* wakeup event */
1373           }
1374         }
1375 #else
1376         g_assert_not_reached ();
1377         errno = ENOSYS;
1378 #endif
1379         break;
1380       }
1381     }
1382
1383     if (!is_timer) {
1384       /* Applications needs to clear the control socket themselves for timer
1385        * polls.
1386        * For other polls, we need to clear the control socket. If there was only
1387        * one socket with activity and it was the control socket, we need to
1388        * restart */
1389       if (release_all_wakeup (set) > 0 && res == 1)
1390         restarting = TRUE;
1391     }
1392
1393     /* we got woken up and we are flushing, we need to stop */
1394     if (G_UNLIKELY (IS_FLUSHING (set)))
1395       goto flushing;
1396
1397   } while (G_UNLIKELY (restarting));
1398
1399   DEC_WAITING (set);
1400
1401   return res;
1402
1403   /* ERRORS */
1404 already_waiting:
1405   {
1406     GST_LOG ("%p: we are already waiting", set);
1407     DEC_WAITING (set);
1408     errno = EPERM;
1409     return -1;
1410   }
1411 flushing:
1412   {
1413     GST_LOG ("%p: we are flushing", set);
1414     DEC_WAITING (set);
1415     errno = EBUSY;
1416     return -1;
1417   }
1418 #ifdef G_OS_WIN32
1419 winsock_error:
1420   {
1421     GST_LOG ("%p: winsock error", set);
1422     g_mutex_unlock (&set->lock);
1423     DEC_WAITING (set);
1424     return -1;
1425   }
1426 #endif
1427 }
1428
1429 /**
1430  * gst_poll_set_controllable:
1431  * @set: a #GstPoll.
1432  * @controllable: new controllable state.
1433  *
1434  * When @controllable is %TRUE, this function ensures that future calls to
1435  * gst_poll_wait() will be affected by gst_poll_restart() and
1436  * gst_poll_set_flushing().
1437  *
1438  * Returns: %TRUE if the controllability of @set could be updated.
1439  */
1440 gboolean
1441 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1442 {
1443   g_return_val_if_fail (set != NULL, FALSE);
1444
1445   GST_LOG ("%p: controllable : %d", set, controllable);
1446
1447   set->controllable = controllable;
1448
1449   return TRUE;
1450 }
1451
1452 /**
1453  * gst_poll_restart:
1454  * @set: a #GstPoll.
1455  *
1456  * Restart any gst_poll_wait() that is in progress. This function is typically
1457  * used after adding or removing descriptors to @set.
1458  *
1459  * If @set is not controllable, then this call will have no effect.
1460  */
1461 void
1462 gst_poll_restart (GstPoll * set)
1463 {
1464   g_return_if_fail (set != NULL);
1465
1466   if (set->controllable && GET_WAITING (set) > 0) {
1467     /* we are controllable and waiting, wake up the waiter. The socket will be
1468      * cleared by the _wait() thread and the poll will be restarted */
1469     raise_wakeup (set);
1470   }
1471 }
1472
1473 /**
1474  * gst_poll_set_flushing:
1475  * @set: a #GstPoll.
1476  * @flushing: new flushing state.
1477  *
1478  * When @flushing is %TRUE, this function ensures that current and future calls
1479  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1480  *
1481  * Unsetting the flushing state will restore normal operation of @set.
1482  */
1483 void
1484 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1485 {
1486   g_return_if_fail (set != NULL);
1487
1488   GST_LOG ("%p: flushing: %d", set, flushing);
1489
1490   /* update the new state first */
1491   SET_FLUSHING (set, flushing);
1492
1493   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1494     /* we are flushing, controllable and waiting, wake up the waiter. When we
1495      * stop the flushing operation we don't clear the wakeup fd here, this will
1496      * happen in the _wait() thread. */
1497     raise_wakeup (set);
1498   }
1499 }
1500
1501 /**
1502  * gst_poll_write_control:
1503  * @set: a #GstPoll.
1504  *
1505  * Write a byte to the control socket of the controllable @set.
1506  * This function is mostly useful for timer #GstPoll objects created with
1507  * gst_poll_new_timer(). 
1508  *
1509  * It will make any current and future gst_poll_wait() function return with
1510  * 1, meaning the control socket is set. After an equal amount of calls to
1511  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1512  * block again until their timeout expired.
1513  *
1514  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1515  * byte could not be written.
1516  */
1517 gboolean
1518 gst_poll_write_control (GstPoll * set)
1519 {
1520   gboolean res;
1521
1522   g_return_val_if_fail (set != NULL, FALSE);
1523   g_return_val_if_fail (set->timer, FALSE);
1524
1525   res = raise_wakeup (set);
1526
1527   return res;
1528 }
1529
1530 /**
1531  * gst_poll_read_control:
1532  * @set: a #GstPoll.
1533  *
1534  * Read a byte from the control socket of the controllable @set.
1535  * This function is mostly useful for timer #GstPoll objects created with
1536  * gst_poll_new_timer(). 
1537  *
1538  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1539  * was no byte to read.
1540  */
1541 gboolean
1542 gst_poll_read_control (GstPoll * set)
1543 {
1544   gboolean res;
1545
1546   g_return_val_if_fail (set != NULL, FALSE);
1547   g_return_val_if_fail (set->timer, FALSE);
1548
1549   res = release_wakeup (set);
1550
1551   return res;
1552 }