Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60
61 #include <sys/types.h>
62
63 #ifdef HAVE_UNISTD_H
64 #include <unistd.h>
65 #endif
66
67 #include <errno.h>
68 #include <fcntl.h>
69
70 #include <glib.h>
71
72 #ifdef G_OS_WIN32
73 #include <winsock2.h>
74 #define EINPROGRESS WSAEINPROGRESS
75 #else
76 #define _GNU_SOURCE 1
77 #include <sys/poll.h>
78 #include <sys/time.h>
79 #include <sys/socket.h>
80 #endif
81
82 /* OS/X needs this because of bad headers */
83 #include <string.h>
84
85 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
86  * so we prefer our own poll emulation.
87  */
88 #if defined(BROKEN_POLL)
89 #undef HAVE_POLL
90 #endif
91
92 #include "gstpoll.h"
93
94 #define GST_CAT_DEFAULT GST_CAT_POLL
95
96 #ifdef G_OS_WIN32
97 typedef struct _WinsockFd WinsockFd;
98
99 struct _WinsockFd
100 {
101   gint fd;
102   glong event_mask;
103   WSANETWORKEVENTS events;
104   glong ignored_event_mask;
105 };
106 #endif
107
108 typedef enum
109 {
110   GST_POLL_MODE_AUTO,
111   GST_POLL_MODE_SELECT,
112   GST_POLL_MODE_PSELECT,
113   GST_POLL_MODE_POLL,
114   GST_POLL_MODE_PPOLL,
115   GST_POLL_MODE_WINDOWS
116 } GstPollMode;
117
118 struct _GstPoll
119 {
120   GstPollMode mode;
121
122   GMutex *lock;
123   /* array of fds, always written to and read from with lock */
124   GArray *fds;
125   /* array of active fds, only written to from the waiting thread with the
126    * lock and read from with the lock or without the lock from the waiting
127    * thread */
128   GArray *active_fds;
129
130 #ifndef G_OS_WIN32
131   gchar buf[1];
132   GstPollFD control_read_fd;
133   GstPollFD control_write_fd;
134 #else
135   GArray *active_fds_ignored;
136   GArray *events;
137   GArray *active_events;
138
139   HANDLE wakeup_event;
140 #endif
141
142   gboolean controllable;
143   volatile gint waiting;
144   volatile gint control_pending;
145   volatile gint flushing;
146   gboolean timer;
147   volatile gint rebuild;
148 };
149
150 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
151     gboolean active);
152 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
153
154 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
155 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
156
157 #define INC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, 1))
158 #define DEC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, -1))
159 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
160
161 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
162 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
163
164 #ifndef G_OS_WIN32
165 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
166 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
167 #else
168 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event))
169 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
170 #endif
171
172 /* the poll/select call is also performed on a control socket, that way
173  * we can send special commands to control it */
174 static inline gboolean
175 raise_wakeup (GstPoll * set)
176 {
177   gboolean result = TRUE;
178
179   if (g_atomic_int_exchange_and_add (&set->control_pending, 1) == 0) {
180     /* raise when nothing pending */
181     result = WAKE_EVENT (set);
182   }
183   return result;
184 }
185
186 /* note how bad things can happen when the 2 threads both raise and release the
187  * wakeup. This is however not a problem because you must always pair a raise
188  * with a release */
189 static inline gboolean
190 release_wakeup (GstPoll * set)
191 {
192   gboolean result = TRUE;
193
194   if (g_atomic_int_dec_and_test (&set->control_pending)) {
195     result = RELEASE_EVENT (set);
196   }
197   return result;
198 }
199
200 static inline gint
201 release_all_wakeup (GstPoll * set)
202 {
203   gint old;
204
205   while (TRUE) {
206     if (!(old = g_atomic_int_get (&set->control_pending)))
207       /* nothing pending, just exit */
208       break;
209
210     /* try to remove all pending control messages */
211     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
212       /* we managed to remove all messages, read the control socket */
213       if (RELEASE_EVENT (set))
214         break;
215       else
216         /* retry again until we read it successfully */
217         g_atomic_int_exchange_and_add (&set->control_pending, 1);
218     }
219   }
220   return old;
221 }
222
223 static gint
224 find_index (GArray * array, GstPollFD * fd)
225 {
226 #ifndef G_OS_WIN32
227   struct pollfd *ifd;
228 #else
229   WinsockFd *ifd;
230 #endif
231   guint i;
232
233   /* start by assuming the index found in the fd is still valid */
234   if (fd->idx >= 0 && fd->idx < array->len) {
235 #ifndef G_OS_WIN32
236     ifd = &g_array_index (array, struct pollfd, fd->idx);
237 #else
238     ifd = &g_array_index (array, WinsockFd, fd->idx);
239 #endif
240
241     if (ifd->fd == fd->fd) {
242       return fd->idx;
243     }
244   }
245
246   /* the pollfd array has changed and we need to lookup the fd again */
247   for (i = 0; i < array->len; i++) {
248 #ifndef G_OS_WIN32
249     ifd = &g_array_index (array, struct pollfd, i);
250 #else
251     ifd = &g_array_index (array, WinsockFd, i);
252 #endif
253
254     if (ifd->fd == fd->fd) {
255       fd->idx = (gint) i;
256       return fd->idx;
257     }
258   }
259
260   fd->idx = -1;
261   return fd->idx;
262 }
263
264 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
265 /* check if all file descriptors will fit in an fd_set */
266 static gboolean
267 selectable_fds (const GstPoll * set)
268 {
269   guint i;
270
271   g_mutex_lock (set->lock);
272   for (i = 0; i < set->fds->len; i++) {
273     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
274
275     if (pfd->fd >= FD_SETSIZE)
276       goto too_many;
277   }
278   g_mutex_unlock (set->lock);
279
280   return TRUE;
281
282 too_many:
283   {
284     g_mutex_unlock (set->lock);
285     return FALSE;
286   }
287 }
288
289 /* check if the timeout will convert to a timeout value used for poll()
290  * without a loss of precision
291  */
292 static gboolean
293 pollable_timeout (GstClockTime timeout)
294 {
295   if (timeout == GST_CLOCK_TIME_NONE)
296     return TRUE;
297
298   /* not a nice multiple of milliseconds */
299   if (timeout % 1000000)
300     return FALSE;
301
302   return TRUE;
303 }
304 #endif
305
306 static GstPollMode
307 choose_mode (const GstPoll * set, GstClockTime timeout)
308 {
309   GstPollMode mode;
310
311   if (set->mode == GST_POLL_MODE_AUTO) {
312 #ifdef HAVE_PPOLL
313     mode = GST_POLL_MODE_PPOLL;
314 #elif defined(HAVE_POLL)
315     if (!selectable_fds (set) || pollable_timeout (timeout)) {
316       mode = GST_POLL_MODE_POLL;
317     } else {
318 #ifdef HAVE_PSELECT
319       mode = GST_POLL_MODE_PSELECT;
320 #else
321       mode = GST_POLL_MODE_SELECT;
322 #endif
323     }
324 #elif defined(HAVE_PSELECT)
325     mode = GST_POLL_MODE_PSELECT;
326 #else
327     mode = GST_POLL_MODE_SELECT;
328 #endif
329   } else {
330     mode = set->mode;
331   }
332   return mode;
333 }
334
335 #ifndef G_OS_WIN32
336 static gint
337 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
338     fd_set * errorfds)
339 {
340   gint max_fd = -1;
341   guint i;
342
343   FD_ZERO (readfds);
344   FD_ZERO (writefds);
345   FD_ZERO (errorfds);
346
347   g_mutex_lock (set->lock);
348
349   for (i = 0; i < set->active_fds->len; i++) {
350     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
351
352     if (pfd->fd < FD_SETSIZE) {
353       if (pfd->events & POLLIN)
354         FD_SET (pfd->fd, readfds);
355       if (pfd->events & POLLOUT)
356         FD_SET (pfd->fd, writefds);
357       if (pfd->events)
358         FD_SET (pfd->fd, errorfds);
359       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
360         max_fd = pfd->fd;
361     }
362   }
363
364   g_mutex_unlock (set->lock);
365
366   return max_fd;
367 }
368
369 static void
370 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
371     fd_set * errorfds)
372 {
373   guint i;
374
375   g_mutex_lock (set->lock);
376
377   for (i = 0; i < set->active_fds->len; i++) {
378     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
379
380     if (pfd->fd < FD_SETSIZE) {
381       pfd->revents = 0;
382       if (FD_ISSET (pfd->fd, readfds))
383         pfd->revents |= POLLIN;
384       if (FD_ISSET (pfd->fd, writefds))
385         pfd->revents |= POLLOUT;
386       if (FD_ISSET (pfd->fd, errorfds))
387         pfd->revents |= POLLERR;
388     }
389   }
390
391   g_mutex_unlock (set->lock);
392 }
393 #else /* G_OS_WIN32 */
394 /*
395  * Translate errors thrown by the Winsock API used by GstPoll:
396  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
397  */
398 static gint
399 gst_poll_winsock_error_to_errno (DWORD last_error)
400 {
401   switch (last_error) {
402     case WSA_INVALID_HANDLE:
403     case WSAEINVAL:
404     case WSAENOTSOCK:
405       return EBADF;
406
407     case WSA_NOT_ENOUGH_MEMORY:
408       return ENOMEM;
409
410       /*
411        * Anything else, including:
412        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
413        *   WSANOTINITIALISED
414        */
415     default:
416       return EINVAL;
417   }
418 }
419
420 static void
421 gst_poll_free_winsock_event (GstPoll * set, gint idx)
422 {
423   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
424   HANDLE event = g_array_index (set->events, HANDLE, idx);
425
426   WSAEventSelect (wfd->fd, event, 0);
427   CloseHandle (event);
428 }
429
430 static void
431 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
432     gboolean active)
433 {
434   WinsockFd *wfd;
435
436   wfd = &g_array_index (set->fds, WinsockFd, idx);
437
438   if (active)
439     wfd->event_mask |= flags;
440   else
441     wfd->event_mask &= ~flags;
442
443   /* reset ignored state if the new mask doesn't overlap at all */
444   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
445     wfd->ignored_event_mask = 0;
446 }
447
448 static gboolean
449 gst_poll_prepare_winsock_active_sets (GstPoll * set)
450 {
451   guint i;
452
453   g_array_set_size (set->active_fds, 0);
454   g_array_set_size (set->active_fds_ignored, 0);
455   g_array_set_size (set->active_events, 0);
456   g_array_append_val (set->active_events, set->wakeup_event);
457
458   for (i = 0; i < set->fds->len; i++) {
459     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
460     HANDLE event = g_array_index (set->events, HANDLE, i);
461
462     if (wfd->ignored_event_mask == 0) {
463       gint ret;
464
465       g_array_append_val (set->active_fds, *wfd);
466       g_array_append_val (set->active_events, event);
467
468       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
469       if (G_UNLIKELY (ret != 0)) {
470         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
471         return FALSE;
472       }
473     } else {
474       g_array_append_val (set->active_fds_ignored, wfd);
475     }
476   }
477
478   return TRUE;
479 }
480
481 static gint
482 gst_poll_collect_winsock_events (GstPoll * set)
483 {
484   gint res, i;
485
486   /*
487    * We need to check which events are signaled, and call
488    * WSAEnumNetworkEvents for those that are, which resets
489    * the event and clears the internal network event records.
490    */
491   res = 0;
492   for (i = 0; i < set->active_fds->len; i++) {
493     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
494     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
495     DWORD wait_ret;
496
497     wait_ret = WaitForSingleObject (event, 0);
498     if (wait_ret == WAIT_OBJECT_0) {
499       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
500
501       if (G_UNLIKELY (enum_ret != 0)) {
502         res = -1;
503         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
504         break;
505       }
506
507       res++;
508     } else {
509       /* clear any previously stored result */
510       memset (&wfd->events, 0, sizeof (wfd->events));
511     }
512   }
513
514   /* If all went well we also need to reset the ignored fds. */
515   if (res >= 0) {
516     res += set->active_fds_ignored->len;
517
518     for (i = 0; i < set->active_fds_ignored->len; i++) {
519       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
520
521       wfd->ignored_event_mask = 0;
522     }
523
524     g_array_set_size (set->active_fds_ignored, 0);
525   }
526
527   return res;
528 }
529 #endif
530
531 /**
532  * gst_poll_new:
533  * @controllable: whether it should be possible to control a wait.
534  *
535  * Create a new file descriptor set. If @controllable, it
536  * is possible to restart or flush a call to gst_poll_wait() with
537  * gst_poll_restart() and gst_poll_set_flushing() respectively.
538  *
539  * Free-function: gst_poll_free
540  *
541  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
542  *     Free with gst_poll_free().
543  *
544  * Since: 0.10.18
545  */
546 GstPoll *
547 gst_poll_new (gboolean controllable)
548 {
549   GstPoll *nset;
550
551   GST_DEBUG ("controllable : %d", controllable);
552
553   nset = g_slice_new0 (GstPoll);
554   nset->lock = g_mutex_new ();
555 #ifndef G_OS_WIN32
556   nset->mode = GST_POLL_MODE_AUTO;
557   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
558   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
559   nset->control_read_fd.fd = -1;
560   nset->control_write_fd.fd = -1;
561   {
562     gint control_sock[2];
563
564     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
565       goto no_socket_pair;
566
567     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
568     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
569
570     nset->control_read_fd.fd = control_sock[0];
571     nset->control_write_fd.fd = control_sock[1];
572
573     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
574     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
575   }
576 #else
577   nset->mode = GST_POLL_MODE_WINDOWS;
578   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
579   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
580   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
581   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
582   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
583
584   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
585 #endif
586
587   /* ensure (re)build, though already sneakily set in non-windows case */
588   MARK_REBUILD (nset);
589
590   nset->controllable = controllable;
591
592   return nset;
593
594   /* ERRORS */
595 #ifndef G_OS_WIN32
596 no_socket_pair:
597   {
598     GST_WARNING ("%p: can't create socket pair !", nset);
599     gst_poll_free (nset);
600     return NULL;
601   }
602 #endif
603 }
604
605 /**
606  * gst_poll_new_timer:
607  *
608  * Create a new poll object that can be used for scheduling cancellable
609  * timeouts.
610  *
611  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
612  * performed from different threads. 
613  *
614  * Free-function: gst_poll_free
615  *
616  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
617  *     Free with gst_poll_free().
618  *
619  * Since: 0.10.23
620  */
621 GstPoll *
622 gst_poll_new_timer (void)
623 {
624   GstPoll *poll;
625
626   /* make a new controllable poll set */
627   if (!(poll = gst_poll_new (TRUE)))
628     goto done;
629
630   /* we are a timer */
631   poll->timer = TRUE;
632
633 done:
634   return poll;
635 }
636
637 /**
638  * gst_poll_free:
639  * @set: (transfer full): a file descriptor set.
640  *
641  * Free a file descriptor set.
642  *
643  * Since: 0.10.18
644  */
645 void
646 gst_poll_free (GstPoll * set)
647 {
648   g_return_if_fail (set != NULL);
649
650   GST_DEBUG ("%p: freeing", set);
651
652 #ifndef G_OS_WIN32
653   if (set->control_write_fd.fd >= 0)
654     close (set->control_write_fd.fd);
655   if (set->control_read_fd.fd >= 0)
656     close (set->control_read_fd.fd);
657 #else
658   CloseHandle (set->wakeup_event);
659
660   {
661     guint i;
662
663     for (i = 0; i < set->events->len; i++)
664       gst_poll_free_winsock_event (set, i);
665   }
666
667   g_array_free (set->active_events, TRUE);
668   g_array_free (set->events, TRUE);
669   g_array_free (set->active_fds_ignored, TRUE);
670 #endif
671
672   g_array_free (set->active_fds, TRUE);
673   g_array_free (set->fds, TRUE);
674   g_mutex_free (set->lock);
675   g_slice_free (GstPoll, set);
676 }
677
678 /**
679  * gst_poll_get_read_gpollfd:
680  * @set: a #GstPoll
681  * @fd: a #GPollFD
682  *
683  * Get a GPollFD for the reading part of the control socket. This is useful when
684  * integrating with a GSource and GMainLoop.
685  *
686  * Since: 0.10.32
687  */
688 void
689 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
690 {
691   g_return_if_fail (set != NULL);
692   g_return_if_fail (fd != NULL);
693
694 #ifndef G_OS_WIN32
695   fd->fd = set->control_read_fd.fd;
696 #else
697 #if GLIB_SIZEOF_VOID_P == 8
698   fd->fd = (gint64) set->wakeup_event;
699 #else
700   fd->fd = (gint) set->wakeup_event;
701 #endif
702 #endif
703   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
704   fd->revents = 0;
705 }
706
707 /**
708  * gst_poll_fd_init:
709  * @fd: a #GstPollFD
710  *
711  * Initializes @fd. Alternatively you can initialize it with
712  * #GST_POLL_FD_INIT.
713  *
714  * Since: 0.10.18
715  */
716 void
717 gst_poll_fd_init (GstPollFD * fd)
718 {
719   g_return_if_fail (fd != NULL);
720
721   fd->fd = -1;
722   fd->idx = -1;
723 }
724
725 static gboolean
726 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
727 {
728   gint idx;
729
730   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
731
732   idx = find_index (set->fds, fd);
733   if (idx < 0) {
734 #ifndef G_OS_WIN32
735     struct pollfd nfd;
736
737     nfd.fd = fd->fd;
738     nfd.events = POLLERR | POLLNVAL | POLLHUP;
739     nfd.revents = 0;
740
741     g_array_append_val (set->fds, nfd);
742
743     fd->idx = set->fds->len - 1;
744 #else
745     WinsockFd wfd;
746     HANDLE event;
747
748     wfd.fd = fd->fd;
749     wfd.event_mask = FD_CLOSE;
750     memset (&wfd.events, 0, sizeof (wfd.events));
751     wfd.ignored_event_mask = 0;
752     event = WSACreateEvent ();
753
754     g_array_append_val (set->fds, wfd);
755     g_array_append_val (set->events, event);
756
757     fd->idx = set->fds->len - 1;
758 #endif
759     MARK_REBUILD (set);
760   } else {
761     GST_WARNING ("%p: couldn't find fd !", set);
762   }
763
764   return TRUE;
765 }
766
767 /**
768  * gst_poll_add_fd:
769  * @set: a file descriptor set.
770  * @fd: a file descriptor.
771  *
772  * Add a file descriptor to the file descriptor set.
773  *
774  * Returns: %TRUE if the file descriptor was successfully added to the set.
775  *
776  * Since: 0.10.18
777  */
778 gboolean
779 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
780 {
781   gboolean ret;
782
783   g_return_val_if_fail (set != NULL, FALSE);
784   g_return_val_if_fail (fd != NULL, FALSE);
785   g_return_val_if_fail (fd->fd >= 0, FALSE);
786
787   g_mutex_lock (set->lock);
788
789   ret = gst_poll_add_fd_unlocked (set, fd);
790
791   g_mutex_unlock (set->lock);
792
793   return ret;
794 }
795
796 /**
797  * gst_poll_remove_fd:
798  * @set: a file descriptor set.
799  * @fd: a file descriptor.
800  *
801  * Remove a file descriptor from the file descriptor set.
802  *
803  * Returns: %TRUE if the file descriptor was successfully removed from the set.
804  *
805  * Since: 0.10.18
806  */
807 gboolean
808 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
809 {
810   gint idx;
811
812   g_return_val_if_fail (set != NULL, FALSE);
813   g_return_val_if_fail (fd != NULL, FALSE);
814   g_return_val_if_fail (fd->fd >= 0, FALSE);
815
816
817   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
818
819   g_mutex_lock (set->lock);
820
821   /* get the index, -1 is an fd that is not added */
822   idx = find_index (set->fds, fd);
823   if (idx >= 0) {
824 #ifdef G_OS_WIN32
825     gst_poll_free_winsock_event (set, idx);
826     g_array_remove_index_fast (set->events, idx);
827 #endif
828
829     /* remove the fd at index, we use _remove_index_fast, which copies the last
830      * element of the array to the freed index */
831     g_array_remove_index_fast (set->fds, idx);
832
833     /* mark fd as removed by setting the index to -1 */
834     fd->idx = -1;
835     MARK_REBUILD (set);
836   } else {
837     GST_WARNING ("%p: couldn't find fd !", set);
838   }
839
840   g_mutex_unlock (set->lock);
841
842   return idx >= 0;
843 }
844
845 /**
846  * gst_poll_fd_ctl_write:
847  * @set: a file descriptor set.
848  * @fd: a file descriptor.
849  * @active: a new status.
850  *
851  * Control whether the descriptor @fd in @set will be monitored for
852  * writability.
853  *
854  * Returns: %TRUE if the descriptor was successfully updated.
855  *
856  * Since: 0.10.18
857  */
858 gboolean
859 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
860 {
861   gint idx;
862
863   g_return_val_if_fail (set != NULL, FALSE);
864   g_return_val_if_fail (fd != NULL, FALSE);
865   g_return_val_if_fail (fd->fd >= 0, FALSE);
866
867   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
868       fd->fd, fd->idx, active);
869
870   g_mutex_lock (set->lock);
871
872   idx = find_index (set->fds, fd);
873   if (idx >= 0) {
874 #ifndef G_OS_WIN32
875     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
876
877     if (active)
878       pfd->events |= POLLOUT;
879     else
880       pfd->events &= ~POLLOUT;
881
882     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
883 #else
884     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
885         active);
886 #endif
887     MARK_REBUILD (set);
888   } else {
889     GST_WARNING ("%p: couldn't find fd !", set);
890   }
891
892   g_mutex_unlock (set->lock);
893
894   return idx >= 0;
895 }
896
897 static gboolean
898 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
899 {
900   gint idx;
901
902   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
903       fd->fd, fd->idx, active);
904
905   idx = find_index (set->fds, fd);
906
907   if (idx >= 0) {
908 #ifndef G_OS_WIN32
909     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
910
911     if (active)
912       pfd->events |= (POLLIN | POLLPRI);
913     else
914       pfd->events &= ~(POLLIN | POLLPRI);
915 #else
916     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
917 #endif
918     MARK_REBUILD (set);
919   } else {
920     GST_WARNING ("%p: couldn't find fd !", set);
921   }
922
923   return idx >= 0;
924 }
925
926 /**
927  * gst_poll_fd_ctl_read:
928  * @set: a file descriptor set.
929  * @fd: a file descriptor.
930  * @active: a new status.
931  *
932  * Control whether the descriptor @fd in @set will be monitored for
933  * readability.
934  *
935  * Returns: %TRUE if the descriptor was successfully updated.
936  *
937  * Since: 0.10.18
938  */
939 gboolean
940 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
941 {
942   gboolean ret;
943
944   g_return_val_if_fail (set != NULL, FALSE);
945   g_return_val_if_fail (fd != NULL, FALSE);
946   g_return_val_if_fail (fd->fd >= 0, FALSE);
947
948   g_mutex_lock (set->lock);
949
950   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
951
952   g_mutex_unlock (set->lock);
953
954   return ret;
955 }
956
957 /**
958  * gst_poll_fd_ignored:
959  * @set: a file descriptor set.
960  * @fd: a file descriptor.
961  *
962  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
963  * the same result for @fd as last time. This function must be called if no
964  * operation (read/write/recv/send/etc.) will be performed on @fd before
965  * the next call to gst_poll_wait().
966  *
967  * The reason why this is needed is because the underlying implementation
968  * might not allow querying the fd more than once between calls to one of
969  * the re-enabling operations.
970  *
971  * Since: 0.10.18
972  */
973 void
974 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
975 {
976 #ifdef G_OS_WIN32
977   gint idx;
978
979   g_return_if_fail (set != NULL);
980   g_return_if_fail (fd != NULL);
981   g_return_if_fail (fd->fd >= 0);
982
983   g_mutex_lock (set->lock);
984
985   idx = find_index (set->fds, fd);
986   if (idx >= 0) {
987     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
988
989     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
990     MARK_REBUILD (set);
991   }
992
993   g_mutex_unlock (set->lock);
994 #endif
995 }
996
997 /**
998  * gst_poll_fd_has_closed:
999  * @set: a file descriptor set.
1000  * @fd: a file descriptor.
1001  *
1002  * Check if @fd in @set has closed the connection.
1003  *
1004  * Returns: %TRUE if the connection was closed.
1005  *
1006  * Since: 0.10.18
1007  */
1008 gboolean
1009 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1010 {
1011   gboolean res = FALSE;
1012   gint idx;
1013
1014   g_return_val_if_fail (set != NULL, FALSE);
1015   g_return_val_if_fail (fd != NULL, FALSE);
1016   g_return_val_if_fail (fd->fd >= 0, FALSE);
1017
1018   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1019
1020   g_mutex_lock (set->lock);
1021
1022   idx = find_index (set->active_fds, fd);
1023   if (idx >= 0) {
1024 #ifndef G_OS_WIN32
1025     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1026
1027     res = (pfd->revents & POLLHUP) != 0;
1028 #else
1029     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1030
1031     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1032 #endif
1033   } else {
1034     GST_WARNING ("%p: couldn't find fd !", set);
1035   }
1036
1037   g_mutex_unlock (set->lock);
1038
1039   return res;
1040 }
1041
1042 /**
1043  * gst_poll_fd_has_error:
1044  * @set: a file descriptor set.
1045  * @fd: a file descriptor.
1046  *
1047  * Check if @fd in @set has an error.
1048  *
1049  * Returns: %TRUE if the descriptor has an error.
1050  *
1051  * Since: 0.10.18
1052  */
1053 gboolean
1054 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1055 {
1056   gboolean res = FALSE;
1057   gint idx;
1058
1059   g_return_val_if_fail (set != NULL, FALSE);
1060   g_return_val_if_fail (fd != NULL, FALSE);
1061   g_return_val_if_fail (fd->fd >= 0, FALSE);
1062
1063   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1064
1065   g_mutex_lock (set->lock);
1066
1067   idx = find_index (set->active_fds, fd);
1068   if (idx >= 0) {
1069 #ifndef G_OS_WIN32
1070     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1071
1072     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1073 #else
1074     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1075
1076     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1077         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1078         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1079         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1080         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1081 #endif
1082   } else {
1083     GST_WARNING ("%p: couldn't find fd !", set);
1084   }
1085
1086   g_mutex_unlock (set->lock);
1087
1088   return res;
1089 }
1090
1091 static gboolean
1092 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1093 {
1094   gboolean res = FALSE;
1095   gint idx;
1096
1097   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1098
1099   idx = find_index (set->active_fds, fd);
1100   if (idx >= 0) {
1101 #ifndef G_OS_WIN32
1102     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1103
1104     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1105 #else
1106     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1107
1108     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1109 #endif
1110   } else {
1111     GST_WARNING ("%p: couldn't find fd !", set);
1112   }
1113
1114   return res;
1115 }
1116
1117 /**
1118  * gst_poll_fd_can_read:
1119  * @set: a file descriptor set.
1120  * @fd: a file descriptor.
1121  *
1122  * Check if @fd in @set has data to be read.
1123  *
1124  * Returns: %TRUE if the descriptor has data to be read.
1125  *
1126  * Since: 0.10.18
1127  */
1128 gboolean
1129 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1130 {
1131   gboolean res = FALSE;
1132
1133   g_return_val_if_fail (set != NULL, FALSE);
1134   g_return_val_if_fail (fd != NULL, FALSE);
1135   g_return_val_if_fail (fd->fd >= 0, FALSE);
1136
1137   g_mutex_lock (set->lock);
1138
1139   res = gst_poll_fd_can_read_unlocked (set, fd);
1140
1141   g_mutex_unlock (set->lock);
1142
1143   return res;
1144 }
1145
1146 /**
1147  * gst_poll_fd_can_write:
1148  * @set: a file descriptor set.
1149  * @fd: a file descriptor.
1150  *
1151  * Check if @fd in @set can be used for writing.
1152  *
1153  * Returns: %TRUE if the descriptor can be used for writing.
1154  *
1155  * Since: 0.10.18
1156  */
1157 gboolean
1158 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1159 {
1160   gboolean res = FALSE;
1161   gint idx;
1162
1163   g_return_val_if_fail (set != NULL, FALSE);
1164   g_return_val_if_fail (fd != NULL, FALSE);
1165   g_return_val_if_fail (fd->fd >= 0, FALSE);
1166
1167   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1168
1169   g_mutex_lock (set->lock);
1170
1171   idx = find_index (set->active_fds, fd);
1172   if (idx >= 0) {
1173 #ifndef G_OS_WIN32
1174     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1175
1176     res = (pfd->revents & POLLOUT) != 0;
1177 #else
1178     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1179
1180     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1181 #endif
1182   } else {
1183     GST_WARNING ("%p: couldn't find fd !", set);
1184   }
1185
1186   g_mutex_unlock (set->lock);
1187
1188   return res;
1189 }
1190
1191 /**
1192  * gst_poll_wait:
1193  * @set: a #GstPoll.
1194  * @timeout: a timeout in nanoseconds.
1195  *
1196  * Wait for activity on the file descriptors in @set. This function waits up to
1197  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1198  *
1199  * For #GstPoll objects created with gst_poll_new(), this function can only be
1200  * called from a single thread at a time.  If called from multiple threads,
1201  * -1 will be returned with errno set to EPERM.
1202  *
1203  * This is not true for timer #GstPoll objects created with
1204  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1205  * simultaneously.
1206  *
1207  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1208  * activity was detected after @timeout. If an error occurs, -1 is returned
1209  * and errno is set.
1210  *
1211  * Since: 0.10.18
1212  */
1213 gint
1214 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1215 {
1216   gboolean restarting;
1217   gboolean is_timer;
1218   int res;
1219   gint old_waiting;
1220
1221   g_return_val_if_fail (set != NULL, -1);
1222
1223   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1224
1225   is_timer = set->timer;
1226
1227   /* add one more waiter */
1228   old_waiting = INC_WAITING (set);
1229
1230   /* we cannot wait from multiple threads unless we are a timer */
1231   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1232     goto already_waiting;
1233
1234   /* flushing, exit immediatly */
1235   if (G_UNLIKELY (IS_FLUSHING (set)))
1236     goto flushing;
1237
1238   do {
1239     GstPollMode mode;
1240
1241     res = -1;
1242     restarting = FALSE;
1243
1244     mode = choose_mode (set, timeout);
1245
1246     if (TEST_REBUILD (set)) {
1247       g_mutex_lock (set->lock);
1248 #ifndef G_OS_WIN32
1249       g_array_set_size (set->active_fds, set->fds->len);
1250       memcpy (set->active_fds->data, set->fds->data,
1251           set->fds->len * sizeof (struct pollfd));
1252 #else
1253       if (!gst_poll_prepare_winsock_active_sets (set))
1254         goto winsock_error;
1255 #endif
1256       g_mutex_unlock (set->lock);
1257     }
1258
1259     switch (mode) {
1260       case GST_POLL_MODE_AUTO:
1261         g_assert_not_reached ();
1262         break;
1263       case GST_POLL_MODE_PPOLL:
1264       {
1265 #ifdef HAVE_PPOLL
1266         struct timespec ts;
1267         struct timespec *tsptr;
1268
1269         if (timeout != GST_CLOCK_TIME_NONE) {
1270           GST_TIME_TO_TIMESPEC (timeout, ts);
1271           tsptr = &ts;
1272         } else {
1273           tsptr = NULL;
1274         }
1275
1276         res =
1277             ppoll ((struct pollfd *) set->active_fds->data,
1278             set->active_fds->len, tsptr, NULL);
1279 #else
1280         g_assert_not_reached ();
1281         errno = ENOSYS;
1282 #endif
1283         break;
1284       }
1285       case GST_POLL_MODE_POLL:
1286       {
1287 #ifdef HAVE_POLL
1288         gint t;
1289
1290         if (timeout != GST_CLOCK_TIME_NONE) {
1291           t = GST_TIME_AS_MSECONDS (timeout);
1292         } else {
1293           t = -1;
1294         }
1295
1296         res =
1297             poll ((struct pollfd *) set->active_fds->data,
1298             set->active_fds->len, t);
1299 #else
1300         g_assert_not_reached ();
1301         errno = ENOSYS;
1302 #endif
1303         break;
1304       }
1305       case GST_POLL_MODE_PSELECT:
1306 #ifndef HAVE_PSELECT
1307       {
1308         g_assert_not_reached ();
1309         errno = ENOSYS;
1310         break;
1311       }
1312 #endif
1313       case GST_POLL_MODE_SELECT:
1314       {
1315 #ifndef G_OS_WIN32
1316         fd_set readfds;
1317         fd_set writefds;
1318         fd_set errorfds;
1319         gint max_fd;
1320
1321         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1322
1323         if (mode == GST_POLL_MODE_SELECT) {
1324           struct timeval tv;
1325           struct timeval *tvptr;
1326
1327           if (timeout != GST_CLOCK_TIME_NONE) {
1328             GST_TIME_TO_TIMEVAL (timeout, tv);
1329             tvptr = &tv;
1330           } else {
1331             tvptr = NULL;
1332           }
1333
1334           GST_DEBUG ("Calling select");
1335           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1336           GST_DEBUG ("After select, res:%d", res);
1337         } else {
1338 #ifdef HAVE_PSELECT
1339           struct timespec ts;
1340           struct timespec *tsptr;
1341
1342           if (timeout != GST_CLOCK_TIME_NONE) {
1343             GST_TIME_TO_TIMESPEC (timeout, ts);
1344             tsptr = &ts;
1345           } else {
1346             tsptr = NULL;
1347           }
1348
1349           GST_DEBUG ("Calling pselect");
1350           res =
1351               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1352           GST_DEBUG ("After pselect, res:%d", res);
1353 #endif
1354         }
1355
1356         if (res >= 0) {
1357           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1358         }
1359 #else /* G_OS_WIN32 */
1360         g_assert_not_reached ();
1361         errno = ENOSYS;
1362 #endif
1363         break;
1364       }
1365       case GST_POLL_MODE_WINDOWS:
1366       {
1367 #ifdef G_OS_WIN32
1368         gint ignore_count = set->active_fds_ignored->len;
1369         DWORD t, wait_ret;
1370
1371         if (G_LIKELY (ignore_count == 0)) {
1372           if (timeout != GST_CLOCK_TIME_NONE)
1373             t = GST_TIME_AS_MSECONDS (timeout);
1374           else
1375             t = INFINITE;
1376         } else {
1377           /* already one or more ignored fds, so we quickly sweep the others */
1378           t = 0;
1379         }
1380
1381         if (set->active_events->len != 0) {
1382           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1383               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1384         } else {
1385           wait_ret = WSA_WAIT_FAILED;
1386           WSASetLastError (WSA_INVALID_PARAMETER);
1387         }
1388
1389         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1390           res = 0;
1391         } else if (wait_ret == WSA_WAIT_FAILED) {
1392           res = -1;
1393           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1394         } else {
1395           /* the first entry is the wakeup event */
1396           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1397             res = gst_poll_collect_winsock_events (set);
1398           } else {
1399             res = 1;            /* wakeup event */
1400           }
1401         }
1402 #else
1403         g_assert_not_reached ();
1404         errno = ENOSYS;
1405 #endif
1406         break;
1407       }
1408     }
1409
1410     if (!is_timer) {
1411       /* Applications needs to clear the control socket themselves for timer
1412        * polls.
1413        * For other polls, we need to clear the control socket. If there was only
1414        * one socket with activity and it was the control socket, we need to
1415        * restart */
1416       if (release_all_wakeup (set) > 0 && res == 1)
1417         restarting = TRUE;
1418     }
1419
1420     if (G_UNLIKELY (IS_FLUSHING (set))) {
1421       /* we got woken up and we are flushing, we need to stop */
1422       errno = EBUSY;
1423       res = -1;
1424       break;
1425     }
1426   } while (G_UNLIKELY (restarting));
1427
1428   DEC_WAITING (set);
1429
1430   return res;
1431
1432   /* ERRORS */
1433 already_waiting:
1434   {
1435     DEC_WAITING (set);
1436     errno = EPERM;
1437     return -1;
1438   }
1439 flushing:
1440   {
1441     DEC_WAITING (set);
1442     errno = EBUSY;
1443     return -1;
1444   }
1445 #ifdef G_OS_WIN32
1446 winsock_error:
1447   {
1448     g_mutex_unlock (set->lock);
1449     DEC_WAITING (set);
1450     return -1;
1451   }
1452 #endif
1453 }
1454
1455 /**
1456  * gst_poll_set_controllable:
1457  * @set: a #GstPoll.
1458  * @controllable: new controllable state.
1459  *
1460  * When @controllable is %TRUE, this function ensures that future calls to
1461  * gst_poll_wait() will be affected by gst_poll_restart() and
1462  * gst_poll_set_flushing().
1463  *
1464  * Returns: %TRUE if the controllability of @set could be updated.
1465  *
1466  * Since: 0.10.18
1467  */
1468 gboolean
1469 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1470 {
1471   g_return_val_if_fail (set != NULL, FALSE);
1472
1473   GST_LOG ("%p: controllable : %d", set, controllable);
1474
1475   set->controllable = controllable;
1476
1477   return TRUE;
1478 }
1479
1480 /**
1481  * gst_poll_restart:
1482  * @set: a #GstPoll.
1483  *
1484  * Restart any gst_poll_wait() that is in progress. This function is typically
1485  * used after adding or removing descriptors to @set.
1486  *
1487  * If @set is not controllable, then this call will have no effect.
1488  *
1489  * Since: 0.10.18
1490  */
1491 void
1492 gst_poll_restart (GstPoll * set)
1493 {
1494   g_return_if_fail (set != NULL);
1495
1496   if (set->controllable && GET_WAITING (set) > 0) {
1497     /* we are controllable and waiting, wake up the waiter. The socket will be
1498      * cleared by the _wait() thread and the poll will be restarted */
1499     raise_wakeup (set);
1500   }
1501 }
1502
1503 /**
1504  * gst_poll_set_flushing:
1505  * @set: a #GstPoll.
1506  * @flushing: new flushing state.
1507  *
1508  * When @flushing is %TRUE, this function ensures that current and future calls
1509  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1510  *
1511  * Unsetting the flushing state will restore normal operation of @set.
1512  *
1513  * Since: 0.10.18
1514  */
1515 void
1516 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1517 {
1518   g_return_if_fail (set != NULL);
1519
1520   /* update the new state first */
1521   SET_FLUSHING (set, flushing);
1522
1523   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1524     /* we are flushing, controllable and waiting, wake up the waiter. When we
1525      * stop the flushing operation we don't clear the wakeup fd here, this will
1526      * happen in the _wait() thread. */
1527     raise_wakeup (set);
1528   }
1529 }
1530
1531 /**
1532  * gst_poll_write_control:
1533  * @set: a #GstPoll.
1534  *
1535  * Write a byte to the control socket of the controllable @set.
1536  * This function is mostly useful for timer #GstPoll objects created with
1537  * gst_poll_new_timer(). 
1538  *
1539  * It will make any current and future gst_poll_wait() function return with
1540  * 1, meaning the control socket is set. After an equal amount of calls to
1541  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1542  * block again until their timeout expired.
1543  *
1544  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1545  * byte could not be written.
1546  *
1547  * Since: 0.10.23
1548  */
1549 gboolean
1550 gst_poll_write_control (GstPoll * set)
1551 {
1552   gboolean res;
1553
1554   g_return_val_if_fail (set != NULL, FALSE);
1555   g_return_val_if_fail (set->timer, FALSE);
1556
1557   res = raise_wakeup (set);
1558
1559   return res;
1560 }
1561
1562 /**
1563  * gst_poll_read_control:
1564  * @set: a #GstPoll.
1565  *
1566  * Read a byte from the control socket of the controllable @set.
1567  * This function is mostly useful for timer #GstPoll objects created with
1568  * gst_poll_new_timer(). 
1569  *
1570  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1571  * was no byte to read.
1572  *
1573  * Since: 0.10.23
1574  */
1575 gboolean
1576 gst_poll_read_control (GstPoll * set)
1577 {
1578   gboolean res;
1579
1580   g_return_val_if_fail (set != NULL, FALSE);
1581   g_return_val_if_fail (set->timer, FALSE);
1582
1583   res = release_wakeup (set);
1584
1585   return res;
1586 }