poll: improve debug
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancellable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 /* OS/X needs this because of bad headers */
88 #include <string.h>
89
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91  * so we prefer our own poll emulation.
92  */
93 #if defined(BROKEN_POLL)
94 #undef HAVE_POLL
95 #endif
96
97 #include "gstpoll.h"
98
99 #define GST_CAT_DEFAULT GST_CAT_POLL
100
101 #ifdef G_OS_WIN32
102 typedef struct _WinsockFd WinsockFd;
103
104 struct _WinsockFd
105 {
106   gint fd;
107   glong event_mask;
108   WSANETWORKEVENTS events;
109   glong ignored_event_mask;
110 };
111 #endif
112
113 typedef enum
114 {
115   GST_POLL_MODE_AUTO,
116   GST_POLL_MODE_SELECT,
117   GST_POLL_MODE_PSELECT,
118   GST_POLL_MODE_POLL,
119   GST_POLL_MODE_PPOLL,
120   GST_POLL_MODE_WINDOWS
121 } GstPollMode;
122
123 struct _GstPoll
124 {
125   GstPollMode mode;
126
127   GMutex lock;
128   /* array of fds, always written to and read from with lock */
129   GArray *fds;
130   /* array of active fds, only written to from the waiting thread with the
131    * lock and read from with the lock or without the lock from the waiting
132    * thread */
133   GArray *active_fds;
134
135 #ifndef G_OS_WIN32
136   gchar buf[1];
137   GstPollFD control_read_fd;
138   GstPollFD control_write_fd;
139 #else
140   GArray *active_fds_ignored;
141   GArray *events;
142   GArray *active_events;
143
144   HANDLE wakeup_event;
145 #endif
146
147   gboolean controllable;
148   volatile gint waiting;
149   volatile gint control_pending;
150   volatile gint flushing;
151   gboolean timer;
152   volatile gint rebuild;
153 };
154
155 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
156     gboolean active);
157 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
158
159 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
160 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
161
162 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
163 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
164 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
165
166 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
167 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
168
169 #ifndef G_OS_WIN32
170 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
171 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
172 #else
173 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
174 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
175 #endif
176
177 /* the poll/select call is also performed on a control socket, that way
178  * we can send special commands to control it */
179 static inline gboolean
180 raise_wakeup (GstPoll * set)
181 {
182   gboolean result = TRUE;
183
184   if (g_atomic_int_add (&set->control_pending, 1) == 0) {
185     /* raise when nothing pending */
186     GST_LOG ("%p: raise", set);
187     result = WAKE_EVENT (set);
188   }
189   return result;
190 }
191
192 /* note how bad things can happen when the 2 threads both raise and release the
193  * wakeup. This is however not a problem because you must always pair a raise
194  * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198   gboolean result = TRUE;
199
200   if (g_atomic_int_dec_and_test (&set->control_pending)) {
201     GST_LOG ("%p: release", set);
202     result = RELEASE_EVENT (set);
203   }
204   return result;
205 }
206
207 static inline gint
208 release_all_wakeup (GstPoll * set)
209 {
210   gint old;
211
212   while (TRUE) {
213     if (!(old = g_atomic_int_get (&set->control_pending)))
214       /* nothing pending, just exit */
215       break;
216
217     /* try to remove all pending control messages */
218     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
219       /* we managed to remove all messages, read the control socket */
220       if (RELEASE_EVENT (set))
221         break;
222       else
223         /* retry again until we read it successfully */
224         g_atomic_int_add (&set->control_pending, 1);
225     }
226   }
227   return old;
228 }
229
230 static gint
231 find_index (GArray * array, GstPollFD * fd)
232 {
233 #ifndef G_OS_WIN32
234   struct pollfd *ifd;
235 #else
236   WinsockFd *ifd;
237 #endif
238   guint i;
239
240   /* start by assuming the index found in the fd is still valid */
241   if (fd->idx >= 0 && fd->idx < array->len) {
242 #ifndef G_OS_WIN32
243     ifd = &g_array_index (array, struct pollfd, fd->idx);
244 #else
245     ifd = &g_array_index (array, WinsockFd, fd->idx);
246 #endif
247
248     if (ifd->fd == fd->fd) {
249       return fd->idx;
250     }
251   }
252
253   /* the pollfd array has changed and we need to lookup the fd again */
254   for (i = 0; i < array->len; i++) {
255 #ifndef G_OS_WIN32
256     ifd = &g_array_index (array, struct pollfd, i);
257 #else
258     ifd = &g_array_index (array, WinsockFd, i);
259 #endif
260
261     if (ifd->fd == fd->fd) {
262       fd->idx = (gint) i;
263       return fd->idx;
264     }
265   }
266
267   fd->idx = -1;
268   return fd->idx;
269 }
270
271 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
272 /* check if all file descriptors will fit in an fd_set */
273 static gboolean
274 selectable_fds (GstPoll * set)
275 {
276   guint i;
277
278   g_mutex_lock (&set->lock);
279   for (i = 0; i < set->fds->len; i++) {
280     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
281
282     if (pfd->fd >= FD_SETSIZE)
283       goto too_many;
284   }
285   g_mutex_unlock (&set->lock);
286
287   return TRUE;
288
289 too_many:
290   {
291     g_mutex_unlock (&set->lock);
292     return FALSE;
293   }
294 }
295
296 /* check if the timeout will convert to a timeout value used for poll()
297  * without a loss of precision
298  */
299 static gboolean
300 pollable_timeout (GstClockTime timeout)
301 {
302   if (timeout == GST_CLOCK_TIME_NONE)
303     return TRUE;
304
305   /* not a nice multiple of milliseconds */
306   if (timeout % 1000000)
307     return FALSE;
308
309   return TRUE;
310 }
311 #endif
312
313 static GstPollMode
314 choose_mode (GstPoll * set, GstClockTime timeout)
315 {
316   GstPollMode mode;
317
318   if (set->mode == GST_POLL_MODE_AUTO) {
319 #ifdef HAVE_PPOLL
320     mode = GST_POLL_MODE_PPOLL;
321 #elif defined(HAVE_POLL)
322     if (!selectable_fds (set) || pollable_timeout (timeout)) {
323       mode = GST_POLL_MODE_POLL;
324     } else {
325 #ifdef HAVE_PSELECT
326       mode = GST_POLL_MODE_PSELECT;
327 #else
328       mode = GST_POLL_MODE_SELECT;
329 #endif
330     }
331 #elif defined(HAVE_PSELECT)
332     mode = GST_POLL_MODE_PSELECT;
333 #else
334     mode = GST_POLL_MODE_SELECT;
335 #endif
336   } else {
337     mode = set->mode;
338   }
339   return mode;
340 }
341
342 #ifndef G_OS_WIN32
343 static gint
344 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
345     fd_set * errorfds)
346 {
347   gint max_fd = -1;
348   guint i;
349
350   FD_ZERO (readfds);
351   FD_ZERO (writefds);
352   FD_ZERO (errorfds);
353
354   g_mutex_lock (&set->lock);
355
356   for (i = 0; i < set->active_fds->len; i++) {
357     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
358
359     if (pfd->fd < FD_SETSIZE) {
360       if (pfd->events & POLLIN)
361         FD_SET (pfd->fd, readfds);
362       if (pfd->events & POLLOUT)
363         FD_SET (pfd->fd, writefds);
364       if (pfd->events)
365         FD_SET (pfd->fd, errorfds);
366       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
367         max_fd = pfd->fd;
368     }
369   }
370
371   g_mutex_unlock (&set->lock);
372
373   return max_fd;
374 }
375
376 static void
377 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
378     fd_set * errorfds)
379 {
380   guint i;
381
382   g_mutex_lock (&set->lock);
383
384   for (i = 0; i < set->active_fds->len; i++) {
385     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
386
387     if (pfd->fd < FD_SETSIZE) {
388       pfd->revents = 0;
389       if (FD_ISSET (pfd->fd, readfds))
390         pfd->revents |= POLLIN;
391       if (FD_ISSET (pfd->fd, writefds))
392         pfd->revents |= POLLOUT;
393       if (FD_ISSET (pfd->fd, errorfds))
394         pfd->revents |= POLLERR;
395     }
396   }
397
398   g_mutex_unlock (&set->lock);
399 }
400 #else /* G_OS_WIN32 */
401 /*
402  * Translate errors thrown by the Winsock API used by GstPoll:
403  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
404  */
405 static gint
406 gst_poll_winsock_error_to_errno (DWORD last_error)
407 {
408   switch (last_error) {
409     case WSA_INVALID_HANDLE:
410     case WSAEINVAL:
411     case WSAENOTSOCK:
412       return EBADF;
413
414     case WSA_NOT_ENOUGH_MEMORY:
415       return ENOMEM;
416
417       /*
418        * Anything else, including:
419        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
420        *   WSANOTINITIALISED
421        */
422     default:
423       return EINVAL;
424   }
425 }
426
427 static void
428 gst_poll_free_winsock_event (GstPoll * set, gint idx)
429 {
430   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
431   HANDLE event = g_array_index (set->events, HANDLE, idx);
432
433   WSAEventSelect (wfd->fd, event, 0);
434   CloseHandle (event);
435 }
436
437 static void
438 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
439     gboolean active)
440 {
441   WinsockFd *wfd;
442
443   wfd = &g_array_index (set->fds, WinsockFd, idx);
444
445   if (active)
446     wfd->event_mask |= flags;
447   else
448     wfd->event_mask &= ~flags;
449
450   /* reset ignored state if the new mask doesn't overlap at all */
451   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
452     wfd->ignored_event_mask = 0;
453 }
454
455 static gboolean
456 gst_poll_prepare_winsock_active_sets (GstPoll * set)
457 {
458   guint i;
459
460   g_array_set_size (set->active_fds, 0);
461   g_array_set_size (set->active_fds_ignored, 0);
462   g_array_set_size (set->active_events, 0);
463   g_array_append_val (set->active_events, set->wakeup_event);
464
465   for (i = 0; i < set->fds->len; i++) {
466     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
467     HANDLE event = g_array_index (set->events, HANDLE, i);
468
469     if (wfd->ignored_event_mask == 0) {
470       gint ret;
471
472       g_array_append_val (set->active_fds, *wfd);
473       g_array_append_val (set->active_events, event);
474
475       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
476       if (G_UNLIKELY (ret != 0)) {
477         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
478         return FALSE;
479       }
480     } else {
481       g_array_append_val (set->active_fds_ignored, wfd);
482     }
483   }
484
485   return TRUE;
486 }
487
488 static gint
489 gst_poll_collect_winsock_events (GstPoll * set)
490 {
491   gint res, i;
492
493   /*
494    * We need to check which events are signaled, and call
495    * WSAEnumNetworkEvents for those that are, which resets
496    * the event and clears the internal network event records.
497    */
498   res = 0;
499   for (i = 0; i < set->active_fds->len; i++) {
500     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
501     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
502     DWORD wait_ret;
503
504     wait_ret = WaitForSingleObject (event, 0);
505     if (wait_ret == WAIT_OBJECT_0) {
506       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
507
508       if (G_UNLIKELY (enum_ret != 0)) {
509         res = -1;
510         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
511         break;
512       }
513
514       res++;
515     } else {
516       /* clear any previously stored result */
517       memset (&wfd->events, 0, sizeof (wfd->events));
518     }
519   }
520
521   /* If all went well we also need to reset the ignored fds. */
522   if (res >= 0) {
523     res += set->active_fds_ignored->len;
524
525     for (i = 0; i < set->active_fds_ignored->len; i++) {
526       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
527
528       wfd->ignored_event_mask = 0;
529     }
530
531     g_array_set_size (set->active_fds_ignored, 0);
532   }
533
534   return res;
535 }
536 #endif
537
538 /**
539  * gst_poll_new: (skip)
540  * @controllable: whether it should be possible to control a wait.
541  *
542  * Create a new file descriptor set. If @controllable, it
543  * is possible to restart or flush a call to gst_poll_wait() with
544  * gst_poll_restart() and gst_poll_set_flushing() respectively.
545  *
546  * Free-function: gst_poll_free
547  *
548  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
549  *     Free with gst_poll_free().
550  */
551 GstPoll *
552 gst_poll_new (gboolean controllable)
553 {
554   GstPoll *nset;
555
556   nset = g_slice_new0 (GstPoll);
557   GST_DEBUG ("%p: new controllable : %d", nset, controllable);
558   g_mutex_init (&nset->lock);
559 #ifndef G_OS_WIN32
560   nset->mode = GST_POLL_MODE_AUTO;
561   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
562   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
563   nset->control_read_fd.fd = -1;
564   nset->control_write_fd.fd = -1;
565   {
566     gint control_sock[2];
567
568     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
569       goto no_socket_pair;
570
571     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
572     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
573
574     nset->control_read_fd.fd = control_sock[0];
575     nset->control_write_fd.fd = control_sock[1];
576
577     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
578     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
579   }
580 #else
581   nset->mode = GST_POLL_MODE_WINDOWS;
582   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
583   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
584   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
585   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
586   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
587
588   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
589 #endif
590
591   /* ensure (re)build, though already sneakily set in non-windows case */
592   MARK_REBUILD (nset);
593
594   nset->controllable = controllable;
595
596   return nset;
597
598   /* ERRORS */
599 #ifndef G_OS_WIN32
600 no_socket_pair:
601   {
602     GST_WARNING ("%p: can't create socket pair !", nset);
603     gst_poll_free (nset);
604     return NULL;
605   }
606 #endif
607 }
608
609 /**
610  * gst_poll_new_timer: (skip)
611  *
612  * Create a new poll object that can be used for scheduling cancellable
613  * timeouts.
614  *
615  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
616  * performed from different threads. 
617  *
618  * Free-function: gst_poll_free
619  *
620  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
621  *     Free with gst_poll_free().
622  */
623 GstPoll *
624 gst_poll_new_timer (void)
625 {
626   GstPoll *poll;
627
628   /* make a new controllable poll set */
629   if (!(poll = gst_poll_new (TRUE)))
630     goto done;
631
632   /* we are a timer */
633   poll->timer = TRUE;
634
635 done:
636   return poll;
637 }
638
639 /**
640  * gst_poll_free:
641  * @set: (transfer full): a file descriptor set.
642  *
643  * Free a file descriptor set.
644  */
645 void
646 gst_poll_free (GstPoll * set)
647 {
648   g_return_if_fail (set != NULL);
649
650   GST_DEBUG ("%p: freeing", set);
651
652 #ifndef G_OS_WIN32
653   if (set->control_write_fd.fd >= 0)
654     close (set->control_write_fd.fd);
655   if (set->control_read_fd.fd >= 0)
656     close (set->control_read_fd.fd);
657 #else
658   CloseHandle (set->wakeup_event);
659
660   {
661     guint i;
662
663     for (i = 0; i < set->events->len; i++)
664       gst_poll_free_winsock_event (set, i);
665   }
666
667   g_array_free (set->active_events, TRUE);
668   g_array_free (set->events, TRUE);
669   g_array_free (set->active_fds_ignored, TRUE);
670 #endif
671
672   g_array_free (set->active_fds, TRUE);
673   g_array_free (set->fds, TRUE);
674   g_mutex_clear (&set->lock);
675   g_slice_free (GstPoll, set);
676 }
677
678 /**
679  * gst_poll_get_read_gpollfd:
680  * @set: a #GstPoll
681  * @fd: a #GPollFD
682  *
683  * Get a GPollFD for the reading part of the control socket. This is useful when
684  * integrating with a GSource and GMainLoop.
685  */
686 void
687 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
688 {
689   g_return_if_fail (set != NULL);
690   g_return_if_fail (fd != NULL);
691
692 #ifndef G_OS_WIN32
693   fd->fd = set->control_read_fd.fd;
694 #else
695 #if GLIB_SIZEOF_VOID_P == 8
696   fd->fd = (gint64) set->wakeup_event;
697 #else
698   fd->fd = (gint) set->wakeup_event;
699 #endif
700 #endif
701   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
702   fd->revents = 0;
703 }
704
705 /**
706  * gst_poll_fd_init:
707  * @fd: a #GstPollFD
708  *
709  * Initializes @fd. Alternatively you can initialize it with
710  * #GST_POLL_FD_INIT.
711  */
712 void
713 gst_poll_fd_init (GstPollFD * fd)
714 {
715   g_return_if_fail (fd != NULL);
716
717   fd->fd = -1;
718   fd->idx = -1;
719 }
720
721 static gboolean
722 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
723 {
724   gint idx;
725
726   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
727
728   idx = find_index (set->fds, fd);
729   if (idx < 0) {
730 #ifndef G_OS_WIN32
731     struct pollfd nfd;
732
733     nfd.fd = fd->fd;
734     nfd.events = POLLERR | POLLNVAL | POLLHUP;
735     nfd.revents = 0;
736
737     g_array_append_val (set->fds, nfd);
738
739     fd->idx = set->fds->len - 1;
740 #else
741     WinsockFd wfd;
742     HANDLE event;
743
744     wfd.fd = fd->fd;
745     wfd.event_mask = FD_CLOSE;
746     memset (&wfd.events, 0, sizeof (wfd.events));
747     wfd.ignored_event_mask = 0;
748     event = WSACreateEvent ();
749
750     g_array_append_val (set->fds, wfd);
751     g_array_append_val (set->events, event);
752
753     fd->idx = set->fds->len - 1;
754 #endif
755     MARK_REBUILD (set);
756   } else {
757     GST_WARNING ("%p: fd already added !", set);
758   }
759
760   return TRUE;
761 }
762
763 /**
764  * gst_poll_add_fd:
765  * @set: a file descriptor set.
766  * @fd: a file descriptor.
767  *
768  * Add a file descriptor to the file descriptor set.
769  *
770  * Returns: %TRUE if the file descriptor was successfully added to the set.
771  */
772 gboolean
773 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
774 {
775   gboolean ret;
776
777   g_return_val_if_fail (set != NULL, FALSE);
778   g_return_val_if_fail (fd != NULL, FALSE);
779   g_return_val_if_fail (fd->fd >= 0, FALSE);
780
781   g_mutex_lock (&set->lock);
782
783   ret = gst_poll_add_fd_unlocked (set, fd);
784
785   g_mutex_unlock (&set->lock);
786
787   return ret;
788 }
789
790 /**
791  * gst_poll_remove_fd:
792  * @set: a file descriptor set.
793  * @fd: a file descriptor.
794  *
795  * Remove a file descriptor from the file descriptor set.
796  *
797  * Returns: %TRUE if the file descriptor was successfully removed from the set.
798  */
799 gboolean
800 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
801 {
802   gint idx;
803
804   g_return_val_if_fail (set != NULL, FALSE);
805   g_return_val_if_fail (fd != NULL, FALSE);
806   g_return_val_if_fail (fd->fd >= 0, FALSE);
807
808
809   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
810
811   g_mutex_lock (&set->lock);
812
813   /* get the index, -1 is an fd that is not added */
814   idx = find_index (set->fds, fd);
815   if (idx >= 0) {
816 #ifdef G_OS_WIN32
817     gst_poll_free_winsock_event (set, idx);
818     g_array_remove_index_fast (set->events, idx);
819 #endif
820
821     /* remove the fd at index, we use _remove_index_fast, which copies the last
822      * element of the array to the freed index */
823     g_array_remove_index_fast (set->fds, idx);
824
825     /* mark fd as removed by setting the index to -1 */
826     fd->idx = -1;
827     MARK_REBUILD (set);
828   } else {
829     GST_WARNING ("%p: couldn't find fd !", set);
830   }
831
832   g_mutex_unlock (&set->lock);
833
834   return idx >= 0;
835 }
836
837 /**
838  * gst_poll_fd_ctl_write:
839  * @set: a file descriptor set.
840  * @fd: a file descriptor.
841  * @active: a new status.
842  *
843  * Control whether the descriptor @fd in @set will be monitored for
844  * writability.
845  *
846  * Returns: %TRUE if the descriptor was successfully updated.
847  */
848 gboolean
849 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
850 {
851   gint idx;
852
853   g_return_val_if_fail (set != NULL, FALSE);
854   g_return_val_if_fail (fd != NULL, FALSE);
855   g_return_val_if_fail (fd->fd >= 0, FALSE);
856
857   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
858       fd->fd, fd->idx, active);
859
860   g_mutex_lock (&set->lock);
861
862   idx = find_index (set->fds, fd);
863   if (idx >= 0) {
864 #ifndef G_OS_WIN32
865     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
866
867     if (active)
868       pfd->events |= POLLOUT;
869     else
870       pfd->events &= ~POLLOUT;
871
872     GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
873 #else
874     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
875         active);
876 #endif
877     MARK_REBUILD (set);
878   } else {
879     GST_WARNING ("%p: couldn't find fd !", set);
880   }
881
882   g_mutex_unlock (&set->lock);
883
884   return idx >= 0;
885 }
886
887 static gboolean
888 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
889 {
890   gint idx;
891
892   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
893       fd->fd, fd->idx, active);
894
895   idx = find_index (set->fds, fd);
896
897   if (idx >= 0) {
898 #ifndef G_OS_WIN32
899     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
900
901     if (active)
902       pfd->events |= (POLLIN | POLLPRI);
903     else
904       pfd->events &= ~(POLLIN | POLLPRI);
905 #else
906     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
907 #endif
908     MARK_REBUILD (set);
909   } else {
910     GST_WARNING ("%p: couldn't find fd !", set);
911   }
912
913   return idx >= 0;
914 }
915
916 /**
917  * gst_poll_fd_ctl_read:
918  * @set: a file descriptor set.
919  * @fd: a file descriptor.
920  * @active: a new status.
921  *
922  * Control whether the descriptor @fd in @set will be monitored for
923  * readability.
924  *
925  * Returns: %TRUE if the descriptor was successfully updated.
926  */
927 gboolean
928 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
929 {
930   gboolean ret;
931
932   g_return_val_if_fail (set != NULL, FALSE);
933   g_return_val_if_fail (fd != NULL, FALSE);
934   g_return_val_if_fail (fd->fd >= 0, FALSE);
935
936   g_mutex_lock (&set->lock);
937
938   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
939
940   g_mutex_unlock (&set->lock);
941
942   return ret;
943 }
944
945 /**
946  * gst_poll_fd_ignored:
947  * @set: a file descriptor set.
948  * @fd: a file descriptor.
949  *
950  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
951  * the same result for @fd as last time. This function must be called if no
952  * operation (read/write/recv/send/etc.) will be performed on @fd before
953  * the next call to gst_poll_wait().
954  *
955  * The reason why this is needed is because the underlying implementation
956  * might not allow querying the fd more than once between calls to one of
957  * the re-enabling operations.
958  */
959 void
960 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
961 {
962 #ifdef G_OS_WIN32
963   gint idx;
964
965   g_return_if_fail (set != NULL);
966   g_return_if_fail (fd != NULL);
967   g_return_if_fail (fd->fd >= 0);
968
969   g_mutex_lock (&set->lock);
970
971   idx = find_index (set->fds, fd);
972   if (idx >= 0) {
973     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
974
975     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
976     MARK_REBUILD (set);
977   }
978
979   g_mutex_unlock (&set->lock);
980 #endif
981 }
982
983 /**
984  * gst_poll_fd_has_closed:
985  * @set: a file descriptor set.
986  * @fd: a file descriptor.
987  *
988  * Check if @fd in @set has closed the connection.
989  *
990  * Returns: %TRUE if the connection was closed.
991  */
992 gboolean
993 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
994 {
995   gboolean res = FALSE;
996   gint idx;
997
998   g_return_val_if_fail (set != NULL, FALSE);
999   g_return_val_if_fail (fd != NULL, FALSE);
1000   g_return_val_if_fail (fd->fd >= 0, FALSE);
1001
1002   g_mutex_lock (&((GstPoll *) set)->lock);
1003
1004   idx = find_index (set->active_fds, fd);
1005   if (idx >= 0) {
1006 #ifndef G_OS_WIN32
1007     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1008
1009     res = (pfd->revents & POLLHUP) != 0;
1010 #else
1011     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1012
1013     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1014 #endif
1015   } else {
1016     GST_WARNING ("%p: couldn't find fd !", set);
1017   }
1018   g_mutex_unlock (&((GstPoll *) set)->lock);
1019
1020   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1021
1022   return res;
1023 }
1024
1025 /**
1026  * gst_poll_fd_has_error:
1027  * @set: a file descriptor set.
1028  * @fd: a file descriptor.
1029  *
1030  * Check if @fd in @set has an error.
1031  *
1032  * Returns: %TRUE if the descriptor has an error.
1033  */
1034 gboolean
1035 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1036 {
1037   gboolean res = FALSE;
1038   gint idx;
1039
1040   g_return_val_if_fail (set != NULL, FALSE);
1041   g_return_val_if_fail (fd != NULL, FALSE);
1042   g_return_val_if_fail (fd->fd >= 0, FALSE);
1043
1044   g_mutex_lock (&((GstPoll *) set)->lock);
1045
1046   idx = find_index (set->active_fds, fd);
1047   if (idx >= 0) {
1048 #ifndef G_OS_WIN32
1049     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1050
1051     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1052 #else
1053     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1054
1055     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1056         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1057         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1058         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1059         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1060 #endif
1061   } else {
1062     GST_WARNING ("%p: couldn't find fd !", set);
1063   }
1064   g_mutex_unlock (&((GstPoll *) set)->lock);
1065
1066   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1067
1068   return res;
1069 }
1070
1071 static gboolean
1072 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1073 {
1074   gboolean res = FALSE;
1075   gint idx;
1076
1077   idx = find_index (set->active_fds, fd);
1078   if (idx >= 0) {
1079 #ifndef G_OS_WIN32
1080     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1081
1082     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1083 #else
1084     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1085
1086     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1087 #endif
1088   } else {
1089     GST_WARNING ("%p: couldn't find fd !", set);
1090   }
1091   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1092
1093   return res;
1094 }
1095
1096 /**
1097  * gst_poll_fd_can_read:
1098  * @set: a file descriptor set.
1099  * @fd: a file descriptor.
1100  *
1101  * Check if @fd in @set has data to be read.
1102  *
1103  * Returns: %TRUE if the descriptor has data to be read.
1104  */
1105 gboolean
1106 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1107 {
1108   gboolean res = FALSE;
1109
1110   g_return_val_if_fail (set != NULL, FALSE);
1111   g_return_val_if_fail (fd != NULL, FALSE);
1112   g_return_val_if_fail (fd->fd >= 0, FALSE);
1113
1114   g_mutex_lock (&((GstPoll *) set)->lock);
1115
1116   res = gst_poll_fd_can_read_unlocked (set, fd);
1117
1118   g_mutex_unlock (&((GstPoll *) set)->lock);
1119
1120   return res;
1121 }
1122
1123 /**
1124  * gst_poll_fd_can_write:
1125  * @set: a file descriptor set.
1126  * @fd: a file descriptor.
1127  *
1128  * Check if @fd in @set can be used for writing.
1129  *
1130  * Returns: %TRUE if the descriptor can be used for writing.
1131  */
1132 gboolean
1133 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1134 {
1135   gboolean res = FALSE;
1136   gint idx;
1137
1138   g_return_val_if_fail (set != NULL, FALSE);
1139   g_return_val_if_fail (fd != NULL, FALSE);
1140   g_return_val_if_fail (fd->fd >= 0, FALSE);
1141
1142   g_mutex_lock (&((GstPoll *) set)->lock);
1143
1144   idx = find_index (set->active_fds, fd);
1145   if (idx >= 0) {
1146 #ifndef G_OS_WIN32
1147     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1148
1149     res = (pfd->revents & POLLOUT) != 0;
1150 #else
1151     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1152
1153     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1154 #endif
1155   } else {
1156     GST_WARNING ("%p: couldn't find fd !", set);
1157   }
1158   g_mutex_unlock (&((GstPoll *) set)->lock);
1159
1160   GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1161
1162   return res;
1163 }
1164
1165 /**
1166  * gst_poll_wait:
1167  * @set: a #GstPoll.
1168  * @timeout: a timeout in nanoseconds.
1169  *
1170  * Wait for activity on the file descriptors in @set. This function waits up to
1171  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1172  *
1173  * For #GstPoll objects created with gst_poll_new(), this function can only be
1174  * called from a single thread at a time.  If called from multiple threads,
1175  * -1 will be returned with errno set to EPERM.
1176  *
1177  * This is not true for timer #GstPoll objects created with
1178  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1179  * simultaneously.
1180  *
1181  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1182  * activity was detected after @timeout. If an error occurs, -1 is returned
1183  * and errno is set.
1184  */
1185 gint
1186 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1187 {
1188   gboolean restarting;
1189   gboolean is_timer;
1190   int res;
1191   gint old_waiting;
1192
1193   g_return_val_if_fail (set != NULL, -1);
1194
1195   GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1196
1197   is_timer = set->timer;
1198
1199   /* add one more waiter */
1200   old_waiting = INC_WAITING (set);
1201
1202   /* we cannot wait from multiple threads unless we are a timer */
1203   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1204     goto already_waiting;
1205
1206   /* flushing, exit immediately */
1207   if (G_UNLIKELY (IS_FLUSHING (set)))
1208     goto flushing;
1209
1210   do {
1211     GstPollMode mode;
1212
1213     res = -1;
1214     restarting = FALSE;
1215
1216     mode = choose_mode (set, timeout);
1217
1218     if (TEST_REBUILD (set)) {
1219       g_mutex_lock (&set->lock);
1220 #ifndef G_OS_WIN32
1221       g_array_set_size (set->active_fds, set->fds->len);
1222       memcpy (set->active_fds->data, set->fds->data,
1223           set->fds->len * sizeof (struct pollfd));
1224 #else
1225       if (!gst_poll_prepare_winsock_active_sets (set))
1226         goto winsock_error;
1227 #endif
1228       g_mutex_unlock (&set->lock);
1229     }
1230
1231     switch (mode) {
1232       case GST_POLL_MODE_AUTO:
1233         g_assert_not_reached ();
1234         break;
1235       case GST_POLL_MODE_PPOLL:
1236       {
1237 #ifdef HAVE_PPOLL
1238         struct timespec ts;
1239         struct timespec *tsptr;
1240
1241         if (timeout != GST_CLOCK_TIME_NONE) {
1242           GST_TIME_TO_TIMESPEC (timeout, ts);
1243           tsptr = &ts;
1244         } else {
1245           tsptr = NULL;
1246         }
1247
1248         res =
1249             ppoll ((struct pollfd *) set->active_fds->data,
1250             set->active_fds->len, tsptr, NULL);
1251 #else
1252         g_assert_not_reached ();
1253         errno = ENOSYS;
1254 #endif
1255         break;
1256       }
1257       case GST_POLL_MODE_POLL:
1258       {
1259 #ifdef HAVE_POLL
1260         gint t;
1261
1262         if (timeout != GST_CLOCK_TIME_NONE) {
1263           t = GST_TIME_AS_MSECONDS (timeout);
1264         } else {
1265           t = -1;
1266         }
1267
1268         res =
1269             poll ((struct pollfd *) set->active_fds->data,
1270             set->active_fds->len, t);
1271 #else
1272         g_assert_not_reached ();
1273         errno = ENOSYS;
1274 #endif
1275         break;
1276       }
1277       case GST_POLL_MODE_PSELECT:
1278 #ifndef HAVE_PSELECT
1279       {
1280         g_assert_not_reached ();
1281         errno = ENOSYS;
1282         break;
1283       }
1284 #endif
1285       case GST_POLL_MODE_SELECT:
1286       {
1287 #ifndef G_OS_WIN32
1288         fd_set readfds;
1289         fd_set writefds;
1290         fd_set errorfds;
1291         gint max_fd;
1292
1293         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1294
1295         if (mode == GST_POLL_MODE_SELECT) {
1296           struct timeval tv;
1297           struct timeval *tvptr;
1298
1299           if (timeout != GST_CLOCK_TIME_NONE) {
1300             GST_TIME_TO_TIMEVAL (timeout, tv);
1301             tvptr = &tv;
1302           } else {
1303             tvptr = NULL;
1304           }
1305
1306           GST_DEBUG ("%p: Calling select", set);
1307           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1308           GST_DEBUG ("%p: After select, res:%d", set, res);
1309         } else {
1310 #ifdef HAVE_PSELECT
1311           struct timespec ts;
1312           struct timespec *tsptr;
1313
1314           if (timeout != GST_CLOCK_TIME_NONE) {
1315             GST_TIME_TO_TIMESPEC (timeout, ts);
1316             tsptr = &ts;
1317           } else {
1318             tsptr = NULL;
1319           }
1320
1321           GST_DEBUG ("%p: Calling pselect", set);
1322           res =
1323               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1324           GST_DEBUG ("%p: After pselect, res:%d", set, res);
1325 #endif
1326         }
1327
1328         if (res >= 0) {
1329           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1330         }
1331 #else /* G_OS_WIN32 */
1332         g_assert_not_reached ();
1333         errno = ENOSYS;
1334 #endif
1335         break;
1336       }
1337       case GST_POLL_MODE_WINDOWS:
1338       {
1339 #ifdef G_OS_WIN32
1340         gint ignore_count = set->active_fds_ignored->len;
1341         DWORD t, wait_ret;
1342
1343         if (G_LIKELY (ignore_count == 0)) {
1344           if (timeout != GST_CLOCK_TIME_NONE)
1345             t = GST_TIME_AS_MSECONDS (timeout);
1346           else
1347             t = INFINITE;
1348         } else {
1349           /* already one or more ignored fds, so we quickly sweep the others */
1350           t = 0;
1351         }
1352
1353         if (set->active_events->len != 0) {
1354           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1355               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1356         } else {
1357           wait_ret = WSA_WAIT_FAILED;
1358           WSASetLastError (WSA_INVALID_PARAMETER);
1359         }
1360
1361         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1362           res = 0;
1363         } else if (wait_ret == WSA_WAIT_FAILED) {
1364           res = -1;
1365           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1366         } else {
1367           /* the first entry is the wakeup event */
1368           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1369             res = gst_poll_collect_winsock_events (set);
1370           } else {
1371             res = 1;            /* wakeup event */
1372           }
1373         }
1374 #else
1375         g_assert_not_reached ();
1376         errno = ENOSYS;
1377 #endif
1378         break;
1379       }
1380     }
1381
1382     if (!is_timer) {
1383       /* Applications needs to clear the control socket themselves for timer
1384        * polls.
1385        * For other polls, we need to clear the control socket. If there was only
1386        * one socket with activity and it was the control socket, we need to
1387        * restart */
1388       if (release_all_wakeup (set) > 0 && res == 1)
1389         restarting = TRUE;
1390     }
1391
1392     /* we got woken up and we are flushing, we need to stop */
1393     if (G_UNLIKELY (IS_FLUSHING (set)))
1394       goto flushing;
1395
1396   } while (G_UNLIKELY (restarting));
1397
1398   DEC_WAITING (set);
1399
1400   return res;
1401
1402   /* ERRORS */
1403 already_waiting:
1404   {
1405     GST_LOG ("%p: we are already waiting", set);
1406     DEC_WAITING (set);
1407     errno = EPERM;
1408     return -1;
1409   }
1410 flushing:
1411   {
1412     GST_LOG ("%p: we are flushing", set);
1413     DEC_WAITING (set);
1414     errno = EBUSY;
1415     return -1;
1416   }
1417 #ifdef G_OS_WIN32
1418 winsock_error:
1419   {
1420     GST_LOG ("%p: winsock error", set);
1421     g_mutex_unlock (&set->lock);
1422     DEC_WAITING (set);
1423     return -1;
1424   }
1425 #endif
1426 }
1427
1428 /**
1429  * gst_poll_set_controllable:
1430  * @set: a #GstPoll.
1431  * @controllable: new controllable state.
1432  *
1433  * When @controllable is %TRUE, this function ensures that future calls to
1434  * gst_poll_wait() will be affected by gst_poll_restart() and
1435  * gst_poll_set_flushing().
1436  *
1437  * Returns: %TRUE if the controllability of @set could be updated.
1438  */
1439 gboolean
1440 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1441 {
1442   g_return_val_if_fail (set != NULL, FALSE);
1443
1444   GST_LOG ("%p: controllable : %d", set, controllable);
1445
1446   set->controllable = controllable;
1447
1448   return TRUE;
1449 }
1450
1451 /**
1452  * gst_poll_restart:
1453  * @set: a #GstPoll.
1454  *
1455  * Restart any gst_poll_wait() that is in progress. This function is typically
1456  * used after adding or removing descriptors to @set.
1457  *
1458  * If @set is not controllable, then this call will have no effect.
1459  */
1460 void
1461 gst_poll_restart (GstPoll * set)
1462 {
1463   g_return_if_fail (set != NULL);
1464
1465   if (set->controllable && GET_WAITING (set) > 0) {
1466     /* we are controllable and waiting, wake up the waiter. The socket will be
1467      * cleared by the _wait() thread and the poll will be restarted */
1468     raise_wakeup (set);
1469   }
1470 }
1471
1472 /**
1473  * gst_poll_set_flushing:
1474  * @set: a #GstPoll.
1475  * @flushing: new flushing state.
1476  *
1477  * When @flushing is %TRUE, this function ensures that current and future calls
1478  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1479  *
1480  * Unsetting the flushing state will restore normal operation of @set.
1481  */
1482 void
1483 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1484 {
1485   g_return_if_fail (set != NULL);
1486
1487   GST_LOG ("%p: flushing: %d", set, flushing);
1488
1489   /* update the new state first */
1490   SET_FLUSHING (set, flushing);
1491
1492   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1493     /* we are flushing, controllable and waiting, wake up the waiter. When we
1494      * stop the flushing operation we don't clear the wakeup fd here, this will
1495      * happen in the _wait() thread. */
1496     raise_wakeup (set);
1497   }
1498 }
1499
1500 /**
1501  * gst_poll_write_control:
1502  * @set: a #GstPoll.
1503  *
1504  * Write a byte to the control socket of the controllable @set.
1505  * This function is mostly useful for timer #GstPoll objects created with
1506  * gst_poll_new_timer(). 
1507  *
1508  * It will make any current and future gst_poll_wait() function return with
1509  * 1, meaning the control socket is set. After an equal amount of calls to
1510  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1511  * block again until their timeout expired.
1512  *
1513  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1514  * byte could not be written.
1515  */
1516 gboolean
1517 gst_poll_write_control (GstPoll * set)
1518 {
1519   gboolean res;
1520
1521   g_return_val_if_fail (set != NULL, FALSE);
1522   g_return_val_if_fail (set->timer, FALSE);
1523
1524   res = raise_wakeup (set);
1525
1526   return res;
1527 }
1528
1529 /**
1530  * gst_poll_read_control:
1531  * @set: a #GstPoll.
1532  *
1533  * Read a byte from the control socket of the controllable @set.
1534  * This function is mostly useful for timer #GstPoll objects created with
1535  * gst_poll_new_timer(). 
1536  *
1537  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1538  * was no byte to read.
1539  */
1540 gboolean
1541 gst_poll_read_control (GstPoll * set)
1542 {
1543   gboolean res;
1544
1545   g_return_val_if_fail (set != NULL, FALSE);
1546   g_return_val_if_fail (set->timer, FALSE);
1547
1548   res = release_wakeup (set);
1549
1550   return res;
1551 }